1#![doc = include_str!("../README.md")]
4
5use std::net::SocketAddr;
6use std::pin::Pin;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use bip39::Mnemonic;
12use cdk_common::common::FeeReserve;
13use cdk_common::payment::{self, *};
14use cdk_common::util::{hex, unix_time};
15use cdk_common::{Amount, CurrencyUnit, MeltOptions, MeltQuoteState};
16use futures::{Stream, StreamExt};
17use ldk_node::bitcoin::hashes::Hash;
18use ldk_node::bitcoin::Network;
19use ldk_node::lightning::ln::channelmanager::PaymentId;
20use ldk_node::lightning::ln::msgs::SocketAddress;
21use ldk_node::lightning::routing::router::RouteParametersConfig;
22use ldk_node::lightning_invoice::{Bolt11InvoiceDescription, Description};
23use ldk_node::lightning_types::payment::PaymentHash;
24use ldk_node::logger::{LogLevel, LogWriter};
25use ldk_node::payment::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus};
26use ldk_node::{Builder, Event, Node};
27use tokio_stream::wrappers::BroadcastStream;
28use tokio_util::sync::CancellationToken;
29use tracing::instrument;
30
31use crate::error::Error;
32use crate::log::StdoutLogWriter;
33
34mod error;
35mod log;
36mod web;
37
/// Handle to an LDK node together with the state needed to use it as a payment backend.
///
/// Cloning yields another handle to the same underlying node (`inner` is an `Arc`).
#[derive(Clone)]
pub struct CdkLdkNode {
    /// The underlying LDK node; started by `start_ldk_node` and stopped by `stop_ldk_node`.
    inner: Arc<Node>,
    /// Fee reserve settings used by `get_payment_quote` to size the quoted fee.
    fee_reserve: FeeReserve,
    /// Cancelled by `cancel_payment_event_stream`, by `stop_ldk_node` (when a stream is
    /// active) and by `Drop`.
    wait_invoice_cancel_token: CancellationToken,
    /// Set to `true` by `wait_payment_event`; reset to `false` once the token above is
    /// cancelled.
    wait_invoice_is_active: Arc<AtomicBool>,
    /// Broadcast sender the event handler uses to publish received payments.
    sender: tokio::sync::broadcast::Sender<WaitPaymentResponse>,
    /// Receiver created together with `sender`; `wait_payment_event` resubscribes from it.
    receiver: Arc<tokio::sync::broadcast::Receiver<WaitPaymentResponse>>,
    /// Cancels the background loop spawned by `handle_events`.
    events_cancel_token: CancellationToken,
    /// Address of the optional web interface; when `None`, `start` skips the web server.
    web_addr: Option<SocketAddr>,
}
53
54impl std::fmt::Debug for CdkLdkNode {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct("CdkLdkNode")
57 .field("fee_reserve", &self.fee_reserve)
58 .field("web_addr", &self.web_addr)
59 .finish_non_exhaustive()
60 }
61}
62
/// Connection settings for a Bitcoin Core RPC endpoint.
///
/// `Debug` is implemented by hand so that the RPC password is never written to logs or
/// panic messages, including when this value is formatted as part of an enclosing type.
#[derive(Clone)]
pub struct BitcoinRpcConfig {
    /// RPC host name or address.
    pub host: String,
    /// RPC port.
    pub port: u16,
    /// RPC user name.
    pub user: String,
    /// RPC password (redacted in `Debug` output).
    pub password: String,
}

