Skip to main content

bark/lightning/
receive.rs

1use std::str::FromStr;
2
3use anyhow::Context;
4use ark::arkoor::package::ArkoorPackageBuilder;
5use bitcoin::{Amount, SignedAmount};
6use bitcoin::hex::DisplayHex;
7use futures::StreamExt;
8use lightning_invoice::Bolt11Invoice;
9use log::{trace, debug, info, warn};
10
11use ark::{ProtocolEncoding, Vtxo, VtxoPolicy};
12use ark::attestations::{LightningReceiveAttestation};
13use ark::fees::validate_and_subtract_fee;
14use ark::lightning::{Bolt11InvoiceExt, PaymentHash, Preimage};
15use bitcoin_ext::{BlockDelta, BlockHeight};
16use server_rpc::protos;
17use server_rpc::protos::prepare_lightning_receive_claim_request::LightningReceiveAntiDos;
18
19use crate::subsystem::{LightningMovement, LightningReceiveMovement, Subsystem};
20use crate::{Wallet, error};
21use crate::movement::{MovementDestination, MovementStatus};
22use crate::movement::update::MovementUpdate;
23use crate::persist::models::LightningReceive;
24
25/// Leniency delta to allow claim when blocks were mined between htlc
26/// receive and claim preparation
27const LIGHTNING_PREPARE_CLAIM_DELTA: BlockDelta = 2;
28
29impl Wallet {
30	/// Fetches all pending lightning receives ordered from newest to oldest.
31	pub async fn pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
32		Ok(self.db.get_all_pending_lightning_receives().await?)
33	}
34
35	/// Calculates how much balance can currently be claimed via inbound lightning payments.
36	/// Invoices which have yet to be paid are not including in this.
37	pub async fn claimable_lightning_receive_balance(&self) -> anyhow::Result<Amount> {
38		let receives = self.pending_lightning_receives().await?;
39
40		let mut total = Amount::ZERO;
41		for receive in receives {
42			total += receive.htlc_vtxos.iter().map(|v| v.amount()).sum::<Amount>();
43		}
44
45		Ok(total)
46	}
47
48	/// Create, store and return a [Bolt11Invoice] for offchain boarding.
49	///
50	/// An optional `description` is embedded in the invoice as its memo. Note
51	/// that on retry for the same payment hash, the description of the
52	/// originally-generated invoice is preserved (the server returns the
53	/// previously-issued invoice).
54	pub async fn bolt11_invoice(
55		&self,
56		amount: Amount,
57		description: Option<String>,
58	) -> anyhow::Result<Bolt11Invoice> {
59		if amount == Amount::ZERO {
60			bail!("Cannot create invoice for 0 sats (this would create an explicit 0 sat invoice, not an any-amount invoice)");
61		}
62
63		let (mut srv, ark_info) = self.require_server().await?;
64		let config = self.config();
65
66		// Calculate and validate lightning receive fees
67		let fee = ark_info.fees.lightning_receive.calculate(amount).context("fee overflowed")?;
68		validate_and_subtract_fee(amount, fee)?;
69
70		// User needs to enfore the following delta:
71		// - vtxo exit delta + htlc expiry delta (to give him time to exit the vtxo before htlc expires)
72		// - vtxo exit margin (to give him time to exit the vtxo before htlc expires)
73		// - htlc recv claim delta (to give him time to claim the htlc before it expires)
74		let requested_min_cltv_delta = ark_info.vtxo_exit_delta +
75			ark_info.htlc_expiry_delta +
76			config.vtxo_exit_margin +
77			config.htlc_recv_claim_delta +
78			LIGHTNING_PREPARE_CLAIM_DELTA;
79
80		if requested_min_cltv_delta > ark_info.max_user_invoice_cltv_delta {
81			bail!("HTLC CLTV delta ({}) is greater than Server's max HTLC recv CLTV delta: {}",
82				requested_min_cltv_delta,
83				ark_info.max_user_invoice_cltv_delta,
84			);
85		}
86
87		let preimage = Preimage::random();
88		let payment_hash = preimage.compute_payment_hash();
89		info!("Start bolt11 board with preimage / payment hash: {} / {}",
90			preimage.as_hex(), payment_hash.as_hex());
91
92		let mailbox_kp = self.seed.to_mailbox_keypair();
93		let mailbox_id = ark::mailbox::MailboxIdentifier::from_pubkey(mailbox_kp.public_key());
94
95		let req = protos::StartLightningReceiveRequest {
96			payment_hash: payment_hash.to_vec(),
97			amount_sat: amount.to_sat(),
98			min_cltv_delta: requested_min_cltv_delta as u32,
99			mailbox_id: Some(mailbox_id.to_vec()),
100			description,
101		};
102
103		let resp = srv.client.start_lightning_receive(req).await?.into_inner();
104		info!("Ark Server is ready to receive LN payment to invoice: {}.", resp.bolt11);
105
106		let invoice = Bolt11Invoice::from_str(&resp.bolt11)
107			.context("invalid bolt11 invoice returned by Ark server")?;
108
109		self.db.store_lightning_receive(
110			payment_hash,
111			preimage,
112			&invoice,
113			requested_min_cltv_delta,
114		).await?;
115
116		Ok(invoice)
117	}
118
119	/// Fetches the status of a lightning receive for the given [PaymentHash].
120	pub async fn lightning_receive_status(
121		&self,
122		payment: impl Into<PaymentHash>,
123	) -> anyhow::Result<Option<LightningReceive>> {
124		Ok(self.db.fetch_lightning_receive_by_payment_hash(payment.into()).await?)
125	}
126
127	/// Claim given incoming lightning payment.
128	///
129	/// This function reveals the preimage of the lightning payment in
130	/// exchange of getting pubkey VTXOs from HTLC ones
131	///
132	/// # Returns
133	///
134	/// Returns an `anyhow::Result<()>`, which is:
135	/// * `Ok(())` if the process completes successfully.
136	///   the receive object is also updated correctly
137	/// * `Err` if an error occurs at any stage of the operation.
138	///
139	/// # Remarks
140	///
141	/// * The list of HTLC VTXOs must have the hash lock clause matching the given
142	///   [PaymentHash].
143	/// * The preimage is revealed to the server before the cosign response is
144	///   received. If the call fails after that point, the server's
145	///   `claim_lightning_receive` is idempotent so this method can be retried
146	///   to obtain fresh cosign signatures.
147	async fn claim_lightning_receive(
148		&self,
149		receive: &mut LightningReceive,
150	) -> anyhow::Result<()> {
151		let movement_id = receive.movement_id
152			.context("No movement created for lightning receive")?;
153		let (mut srv, _) = self.require_server().await?;
154
155		// order inputs by vtxoid before we generate nonces
156		let inputs = {
157			ensure!(!receive.htlc_vtxos.is_empty(), "no HTLC VTXOs set on record yet");
158			let mut ret = receive.htlc_vtxos.iter().map(|v| &v.vtxo).collect::<Vec<_>>();
159			ret.sort_by_key(|v| v.id());
160			ret
161		};
162
163		let mut keypairs = Vec::with_capacity(inputs.len());
164		for v in &inputs {
165			keypairs.push(self.get_vtxo_key(*v).await?);
166		}
167
168		// Claiming arkoor against preimage
169		let (claim_keypair, _) = self.derive_store_next_keypair().await?;
170		let receive_policy = VtxoPolicy::new_pubkey(claim_keypair.public_key());
171
172		trace!("ln arkoor builder params: inputs: {:?}; policy: {:?}",
173			inputs.iter().map(|v| v.id()).collect::<Vec<_>>(), receive_policy,
174		);
175		let builder = ArkoorPackageBuilder::new_claim_all_without_checkpoints(
176			inputs.iter().copied().cloned(),
177			receive_policy.clone(),
178		).context("creating claim arkoor builder failed")?;
179		let builder = builder.generate_user_nonces(&keypairs)
180			.context("arkoor nonce generation for claim failed")?;
181
182		info!("Claiming arkoor against payment preimage");
183		self.db.set_preimage_revealed(receive.payment_hash).await?;
184		let package_cosign_request = protos::ArkoorPackageCosignRequest::from(
185			builder.cosign_request(),
186		);
187		let resp = srv.client.claim_lightning_receive(protos::ClaimLightningReceiveRequest {
188			payment_hash: receive.payment_hash.to_byte_array().to_vec(),
189			payment_preimage: receive.payment_preimage.to_vec(),
190			cosign_request: Some(package_cosign_request),
191		}).await?.into_inner();
192		let cosign_resp = resp.try_into().context("invalid cosign response")?;
193
194		let outputs = builder.user_cosign(&keypairs, cosign_resp)
195			.context("claim arkoor cosign failed with user response")?
196			.build_signed_vtxos();
197
198		// Register the claim output so it is spendable for any later flow.
199		self.register_vtxo_transactions_with_server(&outputs).await?;
200
201		let mut effective_balance = Amount::ZERO;
202		for vtxo in &outputs {
203			// NB: bailing here results in vtxos not being registered despite the
204			// preimage being revealed.  The server's claim_lightning_receive is
205			// idempotent, so bark can retry and obtain fresh cosign signatures,
206			// but if all retries fail the user will be forced to exit on-chain.
207			trace!("Validating Lightning receive claim VTXO {}: {}",
208				vtxo.id(), vtxo.serialize_hex(),
209			);
210			self.validate_vtxo(vtxo).await
211				.context("invalid arkoor from lightning receive")?;
212			effective_balance += vtxo.amount();
213		}
214
215		self.store_spendable_vtxos(&outputs).await?;
216		self.mark_vtxos_as_spent(inputs).await?;
217
218		info!("Got arkoors from lightning: {}",
219			outputs.iter().map(|v| v.id().to_string()).collect::<Vec<_>>().join(", ")
220		);
221
222		self.movements.finish_movement_with_update(
223			movement_id,
224			MovementStatus::Successful,
225			MovementUpdate::new()
226				.effective_balance(effective_balance.to_signed()?)
227				.produced_vtxos(&outputs)
228		).await?;
229
230		self.db.finish_pending_lightning_receive(receive.payment_hash).await?;
231		*receive = self.db.fetch_lightning_receive_by_payment_hash(receive.payment_hash).await
232			.context("Database error")?
233			.context("Receive not found")?;
234
235		Ok(())
236	}
237
238	async fn compute_lightning_receive_anti_dos(
239		&self,
240		payment_hash: PaymentHash,
241		token: Option<&str>,
242	) -> anyhow::Result<LightningReceiveAntiDos> {
243		Ok(if let Some(token) = token {
244			LightningReceiveAntiDos::Token(token.to_string())
245		} else {
246			// We get an existing VTXO as an anti-dos measure.
247			let vtxo = self.select_vtxos_to_cover(Amount::ONE_SAT).await
248				.and_then(|vtxos| vtxos.into_iter().next()
249					.context("have no spendable vtxo to prove ownership of")
250				)?;
251			let vtxo_keypair = self.get_vtxo_key(&vtxo).await.expect("owned vtxo should be in database");
252			let attestation = LightningReceiveAttestation::new(payment_hash, vtxo.id(), &vtxo_keypair);
253			LightningReceiveAntiDos::InputVtxo(protos::InputVtxo {
254				vtxo_id: vtxo.id().to_bytes().to_vec(),
255				attestation: attestation.serialize(),
256			})
257		})
258	}
259
260	/// Check for incoming lightning payment with the given [PaymentHash].
261	///
262	/// This function checks for an incoming lightning payment with the
263	/// given [PaymentHash] and returns the HTLC VTXOs that are associated
264	/// with it.
265	///
266	/// # Arguments
267	///
268	/// * `payment_hash` - The [PaymentHash] of the lightning payment
269	/// to check for.
270	/// * `wait` - Whether to wait for the payment to be initiated by the sender.
271	/// * `token` - An optional lightning receive token used to authenticate a lightning
272	/// receive when no spendable VTXOs are owned by this wallet.
273	///
274	/// # Returns
275	///
276	/// Returns an `anyhow::Result<Option<LightningReceive>>`, which is:
277	/// * `Ok(Some(lightning_receive))` if the payment was initiated by
278	///   the sender and the HTLC VTXOs were successfully prepared.
279	/// * `Ok(None)` if the payment was not initiated by the sender or
280	///   the payment was canceled by server.
281	/// * `Err` if an error occurs at any stage of the operation.
282	///
283	/// # Remarks
284	///
285	/// * The invoice must contain an explicit amount specified in milli-satoshis.
286	/// * The HTLC expiry height is calculated by adding the servers' HTLC expiry delta to the
287	///   current chain tip.
288	/// * The payment hash must be from an invoice previously generated using
289	///   [Wallet::bolt11_invoice].
290	async fn check_lightning_receive(
291		&self,
292		payment_hash: PaymentHash,
293		wait: bool,
294		token: Option<&str>,
295	) -> anyhow::Result<Option<LightningReceive>> {
296		let (mut srv, ark_info) = self.require_server().await?;
297		let current_height = self.chain.tip().await?;
298
299		let mut receive = self.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
300			.context("no pending lightning receive found for payment hash, might already be claimed")?;
301
302		// If we have already HTLC VTXOs stored, we can return them without asking the server
303		if !receive.htlc_vtxos.is_empty() {
304			return Ok(Some(receive))
305		}
306
307		trace!("Requesting updates for ln-receive to server with for wait={} and hash={}", wait, payment_hash);
308		let sub = srv.client.check_lightning_receive(protos::CheckLightningReceiveRequest {
309			hash: payment_hash.to_byte_array().to_vec(), wait,
310		}).await?.into_inner();
311
312
313		let status = protos::LightningReceiveStatus::try_from(sub.status)
314			.with_context(|| format!("unknown payment status: {}", sub.status))?;
315
316		debug!("Received status {:?} for {}", status, payment_hash);
317		match status {
318			// this is the good case
319			protos::LightningReceiveStatus::Accepted |
320			protos::LightningReceiveStatus::HtlcsReady => {},
321			protos::LightningReceiveStatus::Created => {
322				return Ok(None);
323			},
324			protos::LightningReceiveStatus::Settled => bail!("payment already settled"),
325			protos::LightningReceiveStatus::Canceled => {
326				warn!("payment was canceled. removing pending lightning receive");
327				self.exit_or_cancel_lightning_receive(&receive).await?;
328				return Ok(None);
329			},
330		}
331
332		let lightning_receive_anti_dos = match self.compute_lightning_receive_anti_dos(
333			payment_hash, token,
334		).await {
335			Ok(anti_dos) => Some(anti_dos),
336			Err(e) => {
337				warn!("Could not compute anti-dos: {e:#}. Trying without");
338				None
339			},
340		};
341
342		let htlc_recv_expiry = current_height + receive.htlc_recv_cltv_delta as BlockHeight;
343
344		let (next_keypair, _) = self.derive_store_next_keypair().await?;
345		let req = protos::PrepareLightningReceiveClaimRequest {
346			payment_hash: receive.payment_hash.to_vec(),
347			user_pubkey: next_keypair.public_key().serialize().to_vec(),
348			htlc_recv_expiry,
349			lightning_receive_anti_dos,
350		};
351		let res = srv.client.prepare_lightning_receive_claim(req).await
352			.context("error preparing lightning receive claim")?.into_inner();
353		let vtxos = res.htlc_vtxos.into_iter()
354			.map(|b| Vtxo::deserialize(&b))
355			.collect::<Result<Vec<_>, _>>()
356			.context("invalid htlc vtxos from server")?;
357
358		// sanity check the vtxos
359		let mut htlc_amount = Amount::ZERO;
360		for vtxo in &vtxos {
361			trace!("Received HTLC VTXO {} from server: {}", vtxo.id(), vtxo.serialize_hex());
362			self.validate_vtxo(vtxo).await
363				.context("received invalid HTLC VTXO from server")?;
364			htlc_amount += vtxo.amount();
365
366			if let VtxoPolicy::ServerHtlcRecv(p) = vtxo.policy() {
367				if p.payment_hash != receive.payment_hash {
368					bail!("invalid payment hash on HTLC VTXOs received from server: {}",
369						p.payment_hash,
370					);
371				}
372				if p.user_pubkey != next_keypair.public_key() {
373					bail!("invalid pubkey on HTLC VTXOs received from server: {}", p.user_pubkey);
374				}
375				if p.htlc_expiry < htlc_recv_expiry {
376					bail!("HTLC VTXO expiry height is less than requested: Requested {}, received {}", htlc_recv_expiry, p.htlc_expiry);
377				}
378			} else {
379				bail!("invalid HTLC VTXO policy: {:?}", vtxo.policy());
380			}
381		}
382
383		// Check that the sum exceeds the invoice amount; we can't entirely trust the
384		// server-reported payment amount, so if there is a discrepancy, we should fall back to
385		// checking the invoice amount.
386		let invoice_amount = receive.invoice.get_payment_amount(None)
387			.context("ln receive invoice should have amount")?;
388		let server_received_amount = res.receive.map(|r| Amount::from_sat(r.amount_sat));
389		let fee = {
390			let fee = server_received_amount
391				.and_then(|a| ark_info.fees.lightning_receive.calculate(a));
392			match (server_received_amount, fee) {
393				(Some(amount), Some(fee)) if htlc_amount + fee == amount => {
394					// If this is true then the server is telling the truth.
395					fee
396				},
397				_ => {
398					// We should verify against the invoice amount instead. Unfortunately, that
399					// means the fee value in the movement won't be entirely accurate, however, it's
400					// better to avoid rejecting payments when we have received enough to cover an
401					// invoice.
402					ark_info.fees.lightning_receive.calculate(invoice_amount)
403						.expect("we previously validated this")
404				}
405			}
406		};
407		let received = htlc_amount + fee;
408		ensure!(received >= invoice_amount,
409			"Server didn't return enough VTXOs to cover invoice amount"
410		);
411
412		let movement_id = if let Some(movement_id) = receive.movement_id {
413			movement_id
414		} else {
415			self.movements.new_movement_with_update(
416				Subsystem::LIGHTNING_RECEIVE,
417				LightningReceiveMovement::Receive.to_string(),
418				MovementUpdate::new()
419					.intended_balance(invoice_amount.to_signed()?)
420					.effective_balance(htlc_amount.to_signed()?)
421					.fee(fee)
422					.metadata(LightningMovement::metadata(
423						receive.payment_hash, &vtxos, Some(receive.payment_preimage),
424					))
425					.received_on(
426						[MovementDestination::new(receive.invoice.clone().into(), received)],
427					),
428			).await?
429		};
430		self.store_locked_vtxos(&vtxos, Some(movement_id)).await?;
431
432		let vtxo_ids = vtxos.iter().map(|v| v.id()).collect::<Vec<_>>();
433		self.db.update_lightning_receive(payment_hash, &vtxo_ids, movement_id).await?;
434
435		let mut wallet_vtxos = vec![];
436		for vtxo in vtxos {
437			let v =  self.db.get_wallet_vtxo(vtxo.id()).await?
438				.context("Failed to get wallet VTXO for lightning receive")?;
439			wallet_vtxos.push(v);
440		}
441
442		receive.htlc_vtxos = wallet_vtxos;
443		receive.movement_id = Some(movement_id);
444
445		Ok(Some(receive))
446	}
447
448	/// Exit HTLC-recv VTXOs when preimage has been disclosed but the claim failed.
449	///
450	/// NOTE: Calling this function will always result in the HTLC VTXO being exited
451	/// regardless of the presence of the `preimage_revealed_at` field of
452	/// the `lightning_receive` struct.
453	async fn exit_lightning_receive(
454		&self,
455		lightning_receive: &LightningReceive,
456	) -> anyhow::Result<()> {
457		ensure!(!lightning_receive.htlc_vtxos.is_empty(), "no HTLC VTXOs to exit");
458		let vtxos = lightning_receive.htlc_vtxos.iter().map(|v| &v.vtxo).collect::<Vec<_>>();
459
460		info!("Exiting HTLC VTXOs for lightning_receive with payment hash {}", lightning_receive.payment_hash);
461		self.exit.write().await.start_exit_for_vtxos(&vtxos).await?;
462
463		if let Some(movement_id) = lightning_receive.movement_id {
464			self.movements.finish_movement_with_update(
465				movement_id,
466				MovementStatus::Failed,
467				MovementUpdate::new().exited_vtxos(vtxos),
468			).await?;
469		} else {
470			error!("movement id is missing but we disclosed preimage: {}", lightning_receive.payment_hash);
471		}
472
473		self.db.finish_pending_lightning_receive(lightning_receive.payment_hash).await?;
474		Ok(())
475	}
476
477	pub(crate) async fn exit_or_cancel_lightning_receive(
478		&self,
479		lightning_receive: &LightningReceive,
480	) -> anyhow::Result<()> {
481		let vtxos = &lightning_receive.htlc_vtxos;
482
483		let update_opt = match (vtxos.is_empty(), lightning_receive.preimage_revealed_at) {
484			(false, Some(_)) => {
485				return self.exit_lightning_receive(lightning_receive).await;
486			}
487			(false, None) => {
488				warn!("HTLC-recv VTXOs are about to expire, but preimage has not been disclosed yet. Canceling");
489				self.mark_vtxos_as_spent(vtxos).await?;
490				if let Some(movement_id) = lightning_receive.movement_id {
491					Some((
492						movement_id,
493						MovementUpdate::new()
494							.effective_balance(SignedAmount::ZERO),
495						MovementStatus::Canceled,
496					))
497				} else {
498					error!("movement id is missing but we got HTLC vtxos: {}", lightning_receive.payment_hash);
499					None
500				}
501			}
502			(true, Some(_)) => {
503				error!("No HTLC vtxos set on ln receive but preimage has been disclosed. Canceling");
504				lightning_receive.movement_id.map(|id| (id,
505					MovementUpdate::new()
506						.effective_balance(SignedAmount::ZERO),
507					MovementStatus::Canceled,
508				))
509			}
510			(true, None) => None,
511		};
512
513		if let Some((movement_id, update, status)) = update_opt {
514			self.movements.finish_movement_with_update(movement_id, status, update).await?;
515		}
516
517		self.db.finish_pending_lightning_receive(lightning_receive.payment_hash).await?;
518
519		Ok(())
520	}
521
522	/// Cancel a pending lightning receive.
523	///
524	/// This asks the server to cancel the hold invoice. The server will
525	/// refuse if HTLC-recv vtxos have already been granted.
526	///
527	/// Bark additionally prevents cancellation when the preimage has
528	/// already been revealed, since revealing the preimage means the
529	/// sender's payment is in flight and cancelling would lose funds.
530	pub async fn cancel_lightning_receive(
531		&self,
532		payment_hash: PaymentHash,
533	) -> anyhow::Result<()> {
534		let receive = self.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
535			.context("no pending lightning receive found for this payment hash")?;
536
537		if receive.preimage_revealed_at.is_some() {
538			bail!("cannot cancel: preimage has already been revealed");
539		}
540
541		if receive.finished_at.is_some() {
542			bail!("lightning receive is already finished");
543		}
544
545		let (mut srv, _) = self.require_server().await?;
546		srv.client.cancel_lightning_receive(protos::CancelLightningReceiveRequest {
547			payment_hash: payment_hash.to_vec(),
548		}).await.context("server refused cancellation")?;
549
550		// Clean up local state: mark htlc vtxos as spent and finish the receive
551		self.exit_or_cancel_lightning_receive(&receive).await?;
552
553		Ok(())
554	}
555
556	/// Check and claim a Lightning receive
557	///
558	/// This function checks for an incoming lightning payment with the given [PaymentHash]
559	/// and then claims the payment using returned HTLC VTXOs.
560	///
561	/// # Arguments
562	///
563	/// * `payment_hash` - The [PaymentHash] of the lightning payment
564	/// to check for.
565	/// * `wait` - Whether to wait for the payment to be received.
566	/// * `token` - An optional lightning receive token used to authenticate a lightning
567	/// receive when no spendable VTXOs are owned by this wallet.
568	///
569	/// # Returns
570	///
571	/// Returns an `anyhow::Result<LightningReceive>`, which is:
572	/// * `Ok(LightningReceive)` if the claim was completed or is awaiting HTLC VTXOs
573	/// * `Err` if an error occurs at any stage of the operation.
574	///
575	/// # Remarks
576	///
577	/// * The payment hash must be from an invoice previously generated using
578	///   [Wallet::bolt11_invoice].
579	pub async fn try_claim_lightning_receive(
580		&self,
581		payment_hash: PaymentHash,
582		wait: bool,
583		token: Option<&str>,
584	) -> anyhow::Result<LightningReceive> {
585		trace!("Claiming lightning receive for payment hash: {}", payment_hash);
586
587		// Try to mark this payment as in-flight to prevent concurrent claim attempts.
588		// This prevents race conditions where multiple concurrent calls could both
589		// attempt to check/claim the same receive, leading to duplicate operations.
590		{
591			let mut inflight = self.inflight_lightning_payments.lock().await;
592			if !inflight.insert(payment_hash) {
593				bail!("Receive operation already in progress for this payment");
594			}
595		}
596
597		let result = self.try_claim_lightning_receive_inner(payment_hash, wait, token).await;
598
599		// Always remove from inflight set when done
600		{
601			let mut inflight = self.inflight_lightning_payments.lock().await;
602			inflight.remove(&payment_hash);
603		}
604
605		result
606	}
607
608	/// Internal implementation of lightning receive claim after concurrency check.
609	async fn try_claim_lightning_receive_inner(
610		&self,
611		payment_hash: PaymentHash,
612		wait: bool,
613		token: Option<&str>,
614	) -> anyhow::Result<LightningReceive> {
615		// check_lightning_receive returns None if there is no incoming payment (yet)
616		// In that case we just return and don't try to claim
617		let mut receive = match self.check_lightning_receive(payment_hash, wait, token).await? {
618			Some(receive) => receive,
619			None => {
620				return self.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
621					.context("No receive for payment_hash")
622			}
623		};
624
625		if receive.finished_at.is_some() {
626			return Ok(receive);
627		}
628
629		// No need to claim anything if there
630		// are no htlcs yet
631		if receive.htlc_vtxos.is_empty() {
632			return Ok(receive);
633		}
634
635		match self.claim_lightning_receive(&mut receive).await {
636			Ok(()) => Ok(receive),
637			Err(e) => {
638				error!("Failed to claim htlcs for payment_hash: {}", receive.payment_hash);
639				// We're now in a our havoc era. Just exit. The server prepared our HTLCs,
640				// but then later refused to / couldn't allow our claim. This shoul be quite
641				// rare.
642				warn!("Exiting lightning receive VTXOs");
643				self.exit_lightning_receive(&receive).await?;
644				return Err(e)
645			}
646		}
647	}
648
649	/// Check and claim all opened Lightning receive
650	///
651	/// This function fetches all opened lightning receives and then
652	/// concurrently tries to check and claim them.
653	///
654	/// # Arguments
655	///
656	/// * `wait` - Whether to wait for each payment to be received.
657	///
658	/// # Returns
659	///
660	/// Returns an `anyhow::Result<()>`, which is:
661	/// * `Ok(Vec<LightningReceive>)` contains the successfully claimed receives, if at least one
662	/// claim succeeded, or an empty vector if no claim succeeded.
663	/// * `Err` if all claim attempts failed.
664	pub async fn try_claim_all_lightning_receives(&self, wait: bool) -> anyhow::Result<Vec<LightningReceive>> {
665		let pending = self.pending_lightning_receives().await?;
666		let total = pending.len();
667
668		if total == 0 {
669			return Ok(vec![]);
670		}
671
672		let results: Vec<_> = tokio_stream::iter(pending)
673			.map(|rcv| async move {
674				self.try_claim_lightning_receive(rcv.invoice.into(), wait, None).await
675			})
676			.buffer_unordered(3)
677			.collect()
678			.await;
679
680		let mut claimed = vec![];
681		let mut failed = 0;
682
683		for result in results {
684			match result {
685				Ok(receive) => claimed.push(receive),
686				Err(e) => {
687					error!("Error claiming lightning receive: {:#}", e);
688					failed += 1;
689				}
690			}
691		}
692
693		if failed > 0 {
694			info!(
695				"Lightning receive claims: {} succeeded, {} failed out of {} pending",
696				claimed.len(), failed, total
697			);
698		}
699
700		if claimed.is_empty() {
701			anyhow::bail!("All {} lightning receive claim(s) failed", failed);
702		}
703
704		Ok(claimed)
705	}
706}