1use std::str::FromStr;
2use std::time::Duration;
3
4use anyhow::Context;
5use ark::arkoor::package::ArkoorPackageBuilder;
6use bitcoin::{Amount, SignedAmount};
7use bitcoin::hex::DisplayHex;
8use futures::StreamExt;
9use lightning_invoice::Bolt11Invoice;
10use log::{trace, debug, info, warn};
11
12use ark::{ProtocolEncoding, Vtxo, VtxoPolicy};
13use ark::attestations::{LightningReceiveAttestation};
14use ark::fees::validate_and_subtract_fee;
15use ark::lightning::{Bolt11InvoiceExt, PaymentHash, Preimage};
16use bitcoin_ext::{BlockDelta, BlockHeight};
17use server_rpc::protos;
18use server_rpc::protos::prepare_lightning_receive_claim_request::LightningReceiveAntiDos;
19
20use crate::subsystem::{LightningMovement, LightningReceiveMovement, Subsystem};
21use crate::{Wallet, error};
22use crate::movement::{MovementDestination, MovementStatus};
23use crate::movement::update::MovementUpdate;
24use crate::persist::models::LightningReceive;
25
26const LIGHTNING_RECEIVE_LOCK_PREFIX: &str = "lightning_receive";
27
28const LIGHTNING_PREPARE_CLAIM_DELTA: BlockDelta = 2;
31
32const CLAIM_RETRY_BACKOFF_INITIAL: Duration = Duration::from_secs(2);
37const CLAIM_RETRY_BACKOFF_MAX: Duration = Duration::from_secs(30);
38
39impl Wallet {
40 pub async fn pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
42 Ok(self.inner.db.get_all_pending_lightning_receives().await?)
43 }
44
45 pub async fn claimable_lightning_receive_balance(&self) -> anyhow::Result<Amount> {
48 let receives = self.pending_lightning_receives().await?;
49
50 let mut total = Amount::ZERO;
51 for receive in receives {
52 total += receive.htlc_vtxos.iter().map(|v| v.amount()).sum::<Amount>();
53 }
54
55 Ok(total)
56 }
57
58 pub async fn bolt11_invoice(
65 &self,
66 amount: Amount,
67 description: Option<String>,
68 ) -> anyhow::Result<Bolt11Invoice> {
69 if amount == Amount::ZERO {
70 bail!("Cannot create invoice for 0 sats (this would create an explicit 0 sat invoice, not an any-amount invoice)");
71 }
72
73 let (mut srv, ark_info) = self.require_server().await?;
74 let config = self.config();
75
76 let fee = ark_info.fees.lightning_receive.calculate(amount).context("fee overflowed")?;
78 validate_and_subtract_fee(amount, fee)?;
79
80 let requested_min_cltv_delta = ark_info.vtxo_exit_delta +
85 ark_info.htlc_expiry_delta +
86 config.vtxo_exit_margin +
87 config.htlc_recv_claim_delta +
88 LIGHTNING_PREPARE_CLAIM_DELTA;
89
90 if requested_min_cltv_delta > ark_info.max_user_invoice_cltv_delta {
91 bail!("HTLC CLTV delta ({}) is greater than Server's max HTLC recv CLTV delta: {}",
92 requested_min_cltv_delta,
93 ark_info.max_user_invoice_cltv_delta,
94 );
95 }
96
97 let preimage = Preimage::random();
98 let payment_hash = preimage.compute_payment_hash();
99 info!("Start bolt11 board with preimage / payment hash: {} / {}",
100 preimage.as_hex(), payment_hash.as_hex());
101
102 let mailbox_kp = self.inner.seed.to_mailbox_keypair();
103 let mailbox_id = ark::mailbox::MailboxIdentifier::from_pubkey(mailbox_kp.public_key());
104
105 let req = protos::StartLightningReceiveRequest {
106 payment_hash: payment_hash.to_vec(),
107 amount_sat: amount.to_sat(),
108 min_cltv_delta: requested_min_cltv_delta as u32,
109 mailbox_id: Some(mailbox_id.serialize()),
110 description,
111 };
112
113 let resp = srv.client.start_lightning_receive(req).await?.into_inner();
114 info!("Ark Server is ready to receive LN payment to invoice: {}.", resp.bolt11);
115
116 let invoice = Bolt11Invoice::from_str(&resp.bolt11)
117 .context("invalid bolt11 invoice returned by Ark server")?;
118
119 self.inner.db.store_lightning_receive(
120 payment_hash,
121 preimage,
122 &invoice,
123 requested_min_cltv_delta,
124 ).await?;
125
126 Ok(invoice)
127 }
128
129 pub async fn lightning_receive_status(
131 &self,
132 payment: impl Into<PaymentHash>,
133 ) -> anyhow::Result<Option<LightningReceive>> {
134 Ok(self.inner.db.fetch_lightning_receive_by_payment_hash(payment.into()).await?)
135 }
136
137 pub async fn attempt_lightning_receive_exit(
145 &self,
146 payment: impl Into<PaymentHash>,
147 ) -> anyhow::Result<()> {
148 let receive = self.inner.db.fetch_lightning_receive_by_payment_hash(payment.into()).await?
149 .context("no pending lightning receive found for payment hash")?;
150 if receive.preimage_revealed_at.is_none() {
151 bail!("preimage must be revealed before attempting to exit");
152 }
153 if receive.htlc_vtxos.is_empty() {
154 bail!("Nothing to exit, no htlcs have been created yet!");
155 }
156 self.exit_lightning_receive(&receive).await
157 }
158
159 async fn claim_lightning_receive(
180 &self,
181 receive: &mut LightningReceive,
182 ) -> anyhow::Result<()> {
183 let movement_id = receive.movement_id
184 .context("No movement created for lightning receive")?;
185 let (mut srv, _) = self.require_server().await?;
186
187 ensure!(!receive.htlc_vtxos.is_empty(), "no HTLC VTXOs set on record yet");
190 let mut input_ids = receive.htlc_vtxos.iter().map(|v| v.vtxo.id()).collect::<Vec<_>>();
191 input_ids.sort();
192 let inputs = self.inner.db.get_full_vtxos(&input_ids).await
193 .context("failed to hydrate htlc input vtxos")?;
194
195 let mut keypairs = Vec::with_capacity(inputs.len());
196 for v in &inputs {
197 keypairs.push(self.get_vtxo_key(v).await?);
198 }
199
200 let (claim_keypair, _) = self.derive_store_next_keypair().await?;
202 let receive_policy = VtxoPolicy::new_pubkey(claim_keypair.public_key());
203
204 trace!("ln arkoor builder params: inputs: {:?}; policy: {:?}", input_ids, receive_policy);
205 let builder = ArkoorPackageBuilder::new_claim_all_without_checkpoints(
206 inputs,
207 receive_policy.clone(),
208 ).context("creating claim arkoor builder failed")?;
209 let builder = builder.generate_user_nonces(&keypairs)
210 .context("arkoor nonce generation for claim failed")?;
211
212 info!("Claiming arkoor against payment preimage");
213 self.inner.db.set_preimage_revealed(receive.payment_hash).await?;
214 *receive = self.inner.db.fetch_lightning_receive_by_payment_hash(receive.payment_hash).await
219 .context("Database error")?
220 .context("Receive not found")?;
221 let package_cosign_request = protos::ArkoorPackageCosignRequest::from(
222 builder.cosign_request(),
223 );
224 let resp = srv.client.claim_lightning_receive(protos::ClaimLightningReceiveRequest {
225 payment_hash: receive.payment_hash.to_byte_array().to_vec(),
226 payment_preimage: receive.payment_preimage.to_vec(),
227 cosign_request: Some(package_cosign_request),
228 }).await?.into_inner();
229 let cosign_resp = resp.try_into().context("invalid cosign response")?;
230
231 let outputs = builder.user_cosign(&keypairs, cosign_resp)
232 .context("claim arkoor cosign failed with user response")?
233 .build_signed_vtxos();
234
235 self.register_vtxo_transactions_with_server(&outputs).await?;
237
238 let mut effective_balance = Amount::ZERO;
239 for vtxo in &outputs {
240 trace!("Validating Lightning receive claim VTXO {}: {}",
245 vtxo.id(), vtxo.serialize_hex(),
246 );
247 self.validate_vtxo(vtxo).await
248 .context("invalid arkoor from lightning receive")?;
249 effective_balance += vtxo.amount();
250 }
251
252 self.store_spendable_vtxos(&outputs).await?;
253 self.mark_vtxos_as_spent(&receive.htlc_vtxos).await?;
254
255 info!("Got arkoors from lightning: {}",
256 outputs.iter().map(|v| v.id().to_string()).collect::<Vec<_>>().join(", ")
257 );
258
259 self.inner.movements.finish_movement_with_update(
260 movement_id,
261 MovementStatus::Successful,
262 MovementUpdate::new()
263 .effective_balance(effective_balance.to_signed()?)
264 .produced_vtxos(&outputs)
265 ).await?;
266
267 self.inner.db.finish_pending_lightning_receive(receive.payment_hash).await?;
268 *receive = self.inner.db.fetch_lightning_receive_by_payment_hash(receive.payment_hash).await
269 .context("Database error")?
270 .context("Receive not found")?;
271
272 Ok(())
273 }
274
275 async fn compute_lightning_receive_anti_dos(
276 &self,
277 payment_hash: PaymentHash,
278 token: Option<&str>,
279 ) -> anyhow::Result<LightningReceiveAntiDos> {
280 Ok(if let Some(token) = token {
281 LightningReceiveAntiDos::Token(token.to_string())
282 } else {
283 let vtxo = self.select_vtxos_to_cover(Amount::ONE_SAT).await
285 .and_then(|vtxos| vtxos.into_iter().next()
286 .context("have no spendable vtxo to prove ownership of")
287 )?;
288 let vtxo_keypair = self.get_vtxo_key(&vtxo).await.expect("owned vtxo should be in database");
289 let attestation = LightningReceiveAttestation::new(payment_hash, vtxo.id(), &vtxo_keypair);
290 LightningReceiveAntiDos::InputVtxo(protos::InputVtxo {
291 vtxo_id: vtxo.id().to_bytes().to_vec(),
292 attestation: attestation.serialize(),
293 })
294 })
295 }
296
297 async fn check_lightning_receive(
328 &self,
329 payment_hash: PaymentHash,
330 wait: bool,
331 token: Option<&str>,
332 ) -> anyhow::Result<Option<LightningReceive>> {
333 let (mut srv, ark_info) = self.require_server().await?;
334 let current_height = self.inner.chain.tip().await?;
335
336 let mut receive = self.inner.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
337 .context("no pending lightning receive found for payment hash, might already be claimed")?;
338
339 if !receive.htlc_vtxos.is_empty() {
341 return Ok(Some(receive))
342 }
343
344 trace!("Requesting updates for ln-receive to server with for wait={} and hash={}", wait, payment_hash);
345 let sub = srv.client.check_lightning_receive(protos::CheckLightningReceiveRequest {
346 hash: payment_hash.to_byte_array().to_vec(), wait,
347 }).await?.into_inner();
348
349
350 let status = protos::LightningReceiveStatus::try_from(sub.status)
351 .with_context(|| format!("unknown payment status: {}", sub.status))?;
352
353 debug!("Received status {:?} for {}", status, payment_hash);
354 match status {
355 protos::LightningReceiveStatus::Accepted |
357 protos::LightningReceiveStatus::HtlcsReady => {},
358 protos::LightningReceiveStatus::Created => {
359 return Ok(None);
360 },
361 protos::LightningReceiveStatus::Settled => bail!("payment already settled"),
362 protos::LightningReceiveStatus::Canceled => {
363 warn!("payment was canceled. removing pending lightning receive");
364 self.handle_failed_lightning_receive(&receive).await?;
365 return Ok(None);
366 },
367 }
368
369 let lightning_receive_anti_dos = match self.compute_lightning_receive_anti_dos(
370 payment_hash, token,
371 ).await {
372 Ok(anti_dos) => Some(anti_dos),
373 Err(e) => {
374 info!("Could not compute anti-dos: {e:#}. Trying without");
375 None
376 },
377 };
378
379 let htlc_recv_expiry = current_height + receive.htlc_recv_cltv_delta as BlockHeight;
380
381 let (next_keypair, _) = self.derive_store_next_keypair().await?;
382 let req = protos::PrepareLightningReceiveClaimRequest {
383 payment_hash: receive.payment_hash.to_vec(),
384 user_pubkey: next_keypair.public_key().serialize().to_vec(),
385 htlc_recv_expiry,
386 lightning_receive_anti_dos,
387 };
388 let res = srv.client.prepare_lightning_receive_claim(req).await
389 .context("error preparing lightning receive claim")?.into_inner();
390 let vtxos = res.htlc_vtxos.into_iter()
391 .map(|b| Vtxo::deserialize(&b))
392 .collect::<Result<Vec<_>, _>>()
393 .context("invalid htlc vtxos from server")?;
394
395 let mut htlc_amount = Amount::ZERO;
397 for vtxo in &vtxos {
398 trace!("Received HTLC VTXO {} from server: {}", vtxo.id(), vtxo.serialize_hex());
399 self.validate_vtxo(vtxo).await
400 .context("received invalid HTLC VTXO from server")?;
401 htlc_amount += vtxo.amount();
402
403 if let VtxoPolicy::ServerHtlcRecv(p) = vtxo.policy() {
404 if p.payment_hash != receive.payment_hash {
405 bail!("invalid payment hash on HTLC VTXOs received from server: {}",
406 p.payment_hash,
407 );
408 }
409 if p.user_pubkey != next_keypair.public_key() {
410 bail!("invalid pubkey on HTLC VTXOs received from server: {}", p.user_pubkey);
411 }
412 if p.htlc_expiry < htlc_recv_expiry {
413 bail!("HTLC VTXO expiry height is less than requested: Requested {}, received {}", htlc_recv_expiry, p.htlc_expiry);
414 }
415 } else {
416 bail!("invalid HTLC VTXO policy: {:?}", vtxo.policy());
417 }
418 }
419
420 let invoice_amount = receive.invoice.get_payment_amount(None)
424 .context("ln receive invoice should have amount")?;
425 let server_received_amount = res.receive.map(|r| Amount::from_sat(r.amount_sat));
426 let fee = {
427 let fee = server_received_amount
428 .and_then(|a| ark_info.fees.lightning_receive.calculate(a));
429 match (server_received_amount, fee) {
430 (Some(amount), Some(fee)) if htlc_amount + fee == amount => {
431 fee
433 },
434 _ => {
435 ark_info.fees.lightning_receive.calculate(invoice_amount)
440 .expect("we previously validated this")
441 }
442 }
443 };
444 let received = htlc_amount + fee;
445 ensure!(received >= invoice_amount,
446 "Server didn't return enough VTXOs to cover invoice amount"
447 );
448
449 let movement_id = if let Some(movement_id) = receive.movement_id {
450 movement_id
451 } else {
452 self.inner.movements.new_movement_with_update(
453 Subsystem::LIGHTNING_RECEIVE,
454 LightningReceiveMovement::Receive.to_string(),
455 MovementUpdate::new()
456 .intended_balance(invoice_amount.to_signed()?)
457 .effective_balance(htlc_amount.to_signed()?)
458 .fee(fee)
459 .metadata(LightningMovement::metadata(
460 receive.payment_hash, &vtxos, Some(receive.payment_preimage),
461 ))
462 .received_on(
463 [MovementDestination::new(receive.invoice.clone().into(), received)],
464 ),
465 ).await?
466 };
467 self.store_locked_vtxos(
468 &vtxos,
469 Some(crate::vtxo::VtxoLockHolder::Movement { id: movement_id }),
470 ).await?;
471
472 let vtxo_ids = vtxos.iter().map(|v| v.id()).collect::<Vec<_>>();
473 self.inner.db.update_lightning_receive(payment_hash, &vtxo_ids, movement_id).await?;
474
475 let mut wallet_vtxos = vec![];
476 for vtxo in vtxos {
477 let v = self.inner.db.get_wallet_vtxo(vtxo.id()).await?
478 .context("Failed to get wallet VTXO for lightning receive")?;
479 wallet_vtxos.push(v);
480 }
481
482 receive.htlc_vtxos = wallet_vtxos;
483 receive.movement_id = Some(movement_id);
484
485 Ok(Some(receive))
486 }
487
488 async fn exit_lightning_receive(
494 &self,
495 lightning_receive: &LightningReceive,
496 ) -> anyhow::Result<()> {
497 ensure!(!lightning_receive.htlc_vtxos.is_empty(), "no HTLC VTXOs to exit");
498 let vtxos = lightning_receive.htlc_vtxos.iter().map(|v| &v.vtxo).collect::<Vec<_>>();
499
500 info!("Exiting HTLC VTXOs for lightning_receive with payment hash {}", lightning_receive.payment_hash);
501 self.inner.exit.start_exit_for_vtxos(&vtxos).await?;
502
503 if let Some(movement_id) = lightning_receive.movement_id {
504 self.inner.movements.finish_movement_with_update(
505 movement_id,
506 MovementStatus::Failed,
507 MovementUpdate::new().exited_vtxos(vtxos),
508 ).await?;
509 } else {
510 error!("movement id is missing but we disclosed preimage: {}", lightning_receive.payment_hash);
511 }
512
513 self.inner.db.finish_pending_lightning_receive(lightning_receive.payment_hash).await?;
514 Ok(())
515 }
516
517 pub(crate) async fn handle_failed_lightning_receive(
518 &self,
519 lightning_receive: &LightningReceive,
520 ) -> anyhow::Result<()> {
521 let vtxos = &lightning_receive.htlc_vtxos;
522
523 let update_opt = match (vtxos.is_empty(), lightning_receive.preimage_revealed_at) {
524 (false, Some(_)) => {
525 return Ok(());
530 }
531 (false, None) => {
532 warn!("HTLC-recv VTXOs are about to expire, but preimage has not been disclosed yet. Canceling");
533 self.mark_vtxos_as_spent(vtxos).await?;
534 if let Some(movement_id) = lightning_receive.movement_id {
535 Some((
536 movement_id,
537 MovementUpdate::new()
538 .effective_balance(SignedAmount::ZERO),
539 MovementStatus::Canceled,
540 ))
541 } else {
542 error!("movement id is missing but we got HTLC vtxos: {}", lightning_receive.payment_hash);
543 None
544 }
545 }
546 (true, Some(_)) => {
547 error!("No HTLC vtxos set on ln receive but preimage has been disclosed. Canceling");
548 lightning_receive.movement_id.map(|id| (id,
549 MovementUpdate::new()
550 .effective_balance(SignedAmount::ZERO),
551 MovementStatus::Canceled,
552 ))
553 }
554 (true, None) => None,
555 };
556
557 if let Some((movement_id, update, status)) = update_opt {
558 self.inner.movements.finish_movement_with_update(movement_id, status, update).await?;
559 }
560
561 self.inner.db.finish_pending_lightning_receive(lightning_receive.payment_hash).await?;
562
563 Ok(())
564 }
565
566 pub async fn cancel_lightning_receive(
575 &self,
576 payment_hash: PaymentHash,
577 ) -> anyhow::Result<()> {
578 let receive = self.inner.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
579 .context("no pending lightning receive found for this payment hash")?;
580
581 if receive.preimage_revealed_at.is_some() {
582 bail!("cannot cancel: preimage has already been revealed");
583 }
584
585 if receive.finished_at.is_some() {
586 bail!("lightning receive is already finished");
587 }
588
589 let (mut srv, _) = self.require_server().await?;
590 srv.client.cancel_lightning_receive(protos::CancelLightningReceiveRequest {
591 payment_hash: payment_hash.to_vec(),
592 }).await.context("server refused cancellation")?;
593
594 self.handle_failed_lightning_receive(&receive).await?;
596
597 Ok(())
598 }
599
600 pub async fn try_claim_lightning_receive(
626 &self,
627 payment_hash: PaymentHash,
628 wait: bool,
629 token: Option<&str>,
630 ) -> anyhow::Result<LightningReceive> {
631 trace!("Claiming lightning receive for payment hash: {}", payment_hash);
632
633 let key = format!("{}.{}", LIGHTNING_RECEIVE_LOCK_PREFIX, payment_hash);
637 let _guard = match self.inner.lock_manager.try_lock(&key).await {
638 Some(guard) => guard,
639 None => {
640 debug!("Receive operation already in progress for this payment");
641 return self.inner.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
642 .context("no receive for payment hash");
643 },
644 };
645
646 self.try_claim_lightning_receive_inner(payment_hash, wait, token).await
647 }
648
649 async fn try_claim_lightning_receive_inner(
651 &self,
652 payment_hash: PaymentHash,
653 wait: bool,
654 token: Option<&str>,
655 ) -> anyhow::Result<LightningReceive> {
656 let mut receive = match self.check_lightning_receive(payment_hash, wait, token).await? {
659 Some(receive) => receive,
660 None => {
661 return self.inner.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
662 .context("No receive for payment_hash")
663 }
664 };
665
666 if receive.finished_at.is_some() {
667 return Ok(receive);
668 }
669
670 if receive.htlc_vtxos.is_empty() {
673 return Ok(receive);
674 }
675
676 let mut retries_left = self.inner.config.lightning_receive_claim_retries;
677 let mut backoff = CLAIM_RETRY_BACKOFF_INITIAL;
678 let claim_result = loop {
679 match self.claim_lightning_receive(&mut receive).await {
680 Ok(()) => break Ok(()),
681 Err(e) if retries_left == 0 => break Err(e),
682 Err(e) => {
683 warn!(
684 "Error claiming lightning receive {} ({} retries left, retrying in {:?}): {:#}",
685 receive.payment_hash, retries_left, backoff, e,
686 );
687 retries_left -= 1;
688 tokio::time::sleep(backoff).await;
689 backoff = (backoff * 2).min(CLAIM_RETRY_BACKOFF_MAX);
690 }
691 }
692 };
693
694 match claim_result {
695 Ok(()) => Ok(receive),
696 Err(e) => {
697 error!("Failed to claim htlcs for payment_hash: {}", receive.payment_hash);
698 self.handle_failed_lightning_receive(&receive).await?;
701 Err(e)
702 }
703 }
704 }
705
706 pub async fn try_claim_all_lightning_receives(&self, wait: bool) -> anyhow::Result<Vec<LightningReceive>> {
722 let pending = self.pending_lightning_receives().await?;
723 let total = pending.len();
724
725 if total == 0 {
726 return Ok(vec![]);
727 }
728
729 let results: Vec<_> = tokio_stream::iter(pending)
730 .map(|rcv| async move {
731 self.try_claim_lightning_receive(rcv.invoice.into(), wait, None).await
732 })
733 .buffer_unordered(3)
734 .collect()
735 .await;
736
737 let mut claimed = vec![];
738 let mut failed = 0;
739
740 for result in results {
741 match result {
742 Ok(receive) => claimed.push(receive),
743 Err(e) => {
744 error!("Error claiming lightning receive: {:#}", e);
745 failed += 1;
746 }
747 }
748 }
749
750 if failed > 0 {
751 info!(
752 "Lightning receive claims: {} succeeded, {} failed out of {} pending",
753 claimed.len(), failed, total
754 );
755 }
756
757 if claimed.is_empty() {
758 anyhow::bail!("All {} lightning receive claim(s) failed", failed);
759 }
760
761 Ok(claimed)
762 }
763}