bark-wallet 0.1.3

Wallet library and CLI for the bitcoin Ark protocol built by Second
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
use std::str::FromStr;

use anyhow::Context;
use ark::arkoor::package::ArkoorPackageBuilder;
use bitcoin::{Amount, SignedAmount};
use bitcoin::hex::DisplayHex;
use futures::StreamExt;
use lightning_invoice::Bolt11Invoice;
use log::{trace, debug, info, warn};

use ark::{ProtocolEncoding, Vtxo, VtxoPolicy};
use ark::attestations::{LightningReceiveAttestation};
use ark::fees::validate_and_subtract_fee;
use ark::lightning::{Bolt11InvoiceExt, PaymentHash, Preimage};
use bitcoin_ext::{BlockDelta, BlockHeight};
use server_rpc::protos;
use server_rpc::protos::prepare_lightning_receive_claim_request::LightningReceiveAntiDos;

use crate::subsystem::{LightningMovement, LightningReceiveMovement, Subsystem};
use crate::{Wallet, error};
use crate::movement::{MovementDestination, MovementStatus};
use crate::movement::update::MovementUpdate;
use crate::persist::models::LightningReceive;

/// Leniency delta to allow claim when blocks were mined between htlc
/// receive and claim preparation
const LIGHTNING_PREPARE_CLAIM_DELTA: BlockDelta = 2;

