1use std::str::FromStr;
2use std::time::Duration;
3
4use anyhow::Context;
5use ark::arkoor::package::ArkoorPackageBuilder;
6use bitcoin::{Amount, SignedAmount};
7use bitcoin::hex::DisplayHex;
8use futures::StreamExt;
9use lightning_invoice::Bolt11Invoice;
10use log::{trace, debug, info, warn};
11
12use ark::{ProtocolEncoding, Vtxo, VtxoPolicy};
13use ark::attestations::{LightningReceiveAttestation};
14use ark::fees::validate_and_subtract_fee;
15use ark::lightning::{Bolt11InvoiceExt, PaymentHash, Preimage};
16use bitcoin_ext::{BlockDelta, BlockHeight};
17use server_rpc::protos;
18use server_rpc::protos::prepare_lightning_receive_claim_request::LightningReceiveAntiDos;
19
20use crate::subsystem::{LightningMovement, LightningReceiveMovement, Subsystem};
21use crate::{Wallet, error};
22use crate::movement::{MovementDestination, MovementStatus};
23use crate::movement::update::MovementUpdate;
24use crate::persist::models::LightningReceive;
25
26const LIGHTNING_RECEIVE_LOCK_PREFIX: &str = "lightning_receive";
27
28const LIGHTNING_PREPARE_CLAIM_DELTA: BlockDelta = 2;
31
32const CLAIM_RETRY_BACKOFF_INITIAL: Duration = Duration::from_secs(2);
37const CLAIM_RETRY_BACKOFF_MAX: Duration = Duration::from_secs(30);
38
39impl Wallet {
40 pub async fn pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
42 Ok(self.inner.db.get_all_pending_lightning_receives().await?)
43 }
44
45 pub async fn claimable_lightning_receive_balance(&self) -> anyhow::Result<Amount> {
48 let receives = self.pending_lightning_receives().await?;
49
50 let mut total = Amount::ZERO;
51 for receive in receives {
52 total += receive.htlc_vtxos.iter().map(|v| v.amount()).sum::<Amount>();
53 }
54
55 Ok(total)
56 }
57
58 pub async fn bolt11_invoice(
65 &self,
66 amount: Amount,
67 description: Option<String>,
68 ) -> anyhow::Result<Bolt11Invoice> {
69 if amount == Amount::ZERO {
70 bail!("Cannot create invoice for 0 sats (this would create an explicit 0 sat invoice, not an any-amount invoice)");
71 }
72
73 let (mut srv, ark_info) = self.require_server().await?;
74 let config = self.config();
75
76 let fee = ark_info.fees.lightning_receive.calculate(amount).context("fee overflowed")?;
78 validate_and_subtract_fee(amount, fee)?;
79
80 let requested_min_cltv_delta = ark_info.vtxo_exit_delta +
85 ark_info.htlc_expiry_delta +
86 config.vtxo_exit_margin +
87 config.htlc_recv_claim_delta +
88 LIGHTNING_PREPARE_CLAIM_DELTA;
89
90 if requested_min_cltv_delta > ark_info.max_user_invoice_cltv_delta {
91 bail!("HTLC CLTV delta ({}) is greater than Server's max HTLC recv CLTV delta: {}",
92 requested_min_cltv_delta,
93 ark_info.max_user_invoice_cltv_delta,
94 );
95 }
96
97 let preimage = Preimage::random();
98 let payment_hash = preimage.compute_payment_hash();
99 info!("Start bolt11 board with preimage / payment hash: {} / {}",
100 preimage.as_hex(), payment_hash.as_hex());
101
102 let mailbox_kp = self.inner.seed.to_mailbox_keypair();
103 let mailbox_id = ark::mailbox::MailboxIdentifier::from_pubkey(mailbox_kp.public_key());
104
105 let req = protos::StartLightningReceiveRequest {
106 payment_hash: payment_hash.to_vec(),
107 amount_sat: amount.to_sat(),
108 min_cltv_delta: requested_min_cltv_delta as u32,
109 mailbox_id: Some(mailbox_id.serialize()),
110 description,
111 };
112
113 let resp = srv.client.start_lightning_receive(req).await?.into_inner();
114 info!("Ark Server is ready to receive LN payment to invoice: {}.", resp.bolt11);
115
116 let invoice = Bolt11Invoice::from_str(&resp.bolt11)
117 .context("invalid bolt11 invoice returned by Ark server")?;
118
119 self.inner.db.store_lightning_receive(
120 payment_hash,
121 preimage,
122 &invoice,
123 requested_min_cltv_delta,
124 ).await?;
125
126 Ok(invoice)
127 }
128
129 pub async fn lightning_receive_status(
131 &self,
132 payment: impl Into<PaymentHash>,
133 ) -> anyhow::Result<Option<LightningReceive>> {
134 Ok(self.inner.db.fetch_lightning_receive_by_payment_hash(payment.into()).await?)
135 }
136
137 async fn claim_lightning_receive(
158 &self,
159 receive: &mut LightningReceive,
160 ) -> anyhow::Result<()> {
161 let movement_id = receive.movement_id
162 .context("No movement created for lightning receive")?;
163 let (mut srv, _) = self.require_server().await?;
164
165 ensure!(!receive.htlc_vtxos.is_empty(), "no HTLC VTXOs set on record yet");
168 let mut input_ids = receive.htlc_vtxos.iter().map(|v| v.vtxo.id()).collect::<Vec<_>>();
169 input_ids.sort();
170 let inputs = self.inner.db.get_full_vtxos(&input_ids).await
171 .context("failed to hydrate htlc input vtxos")?;
172
173 let mut keypairs = Vec::with_capacity(inputs.len());
174 for v in &inputs {
175 keypairs.push(self.get_vtxo_key(v).await?);
176 }
177
178 let (claim_keypair, _) = self.derive_store_next_keypair().await?;
180 let receive_policy = VtxoPolicy::new_pubkey(claim_keypair.public_key());
181
182 trace!("ln arkoor builder params: inputs: {:?}; policy: {:?}", input_ids, receive_policy);
183 let builder = ArkoorPackageBuilder::new_claim_all_without_checkpoints(
184 inputs,
185 receive_policy.clone(),
186 ).context("creating claim arkoor builder failed")?;
187 let builder = builder.generate_user_nonces(&keypairs)
188 .context("arkoor nonce generation for claim failed")?;
189
190 info!("Claiming arkoor against payment preimage");
191 self.inner.db.set_preimage_revealed(receive.payment_hash).await?;
192 let package_cosign_request = protos::ArkoorPackageCosignRequest::from(
193 builder.cosign_request(),
194 );
195 let resp = srv.client.claim_lightning_receive(protos::ClaimLightningReceiveRequest {
196 payment_hash: receive.payment_hash.to_byte_array().to_vec(),
197 payment_preimage: receive.payment_preimage.to_vec(),
198 cosign_request: Some(package_cosign_request),
199 }).await?.into_inner();
200 let cosign_resp = resp.try_into().context("invalid cosign response")?;
201
202 let outputs = builder.user_cosign(&keypairs, cosign_resp)
203 .context("claim arkoor cosign failed with user response")?
204 .build_signed_vtxos();
205
206 self.register_vtxo_transactions_with_server(&outputs).await?;
208
209 let mut effective_balance = Amount::ZERO;
210 for vtxo in &outputs {
211 trace!("Validating Lightning receive claim VTXO {}: {}",
216 vtxo.id(), vtxo.serialize_hex(),
217 );
218 self.validate_vtxo(vtxo).await
219 .context("invalid arkoor from lightning receive")?;
220 effective_balance += vtxo.amount();
221 }
222
223 self.store_spendable_vtxos(&outputs).await?;
224 self.mark_vtxos_as_spent(&receive.htlc_vtxos).await?;
225
226 info!("Got arkoors from lightning: {}",
227 outputs.iter().map(|v| v.id().to_string()).collect::<Vec<_>>().join(", ")
228 );
229
230 self.inner.movements.finish_movement_with_update(
231 movement_id,
232 MovementStatus::Successful,
233 MovementUpdate::new()
234 .effective_balance(effective_balance.to_signed()?)
235 .produced_vtxos(&outputs)
236 ).await?;
237
238 self.inner.db.finish_pending_lightning_receive(receive.payment_hash).await?;
239 *receive = self.inner.db.fetch_lightning_receive_by_payment_hash(receive.payment_hash).await
240 .context("Database error")?
241 .context("Receive not found")?;
242
243 Ok(())
244 }
245
246 async fn compute_lightning_receive_anti_dos(
247 &self,
248 payment_hash: PaymentHash,
249 token: Option<&str>,
250 ) -> anyhow::Result<LightningReceiveAntiDos> {
251 Ok(if let Some(token) = token {
252 LightningReceiveAntiDos::Token(token.to_string())
253 } else {
254 let vtxo = self.select_vtxos_to_cover(Amount::ONE_SAT).await
256 .and_then(|vtxos| vtxos.into_iter().next()
257 .context("have no spendable vtxo to prove ownership of")
258 )?;
259 let vtxo_keypair = self.get_vtxo_key(&vtxo).await.expect("owned vtxo should be in database");
260 let attestation = LightningReceiveAttestation::new(payment_hash, vtxo.id(), &vtxo_keypair);
261 LightningReceiveAntiDos::InputVtxo(protos::InputVtxo {
262 vtxo_id: vtxo.id().to_bytes().to_vec(),
263 attestation: attestation.serialize(),
264 })
265 })
266 }
267
268 async fn check_lightning_receive(
299 &self,
300 payment_hash: PaymentHash,
301 wait: bool,
302 token: Option<&str>,
303 ) -> anyhow::Result<Option<LightningReceive>> {
304 let (mut srv, ark_info) = self.require_server().await?;
305 let current_height = self.inner.chain.tip().await?;
306
307 let mut receive = self.inner.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
308 .context("no pending lightning receive found for payment hash, might already be claimed")?;
309
310 if !receive.htlc_vtxos.is_empty() {
312 return Ok(Some(receive))
313 }
314
315 trace!("Requesting updates for ln-receive to server with for wait={} and hash={}", wait, payment_hash);
316 let sub = srv.client.check_lightning_receive(protos::CheckLightningReceiveRequest {
317 hash: payment_hash.to_byte_array().to_vec(), wait,
318 }).await?.into_inner();
319
320
321 let status = protos::LightningReceiveStatus::try_from(sub.status)
322 .with_context(|| format!("unknown payment status: {}", sub.status))?;
323
324 debug!("Received status {:?} for {}", status, payment_hash);
325 match status {
326 protos::LightningReceiveStatus::Accepted |
328 protos::LightningReceiveStatus::HtlcsReady => {},
329 protos::LightningReceiveStatus::Created => {
330 return Ok(None);
331 },
332 protos::LightningReceiveStatus::Settled => bail!("payment already settled"),
333 protos::LightningReceiveStatus::Canceled => {
334 warn!("payment was canceled. removing pending lightning receive");
335 self.exit_or_cancel_lightning_receive(&receive).await?;
336 return Ok(None);
337 },
338 }
339
340 let lightning_receive_anti_dos = match self.compute_lightning_receive_anti_dos(
341 payment_hash, token,
342 ).await {
343 Ok(anti_dos) => Some(anti_dos),
344 Err(e) => {
345 info!("Could not compute anti-dos: {e:#}. Trying without");
346 None
347 },
348 };
349
350 let htlc_recv_expiry = current_height + receive.htlc_recv_cltv_delta as BlockHeight;
351
352 let (next_keypair, _) = self.derive_store_next_keypair().await?;
353 let req = protos::PrepareLightningReceiveClaimRequest {
354 payment_hash: receive.payment_hash.to_vec(),
355 user_pubkey: next_keypair.public_key().serialize().to_vec(),
356 htlc_recv_expiry,
357 lightning_receive_anti_dos,
358 };
359 let res = srv.client.prepare_lightning_receive_claim(req).await
360 .context("error preparing lightning receive claim")?.into_inner();
361 let vtxos = res.htlc_vtxos.into_iter()
362 .map(|b| Vtxo::deserialize(&b))
363 .collect::<Result<Vec<_>, _>>()
364 .context("invalid htlc vtxos from server")?;
365
366 let mut htlc_amount = Amount::ZERO;
368 for vtxo in &vtxos {
369 trace!("Received HTLC VTXO {} from server: {}", vtxo.id(), vtxo.serialize_hex());
370 self.validate_vtxo(vtxo).await
371 .context("received invalid HTLC VTXO from server")?;
372 htlc_amount += vtxo.amount();
373
374 if let VtxoPolicy::ServerHtlcRecv(p) = vtxo.policy() {
375 if p.payment_hash != receive.payment_hash {
376 bail!("invalid payment hash on HTLC VTXOs received from server: {}",
377 p.payment_hash,
378 );
379 }
380 if p.user_pubkey != next_keypair.public_key() {
381 bail!("invalid pubkey on HTLC VTXOs received from server: {}", p.user_pubkey);
382 }
383 if p.htlc_expiry < htlc_recv_expiry {
384 bail!("HTLC VTXO expiry height is less than requested: Requested {}, received {}", htlc_recv_expiry, p.htlc_expiry);
385 }
386 } else {
387 bail!("invalid HTLC VTXO policy: {:?}", vtxo.policy());
388 }
389 }
390
391 let invoice_amount = receive.invoice.get_payment_amount(None)
395 .context("ln receive invoice should have amount")?;
396 let server_received_amount = res.receive.map(|r| Amount::from_sat(r.amount_sat));
397 let fee = {
398 let fee = server_received_amount
399 .and_then(|a| ark_info.fees.lightning_receive.calculate(a));
400 match (server_received_amount, fee) {
401 (Some(amount), Some(fee)) if htlc_amount + fee == amount => {
402 fee
404 },
405 _ => {
406 ark_info.fees.lightning_receive.calculate(invoice_amount)
411 .expect("we previously validated this")
412 }
413 }
414 };
415 let received = htlc_amount + fee;
416 ensure!(received >= invoice_amount,
417 "Server didn't return enough VTXOs to cover invoice amount"
418 );
419
420 let movement_id = if let Some(movement_id) = receive.movement_id {
421 movement_id
422 } else {
423 self.inner.movements.new_movement_with_update(
424 Subsystem::LIGHTNING_RECEIVE,
425 LightningReceiveMovement::Receive.to_string(),
426 MovementUpdate::new()
427 .intended_balance(invoice_amount.to_signed()?)
428 .effective_balance(htlc_amount.to_signed()?)
429 .fee(fee)
430 .metadata(LightningMovement::metadata(
431 receive.payment_hash, &vtxos, Some(receive.payment_preimage),
432 ))
433 .received_on(
434 [MovementDestination::new(receive.invoice.clone().into(), received)],
435 ),
436 ).await?
437 };
438 self.store_locked_vtxos(
439 &vtxos,
440 Some(crate::vtxo::VtxoLockHolder::Movement { id: movement_id }),
441 ).await?;
442
443 let vtxo_ids = vtxos.iter().map(|v| v.id()).collect::<Vec<_>>();
444 self.inner.db.update_lightning_receive(payment_hash, &vtxo_ids, movement_id).await?;
445
446 let mut wallet_vtxos = vec![];
447 for vtxo in vtxos {
448 let v = self.inner.db.get_wallet_vtxo(vtxo.id()).await?
449 .context("Failed to get wallet VTXO for lightning receive")?;
450 wallet_vtxos.push(v);
451 }
452
453 receive.htlc_vtxos = wallet_vtxos;
454 receive.movement_id = Some(movement_id);
455
456 Ok(Some(receive))
457 }
458
459 async fn exit_lightning_receive(
465 &self,
466 lightning_receive: &LightningReceive,
467 ) -> anyhow::Result<()> {
468 ensure!(!lightning_receive.htlc_vtxos.is_empty(), "no HTLC VTXOs to exit");
469 let vtxos = lightning_receive.htlc_vtxos.iter().map(|v| &v.vtxo).collect::<Vec<_>>();
470
471 info!("Exiting HTLC VTXOs for lightning_receive with payment hash {}", lightning_receive.payment_hash);
472 self.inner.exit.start_exit_for_vtxos(&vtxos).await?;
473
474 if let Some(movement_id) = lightning_receive.movement_id {
475 self.inner.movements.finish_movement_with_update(
476 movement_id,
477 MovementStatus::Failed,
478 MovementUpdate::new().exited_vtxos(vtxos),
479 ).await?;
480 } else {
481 error!("movement id is missing but we disclosed preimage: {}", lightning_receive.payment_hash);
482 }
483
484 self.inner.db.finish_pending_lightning_receive(lightning_receive.payment_hash).await?;
485 Ok(())
486 }
487
488 pub(crate) async fn exit_or_cancel_lightning_receive(
489 &self,
490 lightning_receive: &LightningReceive,
491 ) -> anyhow::Result<()> {
492 let vtxos = &lightning_receive.htlc_vtxos;
493
494 let update_opt = match (vtxos.is_empty(), lightning_receive.preimage_revealed_at) {
495 (false, Some(_)) => {
496 return self.exit_lightning_receive(lightning_receive).await;
497 }
498 (false, None) => {
499 warn!("HTLC-recv VTXOs are about to expire, but preimage has not been disclosed yet. Canceling");
500 self.mark_vtxos_as_spent(vtxos).await?;
501 if let Some(movement_id) = lightning_receive.movement_id {
502 Some((
503 movement_id,
504 MovementUpdate::new()
505 .effective_balance(SignedAmount::ZERO),
506 MovementStatus::Canceled,
507 ))
508 } else {
509 error!("movement id is missing but we got HTLC vtxos: {}", lightning_receive.payment_hash);
510 None
511 }
512 }
513 (true, Some(_)) => {
514 error!("No HTLC vtxos set on ln receive but preimage has been disclosed. Canceling");
515 lightning_receive.movement_id.map(|id| (id,
516 MovementUpdate::new()
517 .effective_balance(SignedAmount::ZERO),
518 MovementStatus::Canceled,
519 ))
520 }
521 (true, None) => None,
522 };
523
524 if let Some((movement_id, update, status)) = update_opt {
525 self.inner.movements.finish_movement_with_update(movement_id, status, update).await?;
526 }
527
528 self.inner.db.finish_pending_lightning_receive(lightning_receive.payment_hash).await?;
529
530 Ok(())
531 }
532
533 pub async fn cancel_lightning_receive(
542 &self,
543 payment_hash: PaymentHash,
544 ) -> anyhow::Result<()> {
545 let receive = self.inner.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
546 .context("no pending lightning receive found for this payment hash")?;
547
548 if receive.preimage_revealed_at.is_some() {
549 bail!("cannot cancel: preimage has already been revealed");
550 }
551
552 if receive.finished_at.is_some() {
553 bail!("lightning receive is already finished");
554 }
555
556 let (mut srv, _) = self.require_server().await?;
557 srv.client.cancel_lightning_receive(protos::CancelLightningReceiveRequest {
558 payment_hash: payment_hash.to_vec(),
559 }).await.context("server refused cancellation")?;
560
561 self.exit_or_cancel_lightning_receive(&receive).await?;
563
564 Ok(())
565 }
566
567 pub async fn try_claim_lightning_receive(
593 &self,
594 payment_hash: PaymentHash,
595 wait: bool,
596 token: Option<&str>,
597 ) -> anyhow::Result<LightningReceive> {
598 trace!("Claiming lightning receive for payment hash: {}", payment_hash);
599
600 let key = format!("{}.{}", LIGHTNING_RECEIVE_LOCK_PREFIX, payment_hash);
604 let _guard = match self.inner.lock_manager.try_lock(&key).await {
605 Some(guard) => guard,
606 None => {
607 debug!("Receive operation already in progress for this payment");
608 return self.inner.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
609 .context("no receive for payment hash");
610 },
611 };
612
613 self.try_claim_lightning_receive_inner(payment_hash, wait, token).await
614 }
615
616 async fn try_claim_lightning_receive_inner(
618 &self,
619 payment_hash: PaymentHash,
620 wait: bool,
621 token: Option<&str>,
622 ) -> anyhow::Result<LightningReceive> {
623 let mut receive = match self.check_lightning_receive(payment_hash, wait, token).await? {
626 Some(receive) => receive,
627 None => {
628 return self.inner.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
629 .context("No receive for payment_hash")
630 }
631 };
632
633 if receive.finished_at.is_some() {
634 return Ok(receive);
635 }
636
637 if receive.htlc_vtxos.is_empty() {
640 return Ok(receive);
641 }
642
643 let mut retries_left = self.inner.config.lightning_receive_claim_retries;
644 let mut backoff = CLAIM_RETRY_BACKOFF_INITIAL;
645 let claim_result = loop {
646 match self.claim_lightning_receive(&mut receive).await {
647 Ok(()) => break Ok(()),
648 Err(e) if retries_left == 0 => break Err(e),
649 Err(e) => {
650 warn!(
651 "Error claiming lightning receive {} ({} retries left, retrying in {:?}): {:#}",
652 receive.payment_hash, retries_left, backoff, e,
653 );
654 retries_left -= 1;
655 tokio::time::sleep(backoff).await;
656 backoff = (backoff * 2).min(CLAIM_RETRY_BACKOFF_MAX);
657 }
658 }
659 };
660
661 match claim_result {
662 Ok(()) => Ok(receive),
663 Err(e) => {
664 error!("Failed to claim htlcs for payment_hash: {}", receive.payment_hash);
665 warn!("Exiting lightning receive VTXOs");
669 self.exit_lightning_receive(&receive).await?;
670 return Err(e)
671 }
672 }
673 }
674
675 pub async fn try_claim_all_lightning_receives(&self, wait: bool) -> anyhow::Result<Vec<LightningReceive>> {
691 let pending = self.pending_lightning_receives().await?;
692 let total = pending.len();
693
694 if total == 0 {
695 return Ok(vec![]);
696 }
697
698 let results: Vec<_> = tokio_stream::iter(pending)
699 .map(|rcv| async move {
700 self.try_claim_lightning_receive(rcv.invoice.into(), wait, None).await
701 })
702 .buffer_unordered(3)
703 .collect()
704 .await;
705
706 let mut claimed = vec![];
707 let mut failed = 0;
708
709 for result in results {
710 match result {
711 Ok(receive) => claimed.push(receive),
712 Err(e) => {
713 error!("Error claiming lightning receive: {:#}", e);
714 failed += 1;
715 }
716 }
717 }
718
719 if failed > 0 {
720 info!(
721 "Lightning receive claims: {} succeeded, {} failed out of {} pending",
722 claimed.len(), failed, total
723 );
724 }
725
726 if claimed.is_empty() {
727 anyhow::bail!("All {} lightning receive claim(s) failed", failed);
728 }
729
730 Ok(claimed)
731 }
732}