impl std::fmt::Debug for BitcoinRpcConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BitcoinRpcConfig")
            .field("host", &self.host)
            .field("port", &self.port)
            .field("user", &self.user)
            // Never print the credential itself.
            .field("password", &"<redacted>")
            .finish()
    }
}
77
/// Where the node obtains chain data; consumed by `CdkLdkNodeBuilder::build`.
#[derive(Debug, Clone)]
pub enum ChainSource {
    /// Esplora backend; the string is passed to `set_chain_source_esplora` as the server URL.
    Esplora(String),
    /// Bitcoin Core RPC backend; the fields are passed to `set_chain_source_bitcoind_rpc`.
    BitcoinRpc(BitcoinRpcConfig),
}
93
/// Where the node obtains network gossip; consumed by `CdkLdkNodeBuilder::build`.
#[derive(Debug, Clone)]
pub enum GossipSource {
    /// Peer-to-peer gossip; selects `set_gossip_source_p2p`.
    P2P,
    /// Rapid Gossip Sync; the string is passed to `set_gossip_source_rgs` as the server URL.
    RapidGossipSync(String),
}
/// Builder collecting the settings needed to construct a [`CdkLdkNode`].
// NOTE(review): the derived `Debug` formats `seed` through `Mnemonic`'s own `Debug` impl —
// confirm that this does not expose the recovery phrase in logs.
#[derive(Debug)]
pub struct CdkLdkNodeBuilder {
    /// Bitcoin network the node runs on.
    network: Network,
    /// Chain data backend.
    chain_source: ChainSource,
    /// Gossip backend.
    gossip_source: GossipSource,
    /// When set, a filesystem logger is configured with this path; otherwise the custom
    /// `StdoutLogWriter` logger is used.
    log_dir_path: Option<String>,
    /// Storage directory handed to the LDK builder.
    storage_dir_path: String,
    /// Fee reserve settings copied into the built node.
    fee_reserve: FeeReserve,
    /// Addresses the node listens on.
    listening_addresses: Vec<SocketAddress>,
    /// Optional BIP39 mnemonic used as entropy source; when `None` no entropy source is
    /// configured explicitly.
    seed: Option<Mnemonic>,
    /// Optional announcement addresses; only applied when set.
    announcement_addresses: Option<Vec<SocketAddress>>,
}
122
123impl CdkLdkNodeBuilder {
124 pub fn new(
126 network: Network,
127 chain_source: ChainSource,
128 gossip_source: GossipSource,
129 storage_dir_path: String,
130 fee_reserve: FeeReserve,
131 listening_addresses: Vec<SocketAddress>,
132 ) -> Self {
133 Self {
134 network,
135 chain_source,
136 gossip_source,
137 storage_dir_path,
138 fee_reserve,
139 listening_addresses,
140 seed: None,
141 announcement_addresses: None,
142 log_dir_path: None,
143 }
144 }
145
146 pub fn with_seed(mut self, seed: Mnemonic) -> Self {
148 self.seed = Some(seed);
149 self
150 }
151 pub fn with_announcement_address(mut self, announcement_addresses: Vec<SocketAddress>) -> Self {
153 self.announcement_addresses = Some(announcement_addresses);
154 self
155 }
156 pub fn with_log_dir_path(mut self, log_dir_path: String) -> Self {
158 self.log_dir_path = Some(log_dir_path);
159 self
160 }
161
162 pub fn build(self) -> Result<CdkLdkNode, Error> {
167 let mut ldk = Builder::new();
168 ldk.set_network(self.network);
169 tracing::info!("Storage dir of node is {}", self.storage_dir_path);
170 ldk.set_storage_dir_path(self.storage_dir_path);
171
172 match self.chain_source {
173 ChainSource::Esplora(esplora_url) => {
174 ldk.set_chain_source_esplora(esplora_url, None);
175 }
176 ChainSource::BitcoinRpc(BitcoinRpcConfig {
177 host,
178 port,
179 user,
180 password,
181 }) => {
182 ldk.set_chain_source_bitcoind_rpc(host, port, user, password);
183 }
184 }
185
186 match self.gossip_source {
187 GossipSource::P2P => {
188 ldk.set_gossip_source_p2p();
189 }
190 GossipSource::RapidGossipSync(rgs_url) => {
191 ldk.set_gossip_source_rgs(rgs_url);
192 }
193 }
194
195 ldk.set_listening_addresses(self.listening_addresses)?;
196 if self.log_dir_path.is_some() {
197 ldk.set_filesystem_logger(self.log_dir_path, Some(LogLevel::Info));
198 } else {
199 ldk.set_custom_logger(Arc::new(StdoutLogWriter));
200 }
201
202 ldk.set_node_alias("cdk-ldk-node".to_string())?;
203 if let Some(seed) = self.seed {
205 ldk.set_entropy_bip39_mnemonic(seed, None);
206 }
207 if let Some(announcement_addresses) = self.announcement_addresses {
209 ldk.set_announcement_addresses(announcement_addresses)?;
210 }
211
212 let node = ldk.build()?;
213
214 tracing::info!("Creating tokio channel for payment notifications");
215 let (sender, receiver) = tokio::sync::broadcast::channel(8);
216
217 let id = node.node_id();
218
219 let adr = node.announcement_addresses();
220
221 tracing::info!(
222 "Created node {} with address {:?} on network {}",
223 id,
224 adr,
225 self.network
226 );
227
228 Ok(CdkLdkNode {
229 inner: node.into(),
230 fee_reserve: self.fee_reserve,
231 wait_invoice_cancel_token: CancellationToken::new(),
232 wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
233 sender,
234 receiver: Arc::new(receiver),
235 events_cancel_token: CancellationToken::new(),
236 web_addr: None,
237 })
238 }
239}
240
241impl CdkLdkNode {
    /// Sets (or clears, with `None`) the address of the web interface.
    ///
    /// `start` reads this value to decide whether to launch the web server.
    pub fn set_web_addr(&mut self, addr: Option<SocketAddr>) {
        self.web_addr = addr;
    }
249
250 pub fn default_web_addr() -> SocketAddr {
255 SocketAddr::from(([127, 0, 0, 1], 8091))
256 }
257
258 fn make_payment_response_from_details(
259 unit: &CurrencyUnit,
260 payment_lookup_id: PaymentIdentifier,
261 payment_details: &PaymentDetails,
262 ) -> Result<MakePaymentResponse, payment::Error> {
263 let status = match payment_details.status {
264 PaymentStatus::Pending => MeltQuoteState::Pending,
265 PaymentStatus::Succeeded => MeltQuoteState::Paid,
266 PaymentStatus::Failed => MeltQuoteState::Failed,
267 };
268
269 let payment_proof = match &payment_details.kind {
270 PaymentKind::Bolt11 { preimage, .. } => preimage.map(|p| p.to_string()),
271 PaymentKind::Bolt12Offer { preimage, .. } => preimage.map(|p| p.to_string()),
272 _ => return Err(Error::UnexpectedPaymentKind.into()),
273 };
274
275 let total_spent = if status == MeltQuoteState::Paid {
276 let total_spent = payment_details
277 .amount_msat
278 .ok_or(Error::CouldNotGetAmountSpent)?
279 + payment_details.fee_paid_msat.unwrap_or_default();
280 Amount::new(total_spent, CurrencyUnit::Msat).convert_to(unit)?
281 } else {
282 Amount::new(0, unit.clone())
283 };
284
285 Ok(MakePaymentResponse {
286 payment_lookup_id,
287 payment_proof,
288 status,
289 total_spent,
290 })
291 }
292
293 pub fn start_ldk_node(&self) -> Result<(), Error> {
304 tracing::info!("Starting cdk-ldk node");
305 self.inner.start()?;
306 let node_config = self.inner.config();
307
308 tracing::info!("Starting node with network {}", node_config.network);
309
310 tracing::info!("Node status: {:?}", self.inner.status());
311
312 self.handle_events()?;
313
314 Ok(())
315 }
316
317 pub fn start_web_server(&self, web_addr: SocketAddr) -> Result<(), Error> {
332 let web_server = crate::web::WebServer::new(Arc::new(self.clone()));
333
334 tokio::spawn(async move {
335 if let Err(e) = web_server.serve(web_addr).await {
336 tracing::error!("Web server error: {}", e);
337 }
338 });
339
340 Ok(())
341 }
342
343 pub fn stop_ldk_node(&self) -> Result<(), Error> {
357 tracing::info!("Stopping CdkLdkNode");
358 tracing::info!("Cancelling event handler");
360 self.events_cancel_token.cancel();
361
362 if self.is_payment_event_stream_active() {
364 tracing::info!("Cancelling payment event stream");
365 self.wait_invoice_cancel_token.cancel();
366 }
367
368 tracing::info!("Stopping LDK node");
370 self.inner.stop()?;
371 tracing::info!("CdkLdkNode stopped successfully");
372 Ok(())
373 }
374
375 async fn handle_payment_received(
377 node: &Arc<Node>,
378 sender: &tokio::sync::broadcast::Sender<WaitPaymentResponse>,
379 payment_id: Option<PaymentId>,
380 payment_hash: PaymentHash,
381 amount_msat: u64,
382 ) {
383 tracing::info!(
384 "Received payment for hash={} of amount={} msat",
385 payment_hash,
386 amount_msat
387 );
388
389 let payment_id = match payment_id {
390 Some(id) => id,
391 None => {
392 tracing::warn!("Received payment without payment_id");
393 return;
394 }
395 };
396
397 let payment_id_hex = hex::encode(payment_id.0);
398
399 if amount_msat == 0 {
400 tracing::warn!("Payment of no amount");
401 return;
402 }
403
404 tracing::info!(
405 "Processing payment notification: id={}, amount={} msats",
406 payment_id_hex,
407 amount_msat
408 );
409
410 let payment_details = match node.payment(&payment_id) {
411 Some(details) => details,
412 None => {
413 tracing::error!("Could not find payment details for id={}", payment_id_hex);
414 return;
415 }
416 };
417
418 let (payment_identifier, payment_id) = match payment_details.kind {
419 PaymentKind::Bolt11 { hash, .. } => {
420 (PaymentIdentifier::PaymentHash(hash.0), hash.to_string())
421 }
422 PaymentKind::Bolt12Offer { hash, offer_id, .. } => match hash {
423 Some(h) => (
424 PaymentIdentifier::OfferId(offer_id.to_string()),
425 h.to_string(),
426 ),
427 None => {
428 tracing::error!("Bolt12 payment missing hash");
429 return;
430 }
431 },
432 k => {
433 tracing::warn!("Received payment of kind {:?} which is not supported", k);
434 return;
435 }
436 };
437
438 let wait_payment_response = WaitPaymentResponse {
439 payment_identifier,
440 payment_amount: Amount::new(amount_msat, CurrencyUnit::Msat),
441 payment_id,
442 };
443
444 match sender.send(wait_payment_response) {
445 Ok(_) => tracing::info!("Successfully sent payment notification to stream"),
446 Err(err) => tracing::error!(
447 "Could not send payment received notification on channel: {}",
448 err
449 ),
450 }
451 }
452
    /// Spawns a background task that processes LDK node events until
    /// `events_cancel_token` is cancelled.
    ///
    /// `PaymentReceived` events are forwarded to `handle_payment_received`,
    /// `PaymentFailed` events are logged as errors, and all other events are logged at
    /// debug level. After each event `event_handled` is called on the node.
    ///
    /// Always returns `Ok(())`; the task itself is detached.
    pub fn handle_events(&self) -> Result<(), Error> {
        // Clone everything the task needs so it does not borrow from `self`.
        let node = self.inner.clone();
        let sender = self.sender.clone();
        let cancel_token = self.events_cancel_token.clone();

        tracing::info!("Starting event handler task");

        tokio::spawn(async move {
            tracing::info!("Event handler loop started");
            loop {
                tokio::select! {
                    // Cancellation ends the loop.
                    _ = cancel_token.cancelled() => {
                        tracing::info!("Event handler cancelled");
                        break;
                    }
                    event = node.next_event_async() => {
                        match event {
                            Event::PaymentReceived {
                                payment_id,
                                payment_hash,
                                amount_msat,
                                custom_records: _
                            } => {
                                Self::handle_payment_received(
                                    &node,
                                    &sender,
                                    payment_id,
                                    payment_hash,
                                    amount_msat
                                ).await;
                            }
                            Event::PaymentFailed {
                                payment_id,
                                payment_hash,
                                reason,
                            } => {
                                tracing::error!(
                                    payment_id = ?payment_id,
                                    payment_hash = ?payment_hash,
                                    reason = ?reason,
                                    "LDK node payment failed"
                                );
                            }
                            event => {
                                tracing::debug!("Received other ldk node event: {:?}", event);
                            }
                        }

                        // Mark the event as handled regardless of its kind; a failure here is
                        // only logged and the loop keeps running.
                        if let Err(err) = node.event_handled() {
                            tracing::error!("Error handling node event: {}", err);
                        } else {
                            tracing::debug!("Successfully handled node event");
                        }
                    }
                }
            }
            tracing::info!("Event handler loop terminated");
        });

        tracing::info!("Event handler task spawned");
        Ok(())
    }
516
517 pub fn node(&self) -> Arc<Node> {
519 Arc::clone(&self.inner)
520 }
521}
522
523#[async_trait]
525impl MintPayment for CdkLdkNode {
526 type Err = payment::Error;
527
528 async fn start(&self) -> Result<(), Self::Err> {
531 self.start_ldk_node().map_err(|e| {
532 tracing::error!("Failed to start CdkLdkNode: {}", e);
533 e
534 })?;
535
536 tracing::info!("CdkLdkNode payment processor started successfully");
537
538 if let Some(web_addr) = self.web_addr {
540 tracing::info!("Starting LDK Node web interface on {}", web_addr);
541 self.start_web_server(web_addr).map_err(|e| {
542 tracing::error!("Failed to start web server: {}", e);
543 e
544 })?;
545 } else {
546 tracing::info!("No web server address configured, skipping web interface");
547 }
548
549 Ok(())
550 }
551
552 async fn stop(&self) -> Result<(), Self::Err> {
555 self.stop_ldk_node().map_err(|e| {
556 tracing::error!("Failed to stop CdkLdkNode: {}", e);
557 e.into()
558 })
559 }
560
561 async fn get_settings(&self) -> Result<SettingsResponse, Self::Err> {
563 let settings = SettingsResponse {
564 unit: CurrencyUnit::Msat.to_string(),
565 bolt11: Some(payment::Bolt11Settings {
566 mpp: false,
567 amountless: true,
568 invoice_description: true,
569 }),
570 bolt12: Some(payment::Bolt12Settings { amountless: true }),
571 onchain: None,
572 custom: std::collections::HashMap::new(),
573 };
574 Ok(settings)
575 }
576
577 #[instrument(skip(self))]
579 async fn create_incoming_payment_request(
580 &self,
581 options: IncomingPaymentOptions,
582 ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
583 match options {
584 IncomingPaymentOptions::Bolt11(bolt11_options) => {
585 let amount_msat: Amount = bolt11_options
586 .amount
587 .convert_to(&CurrencyUnit::Msat)?
588 .into();
589 let description = bolt11_options.description.unwrap_or_default();
590 let time = match bolt11_options.unix_expiry {
591 Some(t) => t
592 .checked_sub(unix_time())
593 .ok_or(payment::Error::InvalidExpiry)?,
594 None => 36000,
595 };
596
597 let description = Bolt11InvoiceDescription::Direct(
598 Description::new(description).map_err(|_| Error::InvalidDescription)?,
599 );
600
601 let payment = self
602 .inner
603 .bolt11_payment()
604 .receive(amount_msat.into(), &description, time as u32)
605 .map_err(Error::LdkNode)?;
606
607 let payment_hash = payment.payment_hash().to_string();
608 let payment_identifier = PaymentIdentifier::PaymentHash(
609 hex::decode(&payment_hash)?
610 .try_into()
611 .map_err(|_| Error::InvalidPaymentHashLength)?,
612 );
613
614 Ok(CreateIncomingPaymentResponse {
615 request_lookup_id: payment_identifier,
616 request: payment.to_string(),
617 expiry: Some(unix_time() + time),
618 extra_json: None,
619 })
620 }
621 IncomingPaymentOptions::Bolt12(bolt12_options) => {
622 let Bolt12IncomingPaymentOptions {
623 description,
624 amount,
625 unix_expiry,
626 } = *bolt12_options;
627
628 let time = unix_expiry
629 .map(|t| {
630 t.checked_sub(unix_time())
631 .ok_or(payment::Error::InvalidExpiry)
632 .map(|t| t as u32)
633 })
634 .transpose()?;
635
636 let offer = match amount {
637 Some(amount) => {
638 let amount_msat: Amount = amount.convert_to(&CurrencyUnit::Msat)?.into();
639
640 self.inner
641 .bolt12_payment()
642 .receive(
643 amount_msat.into(),
644 &description.unwrap_or("".to_string()),
645 time,
646 None,
647 )
648 .map_err(Error::LdkNode)?
649 }
650 None => self
651 .inner
652 .bolt12_payment()
653 .receive_variable_amount(&description.unwrap_or("".to_string()), time)
654 .map_err(Error::LdkNode)?,
655 };
656 let payment_identifier = PaymentIdentifier::OfferId(offer.id().to_string());
657
658 Ok(CreateIncomingPaymentResponse {
659 request_lookup_id: payment_identifier,
660 request: offer.to_string(),
661 expiry: unix_expiry,
662 extra_json: None,
663 })
664 }
665 IncomingPaymentOptions::Custom(_) | IncomingPaymentOptions::Onchain(_) => {
666 Err(cdk_common::payment::Error::UnsupportedPaymentOption)
667 }
668 }
669 }
670
671 #[instrument(skip_all)]
674 async fn get_payment_quote(
675 &self,
676 unit: &CurrencyUnit,
677 options: OutgoingPaymentOptions,
678 ) -> Result<PaymentQuoteResponse, Self::Err> {
679 match options {
680 cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
681 Err(cdk_common::payment::Error::UnsupportedPaymentOption)
682 }
683 OutgoingPaymentOptions::Bolt11(bolt11_options) => {
684 let bolt11 = bolt11_options.bolt11;
685
686 let amount_msat = match bolt11_options.melt_options {
687 Some(MeltOptions::Amountless { amountless }) => {
688 let amount_msat = amountless.amount_msat;
689
690 if let Some(invoice_amount) = bolt11.amount_milli_satoshis() {
691 if invoice_amount != u64::from(amount_msat) {
692 return Err(payment::Error::AmountMismatch);
693 }
694 }
695
696 amount_msat
697 }
698 Some(MeltOptions::Mpp { mpp }) => mpp.amount,
699 None => bolt11
700 .amount_milli_satoshis()
701 .ok_or(Error::UnknownInvoiceAmount)?
702 .into(),
703 };
704
705 let amount =
706 Amount::new(amount_msat.into(), CurrencyUnit::Msat).convert_to(unit)?;
707
708 let relative_fee_reserve =
709 (self.fee_reserve.percent_fee_reserve * amount.value() as f32) as u64;
710
711 let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
712
713 let fee = match relative_fee_reserve > absolute_fee_reserve {
714 true => relative_fee_reserve,
715 false => absolute_fee_reserve,
716 };
717
718 let payment_hash = bolt11.payment_hash().to_string();
719 let payment_hash_bytes = hex::decode(&payment_hash)?
720 .try_into()
721 .map_err(|_| Error::InvalidPaymentHashLength)?;
722
723 Ok(PaymentQuoteResponse {
724 request_lookup_id: Some(PaymentIdentifier::PaymentHash(payment_hash_bytes)),
725 amount,
726 fee: Amount::new(fee, unit.clone()),
727 state: MeltQuoteState::Unpaid,
728 extra_json: None,
729 estimated_blocks: None,
730 fee_options: None,
731 })
732 }
733 OutgoingPaymentOptions::Bolt12(bolt12_options) => {
734 let offer = bolt12_options.offer;
735
736 let amount_msat = match bolt12_options.melt_options {
737 Some(melt_options) => melt_options.amount_msat(),
738 None => {
739 let amount = offer.amount().ok_or(payment::Error::AmountMismatch)?;
740
741 match amount {
742 ldk_node::lightning::offers::offer::Amount::Bitcoin {
743 amount_msats,
744 } => amount_msats.into(),
745 _ => return Err(payment::Error::AmountMismatch),
746 }
747 }
748 };
749 let amount =
750 Amount::new(amount_msat.into(), CurrencyUnit::Msat).convert_to(unit)?;
751
752 let relative_fee_reserve =
753 (self.fee_reserve.percent_fee_reserve * amount.value() as f32) as u64;
754
755 let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
756
757 let fee = match relative_fee_reserve > absolute_fee_reserve {
758 true => relative_fee_reserve,
759 false => absolute_fee_reserve,
760 };
761
762 Ok(PaymentQuoteResponse {
763 request_lookup_id: None,
764 amount,
765 fee: Amount::new(fee, unit.clone()),
766 state: MeltQuoteState::Unpaid,
767 extra_json: None,
768 estimated_blocks: None,
769 fee_options: None,
770 })
771 }
772 OutgoingPaymentOptions::Onchain(_) => {
773 Err(cdk_common::payment::Error::UnsupportedPaymentOption)
774 }
775 }
776 }
777
778 #[instrument(skip(self, options))]
780 async fn make_payment(
781 &self,
782 unit: &CurrencyUnit,
783 options: OutgoingPaymentOptions,
784 ) -> Result<MakePaymentResponse, Self::Err> {
785 match options {
786 cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
787 Err(cdk_common::payment::Error::UnsupportedPaymentOption)
788 }
789 OutgoingPaymentOptions::Bolt11(bolt11_options) => {
790 let bolt11 = bolt11_options.bolt11;
791
792 let send_params = match bolt11_options
793 .max_fee_amount
794 .map(|f| {
795 f.convert_to(&CurrencyUnit::Msat)
796 .map(|amount_msat| RouteParametersConfig {
797 max_total_routing_fee_msat: Some(amount_msat.value()),
798 ..Default::default()
799 })
800 })
801 .transpose()
802 {
803 Ok(params) => params,
804 Err(err) => {
805 tracing::error!("Failed to convert fee amount: {}", err);
806 return Err(payment::Error::Custom(format!("Invalid fee amount: {err}")));
807 }
808 };
809
810 let payment_id = match bolt11_options.melt_options {
811 Some(MeltOptions::Amountless { amountless }) => {
812 if let Some(invoice_amount) = bolt11.amount_milli_satoshis() {
813 if invoice_amount != u64::from(amountless.amount_msat) {
814 return Err(payment::Error::AmountMismatch);
815 }
816 }
817
818 self.inner
819 .bolt11_payment()
820 .send_using_amount(&bolt11, amountless.amount_msat.into(), send_params)
821 .map_err(|err| {
822 tracing::error!("Could not send send amountless bolt11: {}", err);
823 Error::CouldNotSendBolt11WithoutAmount
824 })?
825 }
826 None => self
827 .inner
828 .bolt11_payment()
829 .send(&bolt11, send_params)
830 .map_err(|err| {
831 tracing::error!("Could not send bolt11 {}", err);
832 Error::CouldNotSendBolt11
833 })?,
834 _ => return Err(payment::Error::UnsupportedPaymentOption),
835 };
836
837 let start = std::time::Instant::now();
839 let timeout = std::time::Duration::from_secs(10);
840
841 let payment_details = loop {
842 let details = self
843 .inner
844 .payment(&payment_id)
845 .ok_or(Error::PaymentNotFound)?;
846
847 match details.status {
848 PaymentStatus::Succeeded => break details,
849 PaymentStatus::Failed => {
850 tracing::error!("Failed to pay bolt11 payment.");
851 break details;
852 }
853 PaymentStatus::Pending => {
854 if start.elapsed() > timeout {
855 tracing::warn!(
856 "Paying bolt11 exceeded timeout 10 seconds no longer waitning."
857 );
858 break details;
859 }
860 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
861 continue;
862 }
863 }
864 };
865
866 Self::make_payment_response_from_details(
867 unit,
868 PaymentIdentifier::PaymentHash(bolt11.payment_hash().to_byte_array()),
869 &payment_details,
870 )
871 }
872 OutgoingPaymentOptions::Bolt12(bolt12_options) => {
873 let offer = bolt12_options.offer;
874
875 let send_params = match bolt12_options
876 .max_fee_amount
877 .map(|f| {
878 f.convert_to(&CurrencyUnit::Msat)
879 .map(|amount_msat| RouteParametersConfig {
880 max_total_routing_fee_msat: Some(amount_msat.value()),
881 ..Default::default()
882 })
883 })
884 .transpose()
885 {
886 Ok(params) => params,
887 Err(err) => {
888 tracing::error!("Failed to convert fee amount: {}", err);
889 return Err(payment::Error::Custom(format!("Invalid fee amount: {err}")));
890 }
891 };
892
893 let payment_id = match bolt12_options.melt_options {
894 Some(MeltOptions::Amountless { amountless }) => self
895 .inner
896 .bolt12_payment()
897 .send_using_amount(
898 &offer,
899 amountless.amount_msat.into(),
900 None,
901 None,
902 send_params,
903 )
904 .map_err(Error::LdkNode)?,
905 None => self
906 .inner
907 .bolt12_payment()
908 .send(&offer, None, None, send_params)
909 .map_err(Error::LdkNode)?,
910 _ => return Err(payment::Error::UnsupportedPaymentOption),
911 };
912
913 let start = std::time::Instant::now();
915 let timeout = std::time::Duration::from_secs(10);
916
917 let payment_details = loop {
918 let details = self
919 .inner
920 .payment(&payment_id)
921 .ok_or(Error::PaymentNotFound)?;
922
923 match details.status {
924 PaymentStatus::Succeeded => break details,
925 PaymentStatus::Failed => {
926 tracing::error!(
927 payment_id = %payment_id,
928 amount_msat = ?details.amount_msat,
929 fee_paid_msat = ?details.fee_paid_msat,
930 payment_kind = ?details.kind,
931 "Bolt12 payment failed"
932 );
933 break details;
934 }
935 PaymentStatus::Pending => {
936 if start.elapsed() > timeout {
937 tracing::warn!(
938 "Payment has been being for 10 seconds. No longer waiting"
939 );
940 break details;
941 }
942 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
943 continue;
944 }
945 }
946 };
947
948 Self::make_payment_response_from_details(
949 unit,
950 PaymentIdentifier::PaymentId(payment_id.0),
951 &payment_details,
952 )
953 }
954 OutgoingPaymentOptions::Onchain(_) => {
955 Err(cdk_common::payment::Error::UnsupportedPaymentOption)
956 }
957 }
958 }
959
960 #[instrument(skip(self))]
963 async fn wait_payment_event(
964 &self,
965 ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
966 tracing::info!("Starting stream for invoices - wait_any_incoming_payment called");
967
968 self.wait_invoice_is_active.store(true, Ordering::SeqCst);
970 tracing::debug!("wait_invoice_is_active set to true");
971
972 let receiver = self.receiver.clone();
973
974 tracing::info!("Receiver obtained successfully, creating response stream");
975
976 let response_stream = BroadcastStream::new(receiver.resubscribe());
978
979 let response_stream = response_stream.filter_map(|result| async move {
981 match result {
982 Ok(payment) => Some(cdk_common::payment::Event::PaymentReceived(payment)),
983 Err(err) => {
984 tracing::warn!("Error in broadcast stream: {}", err);
985 None
986 }
987 }
988 });
989
990 let cancel_token = self.wait_invoice_cancel_token.clone();
992 let is_active = self.wait_invoice_is_active.clone();
993
994 let stream = Box::pin(response_stream);
995
996 tokio::spawn(async move {
998 cancel_token.cancelled().await;
999 tracing::info!("wait_invoice stream cancelled");
1000 is_active.store(false, Ordering::SeqCst);
1001 });
1002
1003 tracing::info!("wait_any_incoming_payment returning stream");
1004 Ok(stream)
1005 }
1006
    /// Returns the current value of the `wait_invoice_is_active` flag, which
    /// `wait_payment_event` sets and the cancellation watcher clears.
    fn is_payment_event_stream_active(&self) -> bool {
        self.wait_invoice_is_active.load(Ordering::SeqCst)
    }
