1#![doc = include_str!("../README.md")]
6#![warn(missing_docs)]
7#![warn(rustdoc::bare_urls)]
8
9use std::cmp::max;
10use std::path::PathBuf;
11use std::pin::Pin;
12use std::str::FromStr;
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::Arc;
15
16use anyhow::anyhow;
17use async_trait::async_trait;
18use cdk_common::amount::{to_unit, Amount, MSAT_IN_SAT};
19use cdk_common::bitcoin::hashes::Hash;
20use cdk_common::common::FeeReserve;
21use cdk_common::database::mint::DynMintKVStore;
22use cdk_common::nuts::{CurrencyUnit, MeltOptions, MeltQuoteState};
23use cdk_common::payment::{
24 self, Bolt11Settings, CreateIncomingPaymentResponse, Event, IncomingPaymentOptions,
25 MakePaymentResponse, MintPayment, OutgoingPaymentOptions, PaymentIdentifier,
26 PaymentQuoteResponse, WaitPaymentResponse,
27};
28use cdk_common::util::hex;
29use cdk_common::Bolt11Invoice;
30use error::Error;
31use futures::{Stream, StreamExt};
32use lnrpc::fee_limit::Limit;
33use lnrpc::payment::PaymentStatus;
34use lnrpc::{FeeLimit, Hop, MppRecord};
35use tokio_util::sync::CancellationToken;
36use tracing::instrument;
37
38mod client;
39pub mod error;
40
41mod proto;
42pub(crate) use proto::{lnrpc, routerrpc};
43
44use crate::lnrpc::invoice::InvoiceState;
45
46const LND_KV_PRIMARY_NAMESPACE: &str = "cdk_lnd_lightning_backend";
48const LND_KV_SECONDARY_NAMESPACE: &str = "payment_indices";
49const LAST_ADD_INDEX_KV_KEY: &str = "last_add_index";
50const LAST_SETTLE_INDEX_KV_KEY: &str = "last_settle_index";
51
52#[derive(Clone)]
54pub struct Lnd {
55 _address: String,
56 _cert_file: PathBuf,
57 _macaroon_file: PathBuf,
58 lnd_client: client::Client,
59 fee_reserve: FeeReserve,
60 kv_store: DynMintKVStore,
61 wait_invoice_cancel_token: CancellationToken,
62 wait_invoice_is_active: Arc<AtomicBool>,
63 settings: Bolt11Settings,
64}
65
66impl Lnd {
67 pub const MAX_ROUTE_RETRIES: usize = 50;
69
70 pub async fn new(
72 address: String,
73 cert_file: PathBuf,
74 macaroon_file: PathBuf,
75 fee_reserve: FeeReserve,
76 kv_store: DynMintKVStore,
77 ) -> Result<Self, Error> {
78 if address.is_empty() {
80 return Err(Error::InvalidConfig("LND address cannot be empty".into()));
81 }
82
83 if !cert_file.exists() || cert_file.metadata().map(|m| m.len() == 0).unwrap_or(true) {
85 return Err(Error::InvalidConfig(format!(
86 "LND certificate file not found or empty: {cert_file:?}"
87 )));
88 }
89
90 if !macaroon_file.exists()
92 || macaroon_file
93 .metadata()
94 .map(|m| m.len() == 0)
95 .unwrap_or(true)
96 {
97 return Err(Error::InvalidConfig(format!(
98 "LND macaroon file not found or empty: {macaroon_file:?}"
99 )));
100 }
101
102 let lnd_client = client::connect(&address, &cert_file, &macaroon_file)
103 .await
104 .map_err(|err| {
105 tracing::error!("Connection error: {}", err.to_string());
106 Error::Connection
107 })
108 .unwrap();
109
110 Ok(Self {
111 _address: address,
112 _cert_file: cert_file,
113 _macaroon_file: macaroon_file,
114 lnd_client,
115 fee_reserve,
116 kv_store,
117 wait_invoice_cancel_token: CancellationToken::new(),
118 wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
119 settings: Bolt11Settings {
120 mpp: true,
121 unit: CurrencyUnit::Msat,
122 invoice_description: true,
123 amountless: true,
124 bolt12: false,
125 },
126 })
127 }
128
129 #[instrument(skip_all)]
131 async fn get_last_indices(&self) -> Result<(Option<u64>, Option<u64>), Error> {
132 let add_index = if let Some(stored_index) = self
133 .kv_store
134 .kv_read(
135 LND_KV_PRIMARY_NAMESPACE,
136 LND_KV_SECONDARY_NAMESPACE,
137 LAST_ADD_INDEX_KV_KEY,
138 )
139 .await
140 .map_err(|e| Error::Database(e.to_string()))?
141 {
142 if let Ok(index_str) = std::str::from_utf8(stored_index.as_slice()) {
143 index_str.parse::<u64>().ok()
144 } else {
145 None
146 }
147 } else {
148 None
149 };
150
151 let settle_index = if let Some(stored_index) = self
152 .kv_store
153 .kv_read(
154 LND_KV_PRIMARY_NAMESPACE,
155 LND_KV_SECONDARY_NAMESPACE,
156 LAST_SETTLE_INDEX_KV_KEY,
157 )
158 .await
159 .map_err(|e| Error::Database(e.to_string()))?
160 {
161 if let Ok(index_str) = std::str::from_utf8(stored_index.as_slice()) {
162 index_str.parse::<u64>().ok()
163 } else {
164 None
165 }
166 } else {
167 None
168 };
169
170 tracing::debug!(
171 "LND: Retrieved last indices from KV store - add_index: {:?}, settle_index: {:?}",
172 add_index,
173 settle_index
174 );
175 Ok((add_index, settle_index))
176 }
177}
178
179#[async_trait]
180impl MintPayment for Lnd {
181 type Err = payment::Error;
182
183 #[instrument(skip_all)]
184 async fn get_settings(&self) -> Result<serde_json::Value, Self::Err> {
185 Ok(serde_json::to_value(&self.settings)?)
186 }
187
188 #[instrument(skip_all)]
189 fn is_wait_invoice_active(&self) -> bool {
190 self.wait_invoice_is_active.load(Ordering::SeqCst)
191 }
192
193 #[instrument(skip_all)]
194 fn cancel_wait_invoice(&self) {
195 self.wait_invoice_cancel_token.cancel()
196 }
197
198 #[instrument(skip_all)]
199 async fn wait_payment_event(
200 &self,
201 ) -> Result<Pin<Box<dyn Stream<Item = Event> + Send>>, Self::Err> {
202 let mut lnd_client = self.lnd_client.clone();
203
204 let (last_add_index, last_settle_index) =
206 self.get_last_indices().await.unwrap_or((None, None));
207
208 let stream_req = lnrpc::InvoiceSubscription {
209 add_index: last_add_index.unwrap_or(0),
210 settle_index: last_settle_index.unwrap_or(0),
211 };
212
213 tracing::debug!(
214 "LND: Starting invoice subscription with add_index: {}, settle_index: {}",
215 stream_req.add_index,
216 stream_req.settle_index
217 );
218
219 let stream = lnd_client
220 .lightning()
221 .subscribe_invoices(stream_req)
222 .await
223 .map_err(|_err| {
224 tracing::error!("Could not subscribe to invoice");
225 Error::Connection
226 })?
227 .into_inner();
228
229 let cancel_token = self.wait_invoice_cancel_token.clone();
230 let kv_store = self.kv_store.clone();
231
232 let event_stream = futures::stream::unfold(
233 (
234 stream,
235 cancel_token,
236 Arc::clone(&self.wait_invoice_is_active),
237 kv_store,
238 last_add_index.unwrap_or(0),
239 last_settle_index.unwrap_or(0),
240 ),
241 |(
242 mut stream,
243 cancel_token,
244 is_active,
245 kv_store,
246 mut current_add_index,
247 mut current_settle_index,
248 )| async move {
249 is_active.store(true, Ordering::SeqCst);
250
251 loop {
252 tokio::select! {
253 _ = cancel_token.cancelled() => {
254 is_active.store(false, Ordering::SeqCst);
256 tracing::info!("Waiting for lnd invoice ending");
257 return None;
258 }
259 msg = stream.message() => {
260 match msg {
261 Ok(Some(msg)) => {
262 current_add_index = current_add_index.max(msg.add_index);
264 current_settle_index = current_settle_index.max(msg.settle_index);
265
266 let add_index_str = current_add_index.to_string();
268 let settle_index_str = current_settle_index.to_string();
269
270 if let Ok(mut tx) = kv_store.begin_transaction().await {
271 let mut has_error = false;
272
273 if let Err(e) = tx.kv_write(LND_KV_PRIMARY_NAMESPACE, LND_KV_SECONDARY_NAMESPACE, LAST_ADD_INDEX_KV_KEY, add_index_str.as_bytes()).await {
274 tracing::warn!("LND: Failed to write add_index {} to KV store: {}", current_add_index, e);
275 has_error = true;
276 }
277
278 if let Err(e) = tx.kv_write(LND_KV_PRIMARY_NAMESPACE, LND_KV_SECONDARY_NAMESPACE, LAST_SETTLE_INDEX_KV_KEY, settle_index_str.as_bytes()).await {
279 tracing::warn!("LND: Failed to write settle_index {} to KV store: {}", current_settle_index, e);
280 has_error = true;
281 }
282
283 if !has_error {
284 if let Err(e) = tx.commit().await {
285 tracing::warn!("LND: Failed to commit indices to KV store: {}", e);
286 } else {
287 tracing::debug!("LND: Stored updated indices - add_index: {}, settle_index: {}", current_add_index, current_settle_index);
288 }
289 }
290 } else {
291 tracing::warn!("LND: Failed to begin KV transaction for storing indices");
292 }
293
294 if msg.state() == InvoiceState::Settled {
296 let hash_slice: Result<[u8;32], _> = msg.r_hash.try_into();
297
298 if let Ok(hash_slice) = hash_slice {
299 let hash = hex::encode(hash_slice);
300
301 tracing::info!("LND: Payment for {} with amount {} msat", hash, msg.amt_paid_msat);
302
303 let wait_response = WaitPaymentResponse {
304 payment_identifier: PaymentIdentifier::PaymentHash(hash_slice),
305 payment_amount: Amount::from(msg.amt_paid_msat as u64),
306 unit: CurrencyUnit::Msat,
307 payment_id: hash,
308 };
309 let event = Event::PaymentReceived(wait_response);
310 return Some((event, (stream, cancel_token, is_active, kv_store, current_add_index, current_settle_index)));
311 } else {
312 tracing::error!("LND returned invalid payment hash");
314 continue;
316 }
317 } else {
318 tracing::debug!("LND: Received non-settled invoice, continuing to wait for settled invoices");
320 continue;
322 }
323 }
324 Ok(None) => {
325 is_active.store(false, Ordering::SeqCst);
326 tracing::info!("LND invoice stream ended.");
327 return None;
328 }
329 Err(err) => {
330 is_active.store(false, Ordering::SeqCst);
331 tracing::warn!("Encountered error in LND invoice stream. Stream ending");
332 tracing::error!("{:?}", err);
333 return None;
334 }
335 }
336 }
337 }
338 }
339 },
340 );
341
342 Ok(Box::pin(event_stream))
343 }
344
345 #[instrument(skip_all)]
346 async fn get_payment_quote(
347 &self,
348 unit: &CurrencyUnit,
349 options: OutgoingPaymentOptions,
350 ) -> Result<PaymentQuoteResponse, Self::Err> {
351 match options {
352 OutgoingPaymentOptions::Bolt11(bolt11_options) => {
353 let amount_msat = match bolt11_options.melt_options {
354 Some(amount) => amount.amount_msat(),
355 None => bolt11_options
356 .bolt11
357 .amount_milli_satoshis()
358 .ok_or(Error::UnknownInvoiceAmount)?
359 .into(),
360 };
361
362 let amount = to_unit(amount_msat, &CurrencyUnit::Msat, unit)?;
363
364 let relative_fee_reserve =
365 (self.fee_reserve.percent_fee_reserve * u64::from(amount) as f32) as u64;
366
367 let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
368
369 let fee = max(relative_fee_reserve, absolute_fee_reserve);
370
371 Ok(PaymentQuoteResponse {
372 request_lookup_id: Some(PaymentIdentifier::PaymentHash(
373 *bolt11_options.bolt11.payment_hash().as_ref(),
374 )),
375 amount,
376 fee: fee.into(),
377 state: MeltQuoteState::Unpaid,
378 unit: unit.clone(),
379 })
380 }
381 OutgoingPaymentOptions::Bolt12(_) => {
382 Err(Self::Err::Anyhow(anyhow!("BOLT12 not supported by LND")))
383 }
384 }
385 }
386
387 #[instrument(skip_all)]
388 async fn make_payment(
389 &self,
390 _unit: &CurrencyUnit,
391 options: OutgoingPaymentOptions,
392 ) -> Result<MakePaymentResponse, Self::Err> {
393 match options {
394 OutgoingPaymentOptions::Bolt11(bolt11_options) => {
395 let bolt11 = bolt11_options.bolt11;
396
397 let pay_state = self
398 .check_outgoing_payment(&PaymentIdentifier::PaymentHash(
399 *bolt11.payment_hash().as_ref(),
400 ))
401 .await?;
402
403 match pay_state.status {
404 MeltQuoteState::Unpaid | MeltQuoteState::Unknown | MeltQuoteState::Failed => (),
405 MeltQuoteState::Paid => {
406 tracing::debug!("Melt attempted on invoice already paid");
407 return Err(Self::Err::InvoiceAlreadyPaid);
408 }
409 MeltQuoteState::Pending => {
410 tracing::debug!("Melt attempted on invoice already pending");
411 return Err(Self::Err::InvoicePaymentPending);
412 }
413 }
414
415 match bolt11_options.melt_options {
417 Some(MeltOptions::Mpp { mpp }) => {
418 let amount_msat: u64 = bolt11
419 .amount_milli_satoshis()
420 .ok_or(Error::UnknownInvoiceAmount)?;
421 {
422 let partial_amount_msat = mpp.amount;
423 let invoice = bolt11;
424 let max_fee: Option<Amount> = bolt11_options.max_fee_amount;
425
426 let pub_key = invoice.get_payee_pub_key();
428 let payer_addr = invoice.payment_secret().0.to_vec();
429 let payment_hash = invoice.payment_hash();
430
431 let mut lnd_client = self.lnd_client.clone();
432
433 for attempt in 0..Self::MAX_ROUTE_RETRIES {
434 let route_req = lnrpc::QueryRoutesRequest {
436 pub_key: hex::encode(pub_key.serialize()),
437 amt_msat: u64::from(partial_amount_msat) as i64,
438 fee_limit: max_fee.map(|f| {
439 let limit = Limit::Fixed(u64::from(f) as i64);
440 FeeLimit { limit: Some(limit) }
441 }),
442 use_mission_control: true,
443 ..Default::default()
444 };
445
446 let mut routes_response = lnd_client
448 .lightning()
449 .query_routes(route_req)
450 .await
451 .map_err(Error::LndError)?
452 .into_inner();
453
454 let last_hop: &mut Hop = routes_response.routes[0]
457 .hops
458 .last_mut()
459 .ok_or(Error::MissingLastHop)?;
460 let mpp_record = MppRecord {
461 payment_addr: payer_addr.clone(),
462 total_amt_msat: amount_msat as i64,
463 };
464 last_hop.mpp_record = Some(mpp_record);
465
466 let payment_response = lnd_client
467 .router()
468 .send_to_route_v2(routerrpc::SendToRouteRequest {
469 payment_hash: payment_hash.to_byte_array().to_vec(),
470 route: Some(routes_response.routes[0].clone()),
471 ..Default::default()
472 })
473 .await
474 .map_err(Error::LndError)?
475 .into_inner();
476
477 if let Some(failure) = payment_response.failure {
478 if failure.code == 15 {
479 tracing::debug!(
480 "Attempt number {}: route has failed. Re-querying...",
481 attempt + 1
482 );
483 continue;
484 }
485 }
486
487 let (status, payment_preimage) = match payment_response.status {
489 0 => (MeltQuoteState::Pending, None),
490 1 => (
491 MeltQuoteState::Paid,
492 Some(hex::encode(payment_response.preimage)),
493 ),
494 2 => (MeltQuoteState::Unpaid, None),
495 _ => (MeltQuoteState::Unknown, None),
496 };
497
498 let mut total_amt: u64 = 0;
500 if let Some(route) = payment_response.route {
501 total_amt = (route.total_amt_msat / 1000) as u64;
502 }
503
504 return Ok(MakePaymentResponse {
505 payment_lookup_id: PaymentIdentifier::PaymentHash(
506 payment_hash.to_byte_array(),
507 ),
508 payment_proof: payment_preimage,
509 status,
510 total_spent: total_amt.into(),
511 unit: CurrencyUnit::Sat,
512 });
513 }
514
515 tracing::error!("Limit of retries reached, payment couldn't succeed.");
518 Err(Error::PaymentFailed.into())
519 }
520 }
521 _ => {
522 let mut lnd_client = self.lnd_client.clone();
523
524 let max_fee: Option<Amount> = bolt11_options.max_fee_amount;
525
526 let amount_msat = u64::from(
527 bolt11_options
528 .melt_options
529 .map(|a| a.amount_msat())
530 .unwrap_or_default(),
531 );
532
533 let pay_req = lnrpc::SendRequest {
534 payment_request: bolt11.to_string(),
535 fee_limit: max_fee.map(|f| {
536 let limit = Limit::Fixed(u64::from(f) as i64);
537 FeeLimit { limit: Some(limit) }
538 }),
539 amt_msat: amount_msat as i64,
540 ..Default::default()
541 };
542
543 let payment_response = lnd_client
544 .lightning()
545 .send_payment_sync(tonic::Request::new(pay_req))
546 .await
547 .map_err(|err| {
548 tracing::warn!("Lightning payment failed: {}", err);
549 Error::PaymentFailed
550 })?
551 .into_inner();
552
553 let total_amount = payment_response
554 .payment_route
555 .map_or(0, |route| route.total_amt_msat / MSAT_IN_SAT as i64)
556 as u64;
557
558 let (status, payment_preimage) = match total_amount == 0 {
559 true => (MeltQuoteState::Unpaid, None),
560 false => (
561 MeltQuoteState::Paid,
562 Some(hex::encode(payment_response.payment_preimage)),
563 ),
564 };
565
566 let payment_identifier =
567 PaymentIdentifier::PaymentHash(*bolt11.payment_hash().as_ref());
568
569 Ok(MakePaymentResponse {
570 payment_lookup_id: payment_identifier,
571 payment_proof: payment_preimage,
572 status,
573 total_spent: total_amount.into(),
574 unit: CurrencyUnit::Sat,
575 })
576 }
577 }
578 }
579 OutgoingPaymentOptions::Bolt12(_) => {
580 Err(Self::Err::Anyhow(anyhow!("BOLT12 not supported by LND")))
581 }
582 }
583 }
584
585 #[instrument(skip(self, options))]
586 async fn create_incoming_payment_request(
587 &self,
588 unit: &CurrencyUnit,
589 options: IncomingPaymentOptions,
590 ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
591 match options {
592 IncomingPaymentOptions::Bolt11(bolt11_options) => {
593 let description = bolt11_options.description.unwrap_or_default();
594 let amount = bolt11_options.amount;
595 let unix_expiry = bolt11_options.unix_expiry;
596
597 let amount_msat = to_unit(amount, unit, &CurrencyUnit::Msat)?;
598
599 let invoice_request = lnrpc::Invoice {
600 value_msat: u64::from(amount_msat) as i64,
601 memo: description,
602 ..Default::default()
603 };
604
605 let mut lnd_client = self.lnd_client.clone();
606
607 let invoice = lnd_client
608 .lightning()
609 .add_invoice(tonic::Request::new(invoice_request))
610 .await
611 .map_err(|e| payment::Error::Anyhow(anyhow!(e)))?
612 .into_inner();
613
614 let bolt11 = Bolt11Invoice::from_str(&invoice.payment_request)?;
615
616 let payment_identifier =
617 PaymentIdentifier::PaymentHash(*bolt11.payment_hash().as_ref());
618
619 Ok(CreateIncomingPaymentResponse {
620 request_lookup_id: payment_identifier,
621 request: bolt11.to_string(),
622 expiry: unix_expiry,
623 })
624 }
625 IncomingPaymentOptions::Bolt12(_) => {
626 Err(Self::Err::Anyhow(anyhow!("BOLT12 not supported by LND")))
627 }
628 }
629 }
630
631 #[instrument(skip(self))]
632 async fn check_incoming_payment_status(
633 &self,
634 payment_identifier: &PaymentIdentifier,
635 ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
636 let mut lnd_client = self.lnd_client.clone();
637
638 let invoice_request = lnrpc::PaymentHash {
639 r_hash: hex::decode(payment_identifier.to_string()).unwrap(),
640 ..Default::default()
641 };
642
643 let invoice = lnd_client
644 .lightning()
645 .lookup_invoice(tonic::Request::new(invoice_request))
646 .await
647 .map_err(|e| payment::Error::Anyhow(anyhow!(e)))?
648 .into_inner();
649
650 if invoice.state() == InvoiceState::Settled {
651 Ok(vec![WaitPaymentResponse {
652 payment_identifier: payment_identifier.clone(),
653 payment_amount: Amount::from(invoice.amt_paid_msat as u64),
654 unit: CurrencyUnit::Msat,
655 payment_id: hex::encode(invoice.r_hash),
656 }])
657 } else {
658 Ok(vec![])
659 }
660 }
661
662 #[instrument(skip(self))]
663 async fn check_outgoing_payment(
664 &self,
665 payment_identifier: &PaymentIdentifier,
666 ) -> Result<MakePaymentResponse, Self::Err> {
667 let mut lnd_client = self.lnd_client.clone();
668
669 let payment_hash = &payment_identifier.to_string();
670
671 let track_request = routerrpc::TrackPaymentRequest {
672 payment_hash: hex::decode(payment_hash).map_err(|_| Error::InvalidHash)?,
673 no_inflight_updates: true,
674 };
675
676 let payment_response = lnd_client.router().track_payment_v2(track_request).await;
677
678 let mut payment_stream = match payment_response {
679 Ok(stream) => stream.into_inner(),
680 Err(err) => {
681 let err_code = err.code();
682 if err_code == tonic::Code::NotFound {
683 return Ok(MakePaymentResponse {
684 payment_lookup_id: payment_identifier.clone(),
685 payment_proof: None,
686 status: MeltQuoteState::Unknown,
687 total_spent: Amount::ZERO,
688 unit: self.settings.unit.clone(),
689 });
690 } else {
691 return Err(payment::Error::UnknownPaymentState);
692 }
693 }
694 };
695
696 while let Some(update_result) = payment_stream.next().await {
697 match update_result {
698 Ok(update) => {
699 let status = update.status();
700
701 let response = match status {
702 PaymentStatus::Unknown => MakePaymentResponse {
703 payment_lookup_id: payment_identifier.clone(),
704 payment_proof: Some(update.payment_preimage),
705 status: MeltQuoteState::Unknown,
706 total_spent: Amount::ZERO,
707 unit: self.settings.unit.clone(),
708 },
709 PaymentStatus::InFlight | PaymentStatus::Initiated => {
710 continue;
712 }
713 PaymentStatus::Succeeded => MakePaymentResponse {
714 payment_lookup_id: payment_identifier.clone(),
715 payment_proof: Some(update.payment_preimage),
716 status: MeltQuoteState::Paid,
717 total_spent: Amount::from(
718 (update
719 .value_sat
720 .checked_add(update.fee_sat)
721 .ok_or(Error::AmountOverflow)?)
722 as u64,
723 ),
724 unit: CurrencyUnit::Sat,
725 },
726 PaymentStatus::Failed => MakePaymentResponse {
727 payment_lookup_id: payment_identifier.clone(),
728 payment_proof: Some(update.payment_preimage),
729 status: MeltQuoteState::Failed,
730 total_spent: Amount::ZERO,
731 unit: self.settings.unit.clone(),
732 },
733 };
734
735 return Ok(response);
736 }
737 Err(_) => {
738 return Err(Error::UnknownPaymentStatus.into());
740 }
741 }
742 }
743
744 Err(Error::UnknownPaymentStatus.into())
746 }
747}