1use std::str::FromStr;
2use std::time::Duration;
3
4use anyhow::Context;
5use ark::arkoor::package::ArkoorPackageBuilder;
6use bitcoin::{Amount, SignedAmount};
7use bitcoin::hex::DisplayHex;
8use futures::StreamExt;
9use lightning_invoice::Bolt11Invoice;
10use log::{trace, debug, info, warn};
11
12use ark::{ProtocolEncoding, Vtxo, VtxoPolicy};
13use ark::attestations::{LightningReceiveAttestation};
14use ark::fees::validate_and_subtract_fee;
15use ark::lightning::{Bolt11InvoiceExt, PaymentHash, Preimage};
16use bitcoin_ext::{BlockDelta, BlockHeight};
17use server_rpc::protos;
18use server_rpc::protos::prepare_lightning_receive_claim_request::LightningReceiveAntiDos;
19
20use crate::subsystem::{LightningMovement, LightningReceiveMovement, Subsystem};
21use crate::{Wallet, error};
22use crate::movement::{MovementDestination, MovementStatus};
23use crate::movement::update::MovementUpdate;
24use crate::persist::models::LightningReceive;
25
26const LIGHTNING_RECEIVE_LOCK_PREFIX: &str = "lightning_receive";
27
28const LIGHTNING_PREPARE_CLAIM_DELTA: BlockDelta = 2;
31
32const CLAIM_RETRY_BACKOFF_INITIAL: Duration = Duration::from_secs(2);
37const CLAIM_RETRY_BACKOFF_MAX: Duration = Duration::from_secs(30);
38
39impl Wallet {
40 pub async fn pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
42 Ok(self.inner.db.get_all_pending_lightning_receives().await?)
43 }
44
45 pub async fn claimable_lightning_receive_balance(&self) -> anyhow::Result<Amount> {
48 let receives = self.pending_lightning_receives().await?;
49
50 let mut total = Amount::ZERO;
51 for receive in receives {
52 total += receive.htlc_vtxos.iter().map(|v| v.amount()).sum::<Amount>();
53 }
54
55 Ok(total)
56 }
57
58 pub async fn bolt11_invoice(
65 &self,
66 amount: Amount,
67 description: Option<String>,
68 ) -> anyhow::Result<Bolt11Invoice> {
69 if amount == Amount::ZERO {
70 bail!("Cannot create invoice for 0 sats (this would create an explicit 0 sat invoice, not an any-amount invoice)");
71 }
72
73 let (mut srv, ark_info) = self.require_server().await?;
74 let config = self.config();
75
76 let fee = ark_info.fees.lightning_receive.calculate(amount).context("fee overflowed")?;
78 validate_and_subtract_fee(amount, fee)?;
79
80 let requested_min_cltv_delta = ark_info.vtxo_exit_delta +
85 ark_info.htlc_expiry_delta +
86 config.vtxo_exit_margin +
87 config.htlc_recv_claim_delta +
88 LIGHTNING_PREPARE_CLAIM_DELTA;
89
90 if requested_min_cltv_delta > ark_info.max_user_invoice_cltv_delta {
91 bail!("HTLC CLTV delta ({}) is greater than Server's max HTLC recv CLTV delta: {}",
92 requested_min_cltv_delta,
93 ark_info.max_user_invoice_cltv_delta,
94 );
95 }
96
97 let preimage = Preimage::random();
98 let payment_hash = preimage.compute_payment_hash();
99 info!("Start bolt11 board with preimage / payment hash: {} / {}",
100 preimage.as_hex(), payment_hash.as_hex());
101
102 let mailbox_kp = self.inner.seed.to_mailbox_keypair();
103 let mailbox_id = ark::mailbox::MailboxIdentifier::from_pubkey(mailbox_kp.public_key());
104
105 let req = protos::StartLightningReceiveRequest {
106 payment_hash: payment_hash.to_vec(),
107 amount_sat: amount.to_sat(),
108 min_cltv_delta: requested_min_cltv_delta as u32,
109 mailbox_id: Some(mailbox_id.serialize()),
110 description,
111 };
112
113 let resp = srv.client.start_lightning_receive(req).await?.into_inner();
114 info!("Ark Server is ready to receive LN payment to invoice: {}.", resp.bolt11);
115
116 let invoice = Bolt11Invoice::from_str(&resp.bolt11)
117 .context("invalid bolt11 invoice returned by Ark server")?;
118
119 self.inner.db.store_lightning_receive(
120 payment_hash,
121 preimage,
122 &invoice,
123 requested_min_cltv_delta,
124 ).await?;
125
126 Ok(invoice)
127 }
128
129 pub async fn lightning_receive_status(
131 &self,
132 payment: impl Into<PaymentHash>,
133 ) -> anyhow::Result<Option<LightningReceive>> {
134 Ok(self.inner.db.fetch_lightning_receive_by_payment_hash(payment.into()).await?)
135 }
136
137 pub async fn attempt_lightning_receive_exit(
145 &self,
146 payment: impl Into<PaymentHash>,
147 ) -> anyhow::Result<()> {
148 let receive = self.inner.db.fetch_lightning_receive_by_payment_hash(payment.into()).await?
149 .context("no pending lightning receive found for payment hash")?;
150 if receive.preimage_revealed_at.is_none() {
151 bail!("preimage must be revealed before attempting to exit");
152 }
153 if receive.htlc_vtxos.is_empty() {
154 bail!("Nothing to exit, no htlcs have been created yet!");
155 }
156 self.exit_lightning_receive(&receive).await
157 }
158
159 async fn claim_lightning_receive(
180 &self,
181 receive: &mut LightningReceive,
182 ) -> anyhow::Result<()> {
183 let movement_id = receive.movement_id
184 .context("No movement created for lightning receive")?;
185 let (mut srv, _) = self.require_server().await?;
186
187 ensure!(!receive.htlc_vtxos.is_empty(), "no HTLC VTXOs set on record yet");
190 let mut input_ids = receive.htlc_vtxos.iter().map(|v| v.vtxo.id()).collect::<Vec<_>>();
191 input_ids.sort();
192 let inputs = self.inner.db.get_full_vtxos(&input_ids).await
193 .context("failed to hydrate htlc input vtxos")?;
194
195 let mut keypairs = Vec::with_capacity(inputs.len());
196 for v in &inputs {
197 keypairs.push(self.get_vtxo_key(v).await?);
198 }
199
200 let (claim_keypair, _) = self.derive_store_next_keypair().await?;
202 let receive_policy = VtxoPolicy::new_pubkey(claim_keypair.public_key());
203
204 trace!("ln arkoor builder params: inputs: {:?}; policy: {:?}", input_ids, receive_policy);
205 let builder = ArkoorPackageBuilder::new_claim_all_without_checkpoints(
206 inputs,
207 receive_policy.clone(),
208 ).context("creating claim arkoor builder failed")?;
209 let builder = builder.generate_user_nonces(&keypairs)
210 .context("arkoor nonce generation for claim failed")?;
211
212 info!("Claiming arkoor against payment preimage");
213 self.inner.db.set_preimage_revealed(receive.payment_hash).await?;
214 let package_cosign_request = protos::ArkoorPackageCosignRequest::from(
215 builder.cosign_request(),
216 );
217 let resp = srv.client.claim_lightning_receive(protos::ClaimLightningReceiveRequest {
218 payment_hash: receive.payment_hash.to_byte_array().to_vec(),
219 payment_preimage: receive.payment_preimage.to_vec(),
220 cosign_request: Some(package_cosign_request),
221 }).await?.into_inner();
222 let cosign_resp = resp.try_into().context("invalid cosign response")?;
223
224 let outputs = builder.user_cosign(&keypairs, cosign_resp)
225 .context("claim arkoor cosign failed with user response")?
226 .build_signed_vtxos();
227
228 self.register_vtxo_transactions_with_server(&outputs).await?;
230
231 let mut effective_balance = Amount::ZERO;
232 for vtxo in &outputs {
233 trace!("Validating Lightning receive claim VTXO {}: {}",
238 vtxo.id(), vtxo.serialize_hex(),
239 );
240 self.validate_vtxo(vtxo).await
241 .context("invalid arkoor from lightning receive")?;
242 effective_balance += vtxo.amount();
243 }
244
245 self.store_spendable_vtxos(&outputs).await?;
246 self.mark_vtxos_as_spent(&receive.htlc_vtxos).await?;
247
248 info!("Got arkoors from lightning: {}",
249 outputs.iter().map(|v| v.id().to_string()).collect::<Vec<_>>().join(", ")
250 );
251
252 self.inner.movements.finish_movement_with_update(
253 movement_id,
254 MovementStatus::Successful,
255 MovementUpdate::new()
256 .effective_balance(effective_balance.to_signed()?)
257 .produced_vtxos(&outputs)
258 ).await?;
259
260 self.inner.db.finish_pending_lightning_receive(receive.payment_hash).await?;
261 *receive = self.inner.db.fetch_lightning_receive_by_payment_hash(receive.payment_hash).await
262 .context("Database error")?
263 .context("Receive not found")?;
264
265 Ok(())
266 }
267
268 async fn compute_lightning_receive_anti_dos(
269 &self,
270 payment_hash: PaymentHash,
271 token: Option<&str>,
272 ) -> anyhow::Result<LightningReceiveAntiDos> {
273 Ok(if let Some(token) = token {
274 LightningReceiveAntiDos::Token(token.to_string())
275 } else {
276 let vtxo = self.select_vtxos_to_cover(Amount::ONE_SAT).await
278 .and_then(|vtxos| vtxos.into_iter().next()
279 .context("have no spendable vtxo to prove ownership of")
280 )?;
281 let vtxo_keypair = self.get_vtxo_key(&vtxo).await.expect("owned vtxo should be in database");
282 let attestation = LightningReceiveAttestation::new(payment_hash, vtxo.id(), &vtxo_keypair);
283 LightningReceiveAntiDos::InputVtxo(protos::InputVtxo {
284 vtxo_id: vtxo.id().to_bytes().to_vec(),
285 attestation: attestation.serialize(),
286 })
287 })
288 }
289
290 async fn check_lightning_receive(
321 &self,
322 payment_hash: PaymentHash,
323 wait: bool,
324 token: Option<&str>,
325 ) -> anyhow::Result<Option<LightningReceive>> {
326 let (mut srv, ark_info) = self.require_server().await?;
327 let current_height = self.inner.chain.tip().await?;
328
329 let mut receive = self.inner.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
330 .context("no pending lightning receive found for payment hash, might already be claimed")?;
331
332 if !receive.htlc_vtxos.is_empty() {
334 return Ok(Some(receive))
335 }
336
337 trace!("Requesting updates for ln-receive to server with for wait={} and hash={}", wait, payment_hash);
338 let sub = srv.client.check_lightning_receive(protos::CheckLightningReceiveRequest {
339 hash: payment_hash.to_byte_array().to_vec(), wait,
340 }).await?.into_inner();
341
342
343 let status = protos::LightningReceiveStatus::try_from(sub.status)
344 .with_context(|| format!("unknown payment status: {}", sub.status))?;
345
346 debug!("Received status {:?} for {}", status, payment_hash);
347 match status {
348 protos::LightningReceiveStatus::Accepted |
350 protos::LightningReceiveStatus::HtlcsReady => {},
351 protos::LightningReceiveStatus::Created => {
352 return Ok(None);
353 },
354 protos::LightningReceiveStatus::Settled => bail!("payment already settled"),
355 protos::LightningReceiveStatus::Canceled => {
356 warn!("payment was canceled. removing pending lightning receive");
357 self.handle_failed_lightning_receive(&receive).await?;
358 return Ok(None);
359 },
360 }
361
362 let lightning_receive_anti_dos = match self.compute_lightning_receive_anti_dos(
363 payment_hash, token,
364 ).await {
365 Ok(anti_dos) => Some(anti_dos),
366 Err(e) => {
367 info!("Could not compute anti-dos: {e:#}. Trying without");
368 None
369 },
370 };
371
372 let htlc_recv_expiry = current_height + receive.htlc_recv_cltv_delta as BlockHeight;
373
374 let (next_keypair, _) = self.derive_store_next_keypair().await?;
375 let req = protos::PrepareLightningReceiveClaimRequest {
376 payment_hash: receive.payment_hash.to_vec(),
377 user_pubkey: next_keypair.public_key().serialize().to_vec(),
378 htlc_recv_expiry,
379 lightning_receive_anti_dos,
380 };
381 let res = srv.client.prepare_lightning_receive_claim(req).await
382 .context("error preparing lightning receive claim")?.into_inner();
383 let vtxos = res.htlc_vtxos.into_iter()
384 .map(|b| Vtxo::deserialize(&b))
385 .collect::<Result<Vec<_>, _>>()
386 .context("invalid htlc vtxos from server")?;
387
388 let mut htlc_amount = Amount::ZERO;
390 for vtxo in &vtxos {
391 trace!("Received HTLC VTXO {} from server: {}", vtxo.id(), vtxo.serialize_hex());
392 self.validate_vtxo(vtxo).await
393 .context("received invalid HTLC VTXO from server")?;
394 htlc_amount += vtxo.amount();
395
396 if let VtxoPolicy::ServerHtlcRecv(p) = vtxo.policy() {
397 if p.payment_hash != receive.payment_hash {
398 bail!("invalid payment hash on HTLC VTXOs received from server: {}",
399 p.payment_hash,
400 );
401 }
402 if p.user_pubkey != next_keypair.public_key() {
403 bail!("invalid pubkey on HTLC VTXOs received from server: {}", p.user_pubkey);
404 }
405 if p.htlc_expiry < htlc_recv_expiry {
406 bail!("HTLC VTXO expiry height is less than requested: Requested {}, received {}", htlc_recv_expiry, p.htlc_expiry);
407 }
408 } else {
409 bail!("invalid HTLC VTXO policy: {:?}", vtxo.policy());
410 }
411 }
412
413 let invoice_amount = receive.invoice.get_payment_amount(None)
417 .context("ln receive invoice should have amount")?;
418 let server_received_amount = res.receive.map(|r| Amount::from_sat(r.amount_sat));
419 let fee = {
420 let fee = server_received_amount
421 .and_then(|a| ark_info.fees.lightning_receive.calculate(a));
422 match (server_received_amount, fee) {
423 (Some(amount), Some(fee)) if htlc_amount + fee == amount => {
424 fee
426 },
427 _ => {
428 ark_info.fees.lightning_receive.calculate(invoice_amount)
433 .expect("we previously validated this")
434 }
435 }
436 };
437 let received = htlc_amount + fee;
438 ensure!(received >= invoice_amount,
439 "Server didn't return enough VTXOs to cover invoice amount"
440 );
441
442 let movement_id = if let Some(movement_id) = receive.movement_id {
443 movement_id
444 } else {
445 self.inner.movements.new_movement_with_update(
446 Subsystem::LIGHTNING_RECEIVE,
447 LightningReceiveMovement::Receive.to_string(),
448 MovementUpdate::new()
449 .intended_balance(invoice_amount.to_signed()?)
450 .effective_balance(htlc_amount.to_signed()?)
451 .fee(fee)
452 .metadata(LightningMovement::metadata(
453 receive.payment_hash, &vtxos, Some(receive.payment_preimage),
454 ))
455 .received_on(
456 [MovementDestination::new(receive.invoice.clone().into(), received)],
457 ),
458 ).await?
459 };
460 self.store_locked_vtxos(
461 &vtxos,
462 Some(crate::vtxo::VtxoLockHolder::Movement { id: movement_id }),
463 ).await?;
464
465 let vtxo_ids = vtxos.iter().map(|v| v.id()).collect::<Vec<_>>();
466 self.inner.db.update_lightning_receive(payment_hash, &vtxo_ids, movement_id).await?;
467
468 let mut wallet_vtxos = vec![];
469 for vtxo in vtxos {
470 let v = self.inner.db.get_wallet_vtxo(vtxo.id()).await?
471 .context("Failed to get wallet VTXO for lightning receive")?;
472 wallet_vtxos.push(v);
473 }
474
475 receive.htlc_vtxos = wallet_vtxos;
476 receive.movement_id = Some(movement_id);
477
478 Ok(Some(receive))
479 }
480
481 async fn exit_lightning_receive(
487 &self,
488 lightning_receive: &LightningReceive,
489 ) -> anyhow::Result<()> {
490 ensure!(!lightning_receive.htlc_vtxos.is_empty(), "no HTLC VTXOs to exit");
491 let vtxos = lightning_receive.htlc_vtxos.iter().map(|v| &v.vtxo).collect::<Vec<_>>();
492
493 info!("Exiting HTLC VTXOs for lightning_receive with payment hash {}", lightning_receive.payment_hash);
494 self.inner.exit.start_exit_for_vtxos(&vtxos).await?;
495
496 if let Some(movement_id) = lightning_receive.movement_id {
497 self.inner.movements.finish_movement_with_update(
498 movement_id,
499 MovementStatus::Failed,
500 MovementUpdate::new().exited_vtxos(vtxos),
501 ).await?;
502 } else {
503 error!("movement id is missing but we disclosed preimage: {}", lightning_receive.payment_hash);
504 }
505
506 self.inner.db.finish_pending_lightning_receive(lightning_receive.payment_hash).await?;
507 Ok(())
508 }
509
510 pub(crate) async fn handle_failed_lightning_receive(
511 &self,
512 lightning_receive: &LightningReceive,
513 ) -> anyhow::Result<()> {
514 let vtxos = &lightning_receive.htlc_vtxos;
515
516 let update_opt = match (vtxos.is_empty(), lightning_receive.preimage_revealed_at) {
517 (false, Some(_)) => {
518 return Ok(());
523 }
524 (false, None) => {
525 warn!("HTLC-recv VTXOs are about to expire, but preimage has not been disclosed yet. Canceling");
526 self.mark_vtxos_as_spent(vtxos).await?;
527 if let Some(movement_id) = lightning_receive.movement_id {
528 Some((
529 movement_id,
530 MovementUpdate::new()
531 .effective_balance(SignedAmount::ZERO),
532 MovementStatus::Canceled,
533 ))
534 } else {
535 error!("movement id is missing but we got HTLC vtxos: {}", lightning_receive.payment_hash);
536 None
537 }
538 }
539 (true, Some(_)) => {
540 error!("No HTLC vtxos set on ln receive but preimage has been disclosed. Canceling");
541 lightning_receive.movement_id.map(|id| (id,
542 MovementUpdate::new()
543 .effective_balance(SignedAmount::ZERO),
544 MovementStatus::Canceled,
545 ))
546 }
547 (true, None) => None,
548 };
549
550 if let Some((movement_id, update, status)) = update_opt {
551 self.inner.movements.finish_movement_with_update(movement_id, status, update).await?;
552 }
553
554 self.inner.db.finish_pending_lightning_receive(lightning_receive.payment_hash).await?;
555
556 Ok(())
557 }
558
559 pub async fn cancel_lightning_receive(
568 &self,
569 payment_hash: PaymentHash,
570 ) -> anyhow::Result<()> {
571 let receive = self.inner.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
572 .context("no pending lightning receive found for this payment hash")?;
573
574 if receive.preimage_revealed_at.is_some() {
575 bail!("cannot cancel: preimage has already been revealed");
576 }
577
578 if receive.finished_at.is_some() {
579 bail!("lightning receive is already finished");
580 }
581
582 let (mut srv, _) = self.require_server().await?;
583 srv.client.cancel_lightning_receive(protos::CancelLightningReceiveRequest {
584 payment_hash: payment_hash.to_vec(),
585 }).await.context("server refused cancellation")?;
586
587 self.handle_failed_lightning_receive(&receive).await?;
589
590 Ok(())
591 }
592
593 pub async fn try_claim_lightning_receive(
619 &self,
620 payment_hash: PaymentHash,
621 wait: bool,
622 token: Option<&str>,
623 ) -> anyhow::Result<LightningReceive> {
624 trace!("Claiming lightning receive for payment hash: {}", payment_hash);
625
626 let key = format!("{}.{}", LIGHTNING_RECEIVE_LOCK_PREFIX, payment_hash);
630 let _guard = match self.inner.lock_manager.try_lock(&key).await {
631 Some(guard) => guard,
632 None => {
633 debug!("Receive operation already in progress for this payment");
634 return self.inner.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
635 .context("no receive for payment hash");
636 },
637 };
638
639 self.try_claim_lightning_receive_inner(payment_hash, wait, token).await
640 }
641
642 async fn try_claim_lightning_receive_inner(
644 &self,
645 payment_hash: PaymentHash,
646 wait: bool,
647 token: Option<&str>,
648 ) -> anyhow::Result<LightningReceive> {
649 let mut receive = match self.check_lightning_receive(payment_hash, wait, token).await? {
652 Some(receive) => receive,
653 None => {
654 return self.inner.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
655 .context("No receive for payment_hash")
656 }
657 };
658
659 if receive.finished_at.is_some() {
660 return Ok(receive);
661 }
662
663 if receive.htlc_vtxos.is_empty() {
666 return Ok(receive);
667 }
668
669 let mut retries_left = self.inner.config.lightning_receive_claim_retries;
670 let mut backoff = CLAIM_RETRY_BACKOFF_INITIAL;
671 let claim_result = loop {
672 match self.claim_lightning_receive(&mut receive).await {
673 Ok(()) => break Ok(()),
674 Err(e) if retries_left == 0 => break Err(e),
675 Err(e) => {
676 warn!(
677 "Error claiming lightning receive {} ({} retries left, retrying in {:?}): {:#}",
678 receive.payment_hash, retries_left, backoff, e,
679 );
680 retries_left -= 1;
681 tokio::time::sleep(backoff).await;
682 backoff = (backoff * 2).min(CLAIM_RETRY_BACKOFF_MAX);
683 }
684 }
685 };
686
687 match claim_result {
688 Ok(()) => Ok(receive),
689 Err(e) => {
690 error!("Failed to claim htlcs for payment_hash: {}", receive.payment_hash);
691 warn!("Exiting lightning receive VTXOs");
695 self.exit_lightning_receive(&receive).await?;
696 return Err(e)
697 }
698 }
699 }
700
701 pub async fn try_claim_all_lightning_receives(&self, wait: bool) -> anyhow::Result<Vec<LightningReceive>> {
717 let pending = self.pending_lightning_receives().await?;
718 let total = pending.len();
719
720 if total == 0 {
721 return Ok(vec![]);
722 }
723
724 let results: Vec<_> = tokio_stream::iter(pending)
725 .map(|rcv| async move {
726 self.try_claim_lightning_receive(rcv.invoice.into(), wait, None).await
727 })
728 .buffer_unordered(3)
729 .collect()
730 .await;
731
732 let mut claimed = vec![];
733 let mut failed = 0;
734
735 for result in results {
736 match result {
737 Ok(receive) => claimed.push(receive),
738 Err(e) => {
739 error!("Error claiming lightning receive: {:#}", e);
740 failed += 1;
741 }
742 }
743 }
744
745 if failed > 0 {
746 info!(
747 "Lightning receive claims: {} succeeded, {} failed out of {} pending",
748 claimed.len(), failed, total
749 );
750 }
751
752 if claimed.is_empty() {
753 anyhow::bail!("All {} lightning receive claim(s) failed", failed);
754 }
755
756 Ok(claimed)
757 }
758}