1011
    /// Cancels `wait_invoice_cancel_token`.
    // NOTE(review): the stream returned by `wait_payment_event` is not itself tied to this
    // token in the visible code; cancelling only triggers the watcher that clears the
    // active flag — confirm this is the intended behaviour.
    fn cancel_payment_event_stream(&self) {
        self.wait_invoice_cancel_token.cancel()
    }
1016
1017 async fn check_incoming_payment_status(
1019 &self,
1020 payment_identifier: &PaymentIdentifier,
1021 ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
1022 if let PaymentIdentifier::OfferId(offer_id) = payment_identifier {
1025 let payments = self.inner.list_payments_with_filter(|p| {
1026 p.direction == PaymentDirection::Inbound
1027 && p.status == PaymentStatus::Succeeded
1028 && matches!(
1029 &p.kind,
1030 PaymentKind::Bolt12Offer { offer_id: oid, .. } if oid.to_string() == *offer_id
1031 )
1032 });
1033
1034 return Ok(payments
1035 .into_iter()
1036 .filter_map(|p| {
1037 let payment_id = match &p.kind {
1038 PaymentKind::Bolt12Offer {
1039 hash: Some(hash), ..
1040 } => hash.to_string(),
1041 _ => {
1042 tracing::warn!("Bolt12 payment for offer {} missing hash", offer_id);
1043 return None;
1044 }
1045 };
1046
1047 Some(WaitPaymentResponse {
1048 payment_identifier: payment_identifier.clone(),
1049 payment_amount: Amount::new(p.amount_msat?, CurrencyUnit::Msat),
1050 payment_id,
1051 })
1052 })
1053 .collect());
1054 }
1055
1056 let payment_id_str = match payment_identifier {
1057 PaymentIdentifier::PaymentHash(hash) => hex::encode(hash),
1058 PaymentIdentifier::CustomId(id) => id.clone(),
1059 _ => return Err(Error::UnsupportedPaymentIdentifierType.into()),
1060 };
1061
1062 let payment_id = PaymentId(
1063 hex::decode(&payment_id_str)?
1064 .try_into()
1065 .map_err(|_| Error::InvalidPaymentIdLength)?,
1066 );
1067
1068 let payment_details = self
1069 .inner
1070 .payment(&payment_id)
1071 .ok_or(Error::PaymentNotFound)?;
1072
1073 if payment_details.direction == PaymentDirection::Outbound {
1074 return Err(Error::InvalidPaymentDirection.into());
1075 }
1076
1077 let amount = if payment_details.status == PaymentStatus::Succeeded {
1078 payment_details
1079 .amount_msat
1080 .ok_or(Error::CouldNotGetPaymentAmount)?
1081 } else {
1082 return Ok(vec![]);
1083 };
1084
1085 let response = WaitPaymentResponse {
1086 payment_identifier: payment_identifier.clone(),
1087 payment_amount: Amount::new(amount, CurrencyUnit::Msat),
1088 payment_id: payment_id_str,
1089 };
1090
1091 Ok(vec![response])
1092 }
1093
1094 async fn check_outgoing_payment(
1096 &self,
1097 request_lookup_id: &PaymentIdentifier,
1098 ) -> Result<MakePaymentResponse, Self::Err> {
1099 let payment_details = match request_lookup_id {
1100 PaymentIdentifier::PaymentHash(id_hash) => self
1101 .inner
1102 .list_payments_with_filter(
1103 |p| matches!(&p.kind, PaymentKind::Bolt11 { hash, .. } if &hash.0 == id_hash),
1104 )
1105 .first()
1106 .cloned(),
1107 PaymentIdentifier::PaymentId(id) => self.inner.payment(&PaymentId(*id)),
1108 _ => {
1109 return Ok(MakePaymentResponse {
1110 payment_lookup_id: request_lookup_id.clone(),
1111 payment_proof: None,
1112 status: MeltQuoteState::Unknown,
1113 total_spent: Amount::new(0, CurrencyUnit::Msat),
1114 });
1115 }
1116 }
1117 .ok_or(Error::PaymentNotFound)?;
1118
1119 if payment_details.direction != PaymentDirection::Outbound {
1120 return Err(Error::InvalidPaymentDirection.into());
1121 }
1122
1123 Self::make_payment_response_from_details(
1124 &CurrencyUnit::Msat,
1125 request_lookup_id.clone(),
1126 &payment_details,
1127 )
1128 }
1129}
1130
impl Drop for CdkLdkNode {
    /// Cancels `wait_invoice_cancel_token` when a `CdkLdkNode` value is dropped.
    // NOTE(review): `CdkLdkNode` derives `Clone` (and `start_web_server` clones it), so this
    // runs for every dropped clone, not only the last one. Presumably the cloned
    // `CancellationToken`s share cancellation state — confirm whether cancelling on the
    // drop of any clone is intended.
    fn drop(&mut self) {
        tracing::info!("Drop called on CdkLdkNode");
        self.wait_invoice_cancel_token.cancel();
        tracing::debug!("Cancelled wait_invoice token in drop");
    }
}
1138
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an outbound Bolt11 payment record with the given status and amount; the
    /// preimage, secret and paid fee are all unset.
    fn test_payment_details(status: PaymentStatus, amount_msat: Option<u64>) -> PaymentDetails {
        let kind = PaymentKind::Bolt11 {
            hash: PaymentHash([1; 32]),
            preimage: None,
            secret: None,
        };

        PaymentDetails {
            id: PaymentId([2; 32]),
            kind,
            amount_msat,
            fee_paid_msat: None,
            direction: PaymentDirection::Outbound,
            status,
            latest_update_timestamp: 0,
        }
    }

    /// Maps an amount-less payment record with the given status into a response, in msat.
    fn response_without_amount(
        status: PaymentStatus,
    ) -> Result<MakePaymentResponse, payment::Error> {
        let details = test_payment_details(status, None);

        CdkLdkNode::make_payment_response_from_details(
            &CurrencyUnit::Msat,
            PaymentIdentifier::PaymentId([2; 32]),
            &details,
        )
    }

    #[test]
    fn failed_payment_response_does_not_require_amount() {
        let response = response_without_amount(PaymentStatus::Failed)
            .expect("failed payment details should map without amount");

        assert_eq!(response.status, MeltQuoteState::Failed);
        assert_eq!(response.total_spent, Amount::new(0, CurrencyUnit::Msat));
    }

    #[test]
    fn pending_payment_response_does_not_require_amount() {
        let response = response_without_amount(PaymentStatus::Pending)
            .expect("pending payment details should map without amount");

        assert_eq!(response.status, MeltQuoteState::Pending);
        assert_eq!(response.total_spent, Amount::new(0, CurrencyUnit::Msat));
    }

    #[test]
    fn paid_payment_response_requires_amount() {
        let err = response_without_amount(PaymentStatus::Succeeded)
            .expect_err("paid payment details without amount should fail");

        assert!(matches!(err, payment::Error::Lightning(_)));
    }
}