impl Wallet {
	/// Fetches all pending lightning receives ordered from newest to oldest.
	pub async fn pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
		Ok(self.db.get_all_pending_lightning_receives().await?)
	}

	/// Calculates how much balance can currently be claimed via inbound lightning payments.
	/// Invoices which have yet to be paid are not including in this.
	pub async fn claimable_lightning_receive_balance(&self) -> anyhow::Result<Amount> {
		let receives = self.pending_lightning_receives().await?;

		let mut total = Amount::ZERO;
		for receive in receives {
			total += receive.htlc_vtxos.iter().map(|v| v.amount()).sum::<Amount>();
		}

		Ok(total)
	}

	/// Create, store and return a [Bolt11Invoice] for offchain boarding
	pub async fn bolt11_invoice(&self, amount: Amount) -> anyhow::Result<Bolt11Invoice> {
		if amount == Amount::ZERO {
			bail!("Cannot create invoice for 0 sats (this would create an explicit 0 sat invoice, not an any-amount invoice)");
		}

		let (mut srv, ark_info) = self.require_server().await?;
		let config = self.config();

		// Calculate and validate lightning receive fees
		let fee = ark_info.fees.lightning_receive.calculate(amount).context("fee overflowed")?;
		validate_and_subtract_fee(amount, fee)?;

		// User needs to enfore the following delta:
		// - vtxo exit delta + htlc expiry delta (to give him time to exit the vtxo before htlc expires)
		// - vtxo exit margin (to give him time to exit the vtxo before htlc expires)
		// - htlc recv claim delta (to give him time to claim the htlc before it expires)
		let requested_min_cltv_delta = ark_info.vtxo_exit_delta +
			ark_info.htlc_expiry_delta +
			config.vtxo_exit_margin +
			config.htlc_recv_claim_delta +
			LIGHTNING_PREPARE_CLAIM_DELTA;

		if requested_min_cltv_delta > ark_info.max_user_invoice_cltv_delta {
			bail!("HTLC CLTV delta ({}) is greater than Server's max HTLC recv CLTV delta: {}",
				requested_min_cltv_delta,
				ark_info.max_user_invoice_cltv_delta,
			);
		}

		let preimage = Preimage::random();
		let payment_hash = preimage.compute_payment_hash();
		info!("Start bolt11 board with preimage / payment hash: {} / {}",
			preimage.as_hex(), payment_hash.as_hex());

		let mailbox_kp = self.seed.to_mailbox_keypair();
		let mailbox_id = ark::mailbox::MailboxIdentifier::from_pubkey(mailbox_kp.public_key());

		let req = protos::StartLightningReceiveRequest {
			payment_hash: payment_hash.to_vec(),
			amount_sat: amount.to_sat(),
			min_cltv_delta: requested_min_cltv_delta as u32,
			mailbox_id: Some(mailbox_id.to_vec()),
		};

		let resp = srv.client.start_lightning_receive(req).await?.into_inner();
		info!("Ark Server is ready to receive LN payment to invoice: {}.", resp.bolt11);

		let invoice = Bolt11Invoice::from_str(&resp.bolt11)
			.context("invalid bolt11 invoice returned by Ark server")?;

		self.db.store_lightning_receive(
			payment_hash,
			preimage,
			&invoice,
			requested_min_cltv_delta,
		).await?;

		Ok(invoice)
	}

	/// Fetches the status of a lightning receive for the given [PaymentHash].
	pub async fn lightning_receive_status(
		&self,
		payment: impl Into<PaymentHash>,
	) -> anyhow::Result<Option<LightningReceive>> {
		Ok(self.db.fetch_lightning_receive_by_payment_hash(payment.into()).await?)
	}

	/// Claim given incoming lightning payment.
	///
	/// This function reveals the preimage of the lightning payment in
	/// exchange of getting pubkey VTXOs from HTLC ones
	///
	/// # Returns
	///
	/// Returns an `anyhow::Result<()>`, which is:
	/// * `Ok(())` if the process completes successfully.
	///   the receive object is also updated correctly
	/// * `Err` if an error occurs at any stage of the operation.
	///
	/// # Remarks
	///
	/// * The list of HTLC VTXOs must have the hash lock clause matching the given
	///   [PaymentHash].
	/// * The preimage is revealed to the server before the cosign response is
	///   received. If the call fails after that point, the server's
	///   `claim_lightning_receive` is idempotent so this method can be retried
	///   to obtain fresh cosign signatures.
	async fn claim_lightning_receive(
		&self,
		receive: &mut LightningReceive,
	) -> anyhow::Result<()> {
		let movement_id = receive.movement_id
			.context("No movement created for lightning receive")?;
		let (mut srv, _) = self.require_server().await?;

		// order inputs by vtxoid before we generate nonces
		let inputs = {
			ensure!(!receive.htlc_vtxos.is_empty(), "no HTLC VTXOs set on record yet");
			let mut ret = receive.htlc_vtxos.iter().map(|v| &v.vtxo).collect::<Vec<_>>();
			ret.sort_by_key(|v| v.id());
			ret
		};

		let mut keypairs = Vec::with_capacity(inputs.len());
		for v in &inputs {
			keypairs.push(self.get_vtxo_key(*v).await?);
		}

		// Claiming arkoor against preimage
		let (claim_keypair, _) = self.derive_store_next_keypair().await?;
		let receive_policy = VtxoPolicy::new_pubkey(claim_keypair.public_key());

		trace!("ln arkoor builder params: inputs: {:?}; policy: {:?}",
			inputs.iter().map(|v| v.id()).collect::<Vec<_>>(), receive_policy,
		);
		let builder = ArkoorPackageBuilder::new_claim_all_without_checkpoints(
			inputs.iter().copied().cloned(),
			receive_policy.clone(),
		).context("creating claim arkoor builder failed")?;
		let builder = builder.generate_user_nonces(&keypairs)
			.context("arkoor nonce generation for claim failed")?;

		info!("Claiming arkoor against payment preimage");
		self.db.set_preimage_revealed(receive.payment_hash).await?;
		let package_cosign_request = protos::ArkoorPackageCosignRequest::from(
			builder.cosign_request(),
		);
		let resp = srv.client.claim_lightning_receive(protos::ClaimLightningReceiveRequest {
			payment_hash: receive.payment_hash.to_byte_array().to_vec(),
			payment_preimage: receive.payment_preimage.to_vec(),
			cosign_request: Some(package_cosign_request),
		}).await?.into_inner();
		let cosign_resp = resp.try_into().context("invalid cosign response")?;

		let outputs = builder.user_cosign(&keypairs, cosign_resp)
			.context("claim arkoor cosign failed with user response")?
			.build_signed_vtxos();

		let mut effective_balance = Amount::ZERO;
		for vtxo in &outputs {
			// NB: bailing here results in vtxos not being registered despite the
			// preimage being revealed.  The server's claim_lightning_receive is
			// idempotent, so bark can retry and obtain fresh cosign signatures,
			// but if all retries fail the user will be forced to exit on-chain.
			trace!("Validating Lightning receive claim VTXO {}: {}",
				vtxo.id(), vtxo.serialize_hex(),
			);
			self.validate_vtxo(vtxo).await
				.context("invalid arkoor from lightning receive")?;
			effective_balance += vtxo.amount();
		}

		self.store_spendable_vtxos(&outputs).await?;
		self.mark_vtxos_as_spent(inputs).await?;

		info!("Got arkoors from lightning: {}",
			outputs.iter().map(|v| v.id().to_string()).collect::<Vec<_>>().join(", ")
		);

		self.movements.finish_movement_with_update(
			movement_id,
			MovementStatus::Successful,
			MovementUpdate::new()
				.effective_balance(effective_balance.to_signed()?)
				.produced_vtxos(&outputs)
		).await?;

		self.db.finish_pending_lightning_receive(receive.payment_hash).await?;
		*receive = self.db.fetch_lightning_receive_by_payment_hash(receive.payment_hash).await
			.context("Database error")?
			.context("Receive not found")?;

		Ok(())
	}

	async fn compute_lightning_receive_anti_dos(
		&self,
		payment_hash: PaymentHash,
		token: Option<&str>,
	) -> anyhow::Result<LightningReceiveAntiDos> {
		Ok(if let Some(token) = token {
			LightningReceiveAntiDos::Token(token.to_string())
		} else {
			// We get an existing VTXO as an anti-dos measure.
			let vtxo = self.select_vtxos_to_cover(Amount::ONE_SAT).await
				.and_then(|vtxos| vtxos.into_iter().next()
					.context("have no spendable vtxo to prove ownership of")
				)?;
			let vtxo_keypair = self.get_vtxo_key(&vtxo).await.expect("owned vtxo should be in database");
			let attestation = LightningReceiveAttestation::new(payment_hash, vtxo.id(), &vtxo_keypair);
			LightningReceiveAntiDos::InputVtxo(protos::InputVtxo {
				vtxo_id: vtxo.id().to_bytes().to_vec(),
				attestation: attestation.serialize(),
			})
		})
	}

	/// Check for incoming lightning payment with the given [PaymentHash].
	///
	/// This function checks for an incoming lightning payment with the
	/// given [PaymentHash] and returns the HTLC VTXOs that are associated
	/// with it.
	///
	/// # Arguments
	///
	/// * `payment_hash` - The [PaymentHash] of the lightning payment
	/// to check for.
	/// * `wait` - Whether to wait for the payment to be initiated by the sender.
	/// * `token` - An optional lightning receive token used to authenticate a lightning
	/// receive when no spendable VTXOs are owned by this wallet.
	///
	/// # Returns
	///
	/// Returns an `anyhow::Result<Option<LightningReceive>>`, which is:
	/// * `Ok(Some(lightning_receive))` if the payment was initiated by
	///   the sender and the HTLC VTXOs were successfully prepared.
	/// * `Ok(None)` if the payment was not initiated by the sender or
	///   the payment was canceled by server.
	/// * `Err` if an error occurs at any stage of the operation.
	///
	/// # Remarks
	///
	/// * The invoice must contain an explicit amount specified in milli-satoshis.
	/// * The HTLC expiry height is calculated by adding the servers' HTLC expiry delta to the
	///   current chain tip.
	/// * The payment hash must be from an invoice previously generated using
	///   [Wallet::bolt11_invoice].
	async fn check_lightning_receive(
		&self,
		payment_hash: PaymentHash,
		wait: bool,
		token: Option<&str>,
	) -> anyhow::Result<Option<LightningReceive>> {
		let (mut srv, ark_info) = self.require_server().await?;
		let current_height = self.chain.tip().await?;

		let mut receive = self.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
			.context("no pending lightning receive found for payment hash, might already be claimed")?;

		// If we have already HTLC VTXOs stored, we can return them without asking the server
		if !receive.htlc_vtxos.is_empty() {
			return Ok(Some(receive))
		}

		trace!("Requesting updates for ln-receive to server with for wait={} and hash={}", wait, payment_hash);
		let sub = srv.client.check_lightning_receive(protos::CheckLightningReceiveRequest {
			hash: payment_hash.to_byte_array().to_vec(), wait,
		}).await?.into_inner();


		let status = protos::LightningReceiveStatus::try_from(sub.status)
			.with_context(|| format!("unknown payment status: {}", sub.status))?;

		debug!("Received status {:?} for {}", status, payment_hash);
		match status {
			// this is the good case
			protos::LightningReceiveStatus::Accepted |
			protos::LightningReceiveStatus::HtlcsReady => {},
			protos::LightningReceiveStatus::Created => {
				return Ok(None);
			},
			protos::LightningReceiveStatus::Settled => bail!("payment already settled"),
			protos::LightningReceiveStatus::Canceled => {
				warn!("payment was canceled. removing pending lightning receive");
				self.exit_or_cancel_lightning_receive(&receive).await?;
				return Ok(None);
			},
		}

		let lightning_receive_anti_dos = match self.compute_lightning_receive_anti_dos(
			payment_hash, token,
		).await {
			Ok(anti_dos) => Some(anti_dos),
			Err(e) => {
				warn!("Could not compute anti-dos: {e:#}. Trying without");
				None
			},
		};

		let htlc_recv_expiry = current_height + receive.htlc_recv_cltv_delta as BlockHeight;

		let (next_keypair, _) = self.derive_store_next_keypair().await?;
		let req = protos::PrepareLightningReceiveClaimRequest {
			payment_hash: receive.payment_hash.to_vec(),
			user_pubkey: next_keypair.public_key().serialize().to_vec(),
			htlc_recv_expiry,
			lightning_receive_anti_dos,
		};
		let res = srv.client.prepare_lightning_receive_claim(req).await
			.context("error preparing lightning receive claim")?.into_inner();
		let vtxos = res.htlc_vtxos.into_iter()
			.map(|b| Vtxo::deserialize(&b))
			.collect::<Result<Vec<_>, _>>()
			.context("invalid htlc vtxos from server")?;

		// sanity check the vtxos
		let mut htlc_amount = Amount::ZERO;
		for vtxo in &vtxos {
			trace!("Received HTLC VTXO {} from server: {}", vtxo.id(), vtxo.serialize_hex());
			self.validate_vtxo(vtxo).await
				.context("received invalid HTLC VTXO from server")?;
			htlc_amount += vtxo.amount();

			if let VtxoPolicy::ServerHtlcRecv(p) = vtxo.policy() {
				if p.payment_hash != receive.payment_hash {
					bail!("invalid payment hash on HTLC VTXOs received from server: {}",
						p.payment_hash,
					);
				}
				if p.user_pubkey != next_keypair.public_key() {
					bail!("invalid pubkey on HTLC VTXOs received from server: {}", p.user_pubkey);
				}
				if p.htlc_expiry < htlc_recv_expiry {
					bail!("HTLC VTXO expiry height is less than requested: Requested {}, received {}", htlc_recv_expiry, p.htlc_expiry);
				}
			} else {
				bail!("invalid HTLC VTXO policy: {:?}", vtxo.policy());
			}
		}

		// Check that the sum exceeds the invoice amount; we can't entirely trust the
		// server-reported payment amount, so if there is a discrepancy, we should fall back to
		// checking the invoice amount.
		let invoice_amount = receive.invoice.get_payment_amount(None)
			.context("ln receive invoice should have amount")?;
		let server_received_amount = res.receive.map(|r| Amount::from_sat(r.amount_sat));
		let fee = {
			let fee = server_received_amount
				.and_then(|a| ark_info.fees.lightning_receive.calculate(a));
			match (server_received_amount, fee) {
				(Some(amount), Some(fee)) if htlc_amount + fee == amount => {
					// If this is true then the server is telling the truth.
					fee
				},
				_ => {
					// We should verify against the invoice amount instead. Unfortunately, that
					// means the fee value in the movement won't be entirely accurate, however, it's
					// better to avoid rejecting payments when we have received enough to cover an
					// invoice.
					ark_info.fees.lightning_receive.calculate(invoice_amount)
						.expect("we previously validated this")
				}
			}
		};
		let received = htlc_amount + fee;
		ensure!(received >= invoice_amount,
			"Server didn't return enough VTXOs to cover invoice amount"
		);

		let movement_id = if let Some(movement_id) = receive.movement_id {
			movement_id
		} else {
			self.movements.new_movement_with_update(
				Subsystem::LIGHTNING_RECEIVE,
				LightningReceiveMovement::Receive.to_string(),
				MovementUpdate::new()
					.intended_balance(invoice_amount.to_signed()?)
					.effective_balance(htlc_amount.to_signed()?)
					.fee(fee)
					.metadata(LightningMovement::metadata(
						receive.payment_hash, &vtxos, Some(receive.payment_preimage),
					))
					.received_on(
						[MovementDestination::new(receive.invoice.clone().into(), received)],
					),
			).await?
		};
		self.store_locked_vtxos(&vtxos, Some(movement_id)).await?;

		let vtxo_ids = vtxos.iter().map(|v| v.id()).collect::<Vec<_>>();
		self.db.update_lightning_receive(payment_hash, &vtxo_ids, movement_id).await?;

		let mut wallet_vtxos = vec![];
		for vtxo in vtxos {
			let v =  self.db.get_wallet_vtxo(vtxo.id()).await?
				.context("Failed to get wallet VTXO for lightning receive")?;
			wallet_vtxos.push(v);
		}

		receive.htlc_vtxos = wallet_vtxos;
		receive.movement_id = Some(movement_id);

		Ok(Some(receive))
	}

	/// Exit HTLC-recv VTXOs when preimage has been disclosed but the claim failed.
	///
	/// NOTE: Calling this function will always result in the HTLC VTXO being exited
	/// regardless of the presence of the `preimage_revealed_at` field of
	/// the `lightning_receive` struct.
	async fn exit_lightning_receive(
		&self,
		lightning_receive: &LightningReceive,
	) -> anyhow::Result<()> {
		ensure!(!lightning_receive.htlc_vtxos.is_empty(), "no HTLC VTXOs to exit");
		let vtxos = lightning_receive.htlc_vtxos.iter().map(|v| &v.vtxo).collect::<Vec<_>>();

		info!("Exiting HTLC VTXOs for lightning_receive with payment hash {}", lightning_receive.payment_hash);
		self.exit.write().await.start_exit_for_vtxos(&vtxos).await?;

		if let Some(movement_id) = lightning_receive.movement_id {
			self.movements.finish_movement_with_update(
				movement_id,
				MovementStatus::Failed,
				MovementUpdate::new().exited_vtxos(vtxos),
			).await?;
		} else {
			error!("movement id is missing but we disclosed preimage: {}", lightning_receive.payment_hash);
		}

		self.db.finish_pending_lightning_receive(lightning_receive.payment_hash).await?;
		Ok(())
	}

	pub(crate) async fn exit_or_cancel_lightning_receive(
		&self,
		lightning_receive: &LightningReceive,
	) -> anyhow::Result<()> {
		let vtxos = &lightning_receive.htlc_vtxos;

		let update_opt = match (vtxos.is_empty(), lightning_receive.preimage_revealed_at) {
			(false, Some(_)) => {
				return self.exit_lightning_receive(lightning_receive).await;
			}
			(false, None) => {
				warn!("HTLC-recv VTXOs are about to expire, but preimage has not been disclosed yet. Canceling");
				self.mark_vtxos_as_spent(vtxos).await?;
				if let Some(movement_id) = lightning_receive.movement_id {
					Some((
						movement_id,
						MovementUpdate::new()
							.effective_balance(SignedAmount::ZERO),
						MovementStatus::Canceled,
					))
				} else {
					error!("movement id is missing but we got HTLC vtxos: {}", lightning_receive.payment_hash);
					None
				}
			}
			(true, Some(_)) => {
				error!("No HTLC vtxos set on ln receive but preimage has been disclosed. Canceling");
				lightning_receive.movement_id.map(|id| (id,
					MovementUpdate::new()
						.effective_balance(SignedAmount::ZERO),
					MovementStatus::Canceled,
				))
			}
			(true, None) => None,
		};

		if let Some((movement_id, update, status)) = update_opt {
			self.movements.finish_movement_with_update(movement_id, status, update).await?;
		}

		self.db.finish_pending_lightning_receive(lightning_receive.payment_hash).await?;

		Ok(())
	}

	/// Cancel a pending lightning receive.
	///
	/// This asks the server to cancel the hold invoice. The server will
	/// refuse if HTLC-recv vtxos have already been granted.
	///
	/// Bark additionally prevents cancellation when the preimage has
	/// already been revealed, since revealing the preimage means the
	/// sender's payment is in flight and cancelling would lose funds.
	pub async fn cancel_lightning_receive(
		&self,
		payment_hash: PaymentHash,
	) -> anyhow::Result<()> {
		let receive = self.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
			.context("no pending lightning receive found for this payment hash")?;

		if receive.preimage_revealed_at.is_some() {
			bail!("cannot cancel: preimage has already been revealed");
		}

		if receive.finished_at.is_some() {
			bail!("lightning receive is already finished");
		}

		let (mut srv, _) = self.require_server().await?;
		srv.client.cancel_lightning_receive(protos::CancelLightningReceiveRequest {
			payment_hash: payment_hash.to_vec(),
		}).await.context("server refused cancellation")?;

		// Clean up local state: mark htlc vtxos as spent and finish the receive
		self.exit_or_cancel_lightning_receive(&receive).await?;

		Ok(())
	}

	/// Check and claim a Lightning receive
	///
	/// This function checks for an incoming lightning payment with the given [PaymentHash]
	/// and then claims the payment using returned HTLC VTXOs.
	///
	/// # Arguments
	///
	/// * `payment_hash` - The [PaymentHash] of the lightning payment
	/// to check for.
	/// * `wait` - Whether to wait for the payment to be received.
	/// * `token` - An optional lightning receive token used to authenticate a lightning
	/// receive when no spendable VTXOs are owned by this wallet.
	///
	/// # Returns
	///
	/// Returns an `anyhow::Result<LightningReceive>`, which is:
	/// * `Ok(LightningReceive)` if the claim was completed or is awaiting HTLC VTXOs
	/// * `Err` if an error occurs at any stage of the operation.
	///
	/// # Remarks
	///
	/// * The payment hash must be from an invoice previously generated using
	///   [Wallet::bolt11_invoice].
	pub async fn try_claim_lightning_receive(
		&self,
		payment_hash: PaymentHash,
		wait: bool,
		token: Option<&str>,
	) -> anyhow::Result<LightningReceive> {
		trace!("Claiming lightning receive for payment hash: {}", payment_hash);

		// Try to mark this payment as in-flight to prevent concurrent claim attempts.
		// This prevents race conditions where multiple concurrent calls could both
		// attempt to check/claim the same receive, leading to duplicate operations.
		{
			let mut inflight = self.inflight_lightning_payments.lock().await;
			if !inflight.insert(payment_hash) {
				bail!("Receive operation already in progress for this payment");
			}
		}

		let result = self.try_claim_lightning_receive_inner(payment_hash, wait, token).await;

		// Always remove from inflight set when done
		{
			let mut inflight = self.inflight_lightning_payments.lock().await;
			inflight.remove(&payment_hash);
		}

		result
	}

	/// Internal implementation of lightning receive claim after concurrency check.
	async fn try_claim_lightning_receive_inner(
		&self,
		payment_hash: PaymentHash,
		wait: bool,
		token: Option<&str>,
	) -> anyhow::Result<LightningReceive> {
		// check_lightning_receive returns None if there is no incoming payment (yet)
		// In that case we just return and don't try to claim
		let mut receive = match self.check_lightning_receive(payment_hash, wait, token).await? {
			Some(receive) => receive,
			None => {
				return self.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
					.context("No receive for payment_hash")
			}
		};

		if receive.finished_at.is_some() {
			return Ok(receive);
		}

		// No need to claim anything if there
		// are no htlcs yet
		if receive.htlc_vtxos.is_empty() {
			return Ok(receive);
		}

		match self.claim_lightning_receive(&mut receive).await {
			Ok(()) => Ok(receive),
			Err(e) => {
				error!("Failed to claim htlcs for payment_hash: {}", receive.payment_hash);
				// We're now in a our havoc era. Just exit. The server prepared our HTLCs,
				// but then later refused to / couldn't allow our claim. This shoul be quite
				// rare.
				warn!("Exiting lightning receive VTXOs");
				self.exit_lightning_receive(&receive).await?;
				return Err(e)
			}
		}
	}

	/// Check and claim all opened Lightning receive
	///
	/// This function fetches all opened lightning receives and then
	/// concurrently tries to check and claim them.
	///
	/// # Arguments
	///
	/// * `wait` - Whether to wait for each payment to be received.
	///
	/// # Returns
	///
	/// Returns an `anyhow::Result<()>`, which is:
	/// * `Ok(Vec<LightningReceive>)` contains the successfully claimed receives, if at least one
	/// claim succeeded, or an empty vector if no claim succeeded.
	/// * `Err` if all claim attempts failed.
	pub async fn try_claim_all_lightning_receives(&self, wait: bool) -> anyhow::Result<Vec<LightningReceive>> {
		let pending = self.pending_lightning_receives().await?;
		let total = pending.len();

		if total == 0 {
			return Ok(vec![]);
		}

		let results: Vec<_> = tokio_stream::iter(pending)
			.map(|rcv| async move {
				self.try_claim_lightning_receive(rcv.invoice.into(), wait, None).await
			})
			.buffer_unordered(3)
			.collect()
			.await;

		let mut claimed = vec![];
		let mut failed = 0;

		for result in results {
			match result {
				Ok(receive) => claimed.push(receive),
				Err(e) => {
					error!("Error claiming lightning receive: {:#}", e);
					failed += 1;
				}
			}
		}

		if failed > 0 {
			info!(
				"Lightning receive claims: {} succeeded, {} failed out of {} pending",
				claimed.len(), failed, total
			);
		}

		if claimed.is_empty() {
			anyhow::bail!("All {} lightning receive claim(s) failed", failed);
		}

		Ok(claimed)
	}
}