Skip to main content

cdk_ldk_node/
lib.rs

1//! CDK lightning backend for ldk-node
2
3#![doc = include_str!("../README.md")]
4
5use std::net::SocketAddr;
6use std::pin::Pin;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use bip39::Mnemonic;
12use cdk_common::common::FeeReserve;
13use cdk_common::payment::{self, *};
14use cdk_common::util::{hex, unix_time};
15use cdk_common::{Amount, CurrencyUnit, MeltOptions, MeltQuoteState};
16use futures::{Stream, StreamExt};
17use ldk_node::bitcoin::hashes::Hash;
18use ldk_node::bitcoin::Network;
19use ldk_node::lightning::ln::channelmanager::PaymentId;
20use ldk_node::lightning::ln::msgs::SocketAddress;
21use ldk_node::lightning::routing::router::RouteParametersConfig;
22use ldk_node::lightning_invoice::{Bolt11InvoiceDescription, Description};
23use ldk_node::lightning_types::payment::PaymentHash;
24use ldk_node::logger::{LogLevel, LogWriter};
25use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus};
26use ldk_node::{Builder, Event, Node};
27use tokio_stream::wrappers::BroadcastStream;
28use tokio_util::sync::CancellationToken;
29use tracing::instrument;
30
31use crate::error::Error;
32use crate::log::StdoutLogWriter;
33
34mod error;
35mod log;
36mod web;
37
/// CDK Lightning backend using LDK Node
///
/// Provides Lightning Network functionality for CDK with support for Cashu operations.
/// Handles payment creation, processing, and event management using the Lightning Development Kit.
///
/// The type is `Clone`; the node handle is held behind an [`Arc`], so clones refer to the
/// same underlying node.
#[derive(Clone)]
pub struct CdkLdkNode {
    /// Shared handle to the underlying LDK node.
    inner: Arc<Node>,
    /// Fee reserve settings used when quoting outgoing payments.
    fee_reserve: FeeReserve,
    /// Token cancelled on shutdown when a payment event stream is active.
    wait_invoice_cancel_token: CancellationToken,
    /// Flag tracking whether a payment event stream is active.
    /// NOTE(review): the code reading/writing this flag is not in this part of the file — confirm.
    wait_invoice_is_active: Arc<AtomicBool>,
    /// Broadcast sender on which received-payment notifications are published.
    sender: tokio::sync::broadcast::Sender<WaitPaymentResponse>,
    /// Receiver half created together with `sender` when the node is built.
    receiver: Arc<tokio::sync::broadcast::Receiver<WaitPaymentResponse>>,
    /// Token cancelled on shutdown to stop the background event handler task.
    events_cancel_token: CancellationToken,
    /// Address for the management web interface; `None` means no web server is started.
    web_addr: Option<SocketAddr>,
}
53
54impl std::fmt::Debug for CdkLdkNode {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("CdkLdkNode")
57            .field("fee_reserve", &self.fee_reserve)
58            .field("web_addr", &self.web_addr)
59            .finish_non_exhaustive()
60    }
61}
62
/// Configuration for connecting to Bitcoin RPC
///
/// Contains the necessary connection parameters for the Bitcoin Core RPC interface.
/// All four values are handed to the LDK node builder when
/// [`ChainSource::BitcoinRpc`] is selected.
#[derive(Debug, Clone)]
pub struct BitcoinRpcConfig {
    /// Bitcoin RPC server hostname or IP address
    pub host: String,
    /// Bitcoin RPC server port number
    pub port: u16,
    /// Username for Bitcoin RPC authentication
    pub user: String,
    /// Password for Bitcoin RPC authentication
    pub password: String,
}
77
/// Source of blockchain data for the Lightning node
///
/// Specifies how the node should connect to the Bitcoin network to retrieve
/// blockchain information and broadcast transactions. Exactly one source is
/// configured per node.
#[derive(Debug, Clone)]
pub enum ChainSource {
    /// Use an Esplora server for blockchain data
    ///
    /// Contains the URL of the Esplora server endpoint
    Esplora(String),
    /// Use Bitcoin Core RPC for blockchain data
    ///
    /// Contains the configuration for connecting to Bitcoin Core
    BitcoinRpc(BitcoinRpcConfig),
}
93
/// Source of Lightning network gossip data
///
/// Specifies how the node should learn about the Lightning Network topology
/// and routing information. Exactly one source is configured per node.
#[derive(Debug, Clone)]
pub enum GossipSource {
    /// Learn gossip through peer-to-peer connections
    ///
    /// The node will connect to other Lightning nodes and exchange gossip data directly
    P2P,
    /// Use Rapid Gossip Sync for efficient gossip updates
    ///
    /// Contains the URL of the RGS server for compressed gossip data
    RapidGossipSync(String),
}
/// A builder for a [`CdkLdkNode`] instance.
///
/// Required settings are supplied to `new`; the optional ones (`seed`,
/// `announcement_addresses`, `log_dir_path`) are set through the `with_*` methods.
#[derive(Debug)]
pub struct CdkLdkNodeBuilder {
    /// Bitcoin network the node operates on.
    network: Network,
    /// Where the node obtains blockchain data.
    chain_source: ChainSource,
    /// Where the node obtains Lightning gossip data.
    gossip_source: GossipSource,
    /// Directory for the filesystem logger; when `None` a stdout logger is used instead.
    log_dir_path: Option<String>,
    /// Directory in which the LDK node stores its data.
    storage_dir_path: String,
    /// Fee reserve settings carried over to the built [`CdkLdkNode`].
    fee_reserve: FeeReserve,
    /// Addresses the node listens on.
    listening_addresses: Vec<SocketAddress>,
    /// Optional BIP39 mnemonic used as the node's entropy source.
    seed: Option<Mnemonic>,
    /// Optional addresses announced to the Lightning network.
    announcement_addresses: Option<Vec<SocketAddress>>,
}
122
123impl CdkLdkNodeBuilder {
124    /// Creates a new builder instance.
125    pub fn new(
126        network: Network,
127        chain_source: ChainSource,
128        gossip_source: GossipSource,
129        storage_dir_path: String,
130        fee_reserve: FeeReserve,
131        listening_addresses: Vec<SocketAddress>,
132    ) -> Self {
133        Self {
134            network,
135            chain_source,
136            gossip_source,
137            storage_dir_path,
138            fee_reserve,
139            listening_addresses,
140            seed: None,
141            announcement_addresses: None,
142            log_dir_path: None,
143        }
144    }
145
146    /// Configures the [`CdkLdkNode`] to use the Mnemonic for entropy source configuration
147    pub fn with_seed(mut self, seed: Mnemonic) -> Self {
148        self.seed = Some(seed);
149        self
150    }
151    /// Configures the [`CdkLdkNode`] to use announce this address to the lightning network
152    pub fn with_announcement_address(mut self, announcement_addresses: Vec<SocketAddress>) -> Self {
153        self.announcement_addresses = Some(announcement_addresses);
154        self
155    }
156    /// Configures the [`CdkLdkNode`] to use announce this address to the lightning network
157    pub fn with_log_dir_path(mut self, log_dir_path: String) -> Self {
158        self.log_dir_path = Some(log_dir_path);
159        self
160    }
161
162    /// Builds the [`CdkLdkNode`] instance
163    ///
164    /// # Errors
165    /// Returns an error if the LDK node builder fails to create the node
166    pub fn build(self) -> Result<CdkLdkNode, Error> {
167        let mut ldk = Builder::new();
168        ldk.set_network(self.network);
169        tracing::info!("Storage dir of node is {}", self.storage_dir_path);
170        ldk.set_storage_dir_path(self.storage_dir_path);
171
172        match self.chain_source {
173            ChainSource::Esplora(esplora_url) => {
174                ldk.set_chain_source_esplora(esplora_url, None);
175            }
176            ChainSource::BitcoinRpc(BitcoinRpcConfig {
177                host,
178                port,
179                user,
180                password,
181            }) => {
182                ldk.set_chain_source_bitcoind_rpc(host, port, user, password);
183            }
184        }
185
186        match self.gossip_source {
187            GossipSource::P2P => {
188                ldk.set_gossip_source_p2p();
189            }
190            GossipSource::RapidGossipSync(rgs_url) => {
191                ldk.set_gossip_source_rgs(rgs_url);
192            }
193        }
194
195        ldk.set_listening_addresses(self.listening_addresses)?;
196        if self.log_dir_path.is_some() {
197            ldk.set_filesystem_logger(self.log_dir_path, Some(LogLevel::Info));
198        } else {
199            ldk.set_custom_logger(Arc::new(StdoutLogWriter));
200        }
201
202        ldk.set_node_alias("cdk-ldk-node".to_string())?;
203        // set the seed as bip39 entropy mnemonic
204        if let Some(seed) = self.seed {
205            ldk.set_entropy_bip39_mnemonic(seed, None);
206        }
207        // set the announcement addresses
208        if let Some(announcement_addresses) = self.announcement_addresses {
209            ldk.set_announcement_addresses(announcement_addresses)?;
210        }
211
212        let node = ldk.build()?;
213
214        tracing::info!("Creating tokio channel for payment notifications");
215        let (sender, receiver) = tokio::sync::broadcast::channel(8);
216
217        let id = node.node_id();
218
219        let adr = node.announcement_addresses();
220
221        tracing::info!(
222            "Created node {} with address {:?} on network {}",
223            id,
224            adr,
225            self.network
226        );
227
228        Ok(CdkLdkNode {
229            inner: node.into(),
230            fee_reserve: self.fee_reserve,
231            wait_invoice_cancel_token: CancellationToken::new(),
232            wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
233            sender,
234            receiver: Arc::new(receiver),
235            events_cancel_token: CancellationToken::new(),
236            web_addr: None,
237        })
238    }
239}
240
241impl CdkLdkNode {
242    /// Set the web server address for the LDK node management interface
243    ///
244    /// # Arguments
245    /// * `addr` - Socket address for the web server. If None, no web server will be started.
246    pub fn set_web_addr(&mut self, addr: Option<SocketAddr>) {
247        self.web_addr = addr;
248    }
249
250    /// Get a default web server address using an unused port
251    ///
252    /// Returns a SocketAddr with localhost and port 0, which will cause
253    /// the system to automatically assign an available port
254    pub fn default_web_addr() -> SocketAddr {
255        SocketAddr::from(([127, 0, 0, 1], 8091))
256    }
257
258    /// Start the CDK LDK Node
259    ///
260    /// Starts the underlying LDK node and begins event processing.
261    /// Sets up event handlers to listen for Lightning events like payment received.
262    ///
263    /// # Returns
264    /// Returns `Ok(())` on successful start, error otherwise
265    ///
266    /// # Errors
267    /// Returns an error if the LDK node fails to start or event handling setup fails
268    pub fn start_ldk_node(&self) -> Result<(), Error> {
269        tracing::info!("Starting cdk-ldk node");
270        self.inner.start()?;
271        let node_config = self.inner.config();
272
273        tracing::info!("Starting node with network {}", node_config.network);
274
275        tracing::info!("Node status: {:?}", self.inner.status());
276
277        self.handle_events()?;
278
279        Ok(())
280    }
281
282    /// Start the web server for the LDK node management interface
283    ///
284    /// Starts a web server that provides a user interface for managing the LDK node.
285    /// The web interface allows users to view balances, manage channels, create invoices,
286    /// and send payments.
287    ///
288    /// # Arguments
289    /// * `web_addr` - The socket address to bind the web server to
290    ///
291    /// # Returns
292    /// Returns `Ok(())` on successful start, error otherwise
293    ///
294    /// # Errors
295    /// Returns an error if the web server fails to start
296    pub fn start_web_server(&self, web_addr: SocketAddr) -> Result<(), Error> {
297        let web_server = crate::web::WebServer::new(Arc::new(self.clone()));
298
299        tokio::spawn(async move {
300            if let Err(e) = web_server.serve(web_addr).await {
301                tracing::error!("Web server error: {}", e);
302            }
303        });
304
305        Ok(())
306    }
307
308    /// Stop the CDK LDK Node
309    ///
310    /// Gracefully stops the node by cancelling all active tasks and event handlers.
311    /// This includes:
312    /// - Cancelling the event handler task
313    /// - Cancelling any active wait_invoice streams
314    /// - Stopping the underlying LDK node
315    ///
316    /// # Returns
317    /// Returns `Ok(())` on successful shutdown, error otherwise
318    ///
319    /// # Errors
320    /// Returns an error if the underlying LDK node fails to stop
321    pub fn stop_ldk_node(&self) -> Result<(), Error> {
322        tracing::info!("Stopping CdkLdkNode");
323        // Cancel all tokio tasks
324        tracing::info!("Cancelling event handler");
325        self.events_cancel_token.cancel();
326
327        // Cancel any payment event streams
328        if self.is_payment_event_stream_active() {
329            tracing::info!("Cancelling payment event stream");
330            self.wait_invoice_cancel_token.cancel();
331        }
332
333        // Stop the LDK node
334        tracing::info!("Stopping LDK node");
335        self.inner.stop()?;
336        tracing::info!("CdkLdkNode stopped successfully");
337        Ok(())
338    }
339
340    /// Handle payment received event
341    async fn handle_payment_received(
342        node: &Arc<Node>,
343        sender: &tokio::sync::broadcast::Sender<WaitPaymentResponse>,
344        payment_id: Option<PaymentId>,
345        payment_hash: PaymentHash,
346        amount_msat: u64,
347    ) {
348        tracing::info!(
349            "Received payment for hash={} of amount={} msat",
350            payment_hash,
351            amount_msat
352        );
353
354        let payment_id = match payment_id {
355            Some(id) => id,
356            None => {
357                tracing::warn!("Received payment without payment_id");
358                return;
359            }
360        };
361
362        let payment_id_hex = hex::encode(payment_id.0);
363
364        if amount_msat == 0 {
365            tracing::warn!("Payment of no amount");
366            return;
367        }
368
369        tracing::info!(
370            "Processing payment notification: id={}, amount={} msats",
371            payment_id_hex,
372            amount_msat
373        );
374
375        let payment_details = match node.payment(&payment_id) {
376            Some(details) => details,
377            None => {
378                tracing::error!("Could not find payment details for id={}", payment_id_hex);
379                return;
380            }
381        };
382
383        let (payment_identifier, payment_id) = match payment_details.kind {
384            PaymentKind::Bolt11 { hash, .. } => {
385                (PaymentIdentifier::PaymentHash(hash.0), hash.to_string())
386            }
387            PaymentKind::Bolt12Offer { hash, offer_id, .. } => match hash {
388                Some(h) => (
389                    PaymentIdentifier::OfferId(offer_id.to_string()),
390                    h.to_string(),
391                ),
392                None => {
393                    tracing::error!("Bolt12 payment missing hash");
394                    return;
395                }
396            },
397            k => {
398                tracing::warn!("Received payment of kind {:?} which is not supported", k);
399                return;
400            }
401        };
402
403        let wait_payment_response = WaitPaymentResponse {
404            payment_identifier,
405            payment_amount: Amount::new(amount_msat, CurrencyUnit::Msat),
406            payment_id,
407        };
408
409        match sender.send(wait_payment_response) {
410            Ok(_) => tracing::info!("Successfully sent payment notification to stream"),
411            Err(err) => tracing::error!(
412                "Could not send payment received notification on channel: {}",
413                err
414            ),
415        }
416    }
417
418    /// Set up event handling for the node
419    pub fn handle_events(&self) -> Result<(), Error> {
420        let node = self.inner.clone();
421        let sender = self.sender.clone();
422        let cancel_token = self.events_cancel_token.clone();
423
424        tracing::info!("Starting event handler task");
425
426        tokio::spawn(async move {
427            tracing::info!("Event handler loop started");
428            loop {
429                tokio::select! {
430                    _ = cancel_token.cancelled() => {
431                        tracing::info!("Event handler cancelled");
432                        break;
433                    }
434                    event = node.next_event_async() => {
435                        match event {
436                            Event::PaymentReceived {
437                                payment_id,
438                                payment_hash,
439                                amount_msat,
440                                custom_records: _
441                            } => {
442                                Self::handle_payment_received(
443                                    &node,
444                                    &sender,
445                                    payment_id,
446                                    payment_hash,
447                                    amount_msat
448                                ).await;
449                            }
450                            event => {
451                                tracing::debug!("Received other ldk node event: {:?}", event);
452                            }
453                        }
454
455                        if let Err(err) = node.event_handled() {
456                            tracing::error!("Error handling node event: {}", err);
457                        } else {
458                            tracing::debug!("Successfully handled node event");
459                        }
460                    }
461                }
462            }
463            tracing::info!("Event handler loop terminated");
464        });
465
466        tracing::info!("Event handler task spawned");
467        Ok(())
468    }
469
470    /// Get Node used
471    pub fn node(&self) -> Arc<Node> {
472        Arc::clone(&self.inner)
473    }
474}
475
476/// Mint payment trait
477#[async_trait]
478impl MintPayment for CdkLdkNode {
479    type Err = payment::Error;
480
481    /// Start the payment processor
482    /// Starts the LDK node and begins event processing
483    async fn start(&self) -> Result<(), Self::Err> {
484        self.start_ldk_node().map_err(|e| {
485            tracing::error!("Failed to start CdkLdkNode: {}", e);
486            e
487        })?;
488
489        tracing::info!("CdkLdkNode payment processor started successfully");
490
491        // Start web server if configured
492        if let Some(web_addr) = self.web_addr {
493            tracing::info!("Starting LDK Node web interface on {}", web_addr);
494            self.start_web_server(web_addr).map_err(|e| {
495                tracing::error!("Failed to start web server: {}", e);
496                e
497            })?;
498        } else {
499            tracing::info!("No web server address configured, skipping web interface");
500        }
501
502        Ok(())
503    }
504
505    /// Stop the payment processor
506    /// Gracefully stops the LDK node and cancels all background tasks
507    async fn stop(&self) -> Result<(), Self::Err> {
508        self.stop_ldk_node().map_err(|e| {
509            tracing::error!("Failed to stop CdkLdkNode: {}", e);
510            e.into()
511        })
512    }
513
514    /// Base Settings
515    async fn get_settings(&self) -> Result<SettingsResponse, Self::Err> {
516        let settings = SettingsResponse {
517            unit: CurrencyUnit::Msat.to_string(),
518            bolt11: Some(payment::Bolt11Settings {
519                mpp: false,
520                amountless: true,
521                invoice_description: true,
522            }),
523            bolt12: Some(payment::Bolt12Settings { amountless: true }),
524            onchain: None,
525            custom: std::collections::HashMap::new(),
526        };
527        Ok(settings)
528    }
529
530    /// Create a new invoice
531    #[instrument(skip(self))]
532    async fn create_incoming_payment_request(
533        &self,
534        options: IncomingPaymentOptions,
535    ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
536        match options {
537            IncomingPaymentOptions::Bolt11(bolt11_options) => {
538                let amount_msat: Amount = bolt11_options
539                    .amount
540                    .convert_to(&CurrencyUnit::Msat)?
541                    .into();
542                let description = bolt11_options.description.unwrap_or_default();
543                let time = match bolt11_options.unix_expiry {
544                    Some(t) => t
545                        .checked_sub(unix_time())
546                        .ok_or(payment::Error::InvalidExpiry)?,
547                    None => 36000,
548                };
549
550                let description = Bolt11InvoiceDescription::Direct(
551                    Description::new(description).map_err(|_| Error::InvalidDescription)?,
552                );
553
554                let payment = self
555                    .inner
556                    .bolt11_payment()
557                    .receive(amount_msat.into(), &description, time as u32)
558                    .map_err(Error::LdkNode)?;
559
560                let payment_hash = payment.payment_hash().to_string();
561                let payment_identifier = PaymentIdentifier::PaymentHash(
562                    hex::decode(&payment_hash)?
563                        .try_into()
564                        .map_err(|_| Error::InvalidPaymentHashLength)?,
565                );
566
567                Ok(CreateIncomingPaymentResponse {
568                    request_lookup_id: payment_identifier,
569                    request: payment.to_string(),
570                    expiry: Some(unix_time() + time),
571                    extra_json: None,
572                })
573            }
574            IncomingPaymentOptions::Bolt12(bolt12_options) => {
575                let Bolt12IncomingPaymentOptions {
576                    description,
577                    amount,
578                    unix_expiry,
579                } = *bolt12_options;
580
581                let time = unix_expiry
582                    .map(|t| {
583                        t.checked_sub(unix_time())
584                            .ok_or(payment::Error::InvalidExpiry)
585                            .map(|t| t as u32)
586                    })
587                    .transpose()?;
588
589                let offer = match amount {
590                    Some(amount) => {
591                        let amount_msat: Amount = amount.convert_to(&CurrencyUnit::Msat)?.into();
592
593                        self.inner
594                            .bolt12_payment()
595                            .receive(
596                                amount_msat.into(),
597                                &description.unwrap_or("".to_string()),
598                                time,
599                                None,
600                            )
601                            .map_err(Error::LdkNode)?
602                    }
603                    None => self
604                        .inner
605                        .bolt12_payment()
606                        .receive_variable_amount(&description.unwrap_or("".to_string()), time)
607                        .map_err(Error::LdkNode)?,
608                };
609                let payment_identifier = PaymentIdentifier::OfferId(offer.id().to_string());
610
611                Ok(CreateIncomingPaymentResponse {
612                    request_lookup_id: payment_identifier,
613                    request: offer.to_string(),
614                    expiry: time.map(|a| a as u64),
615                    extra_json: None,
616                })
617            }
618            IncomingPaymentOptions::Custom(_) | IncomingPaymentOptions::Onchain(_) => {
619                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
620            }
621        }
622    }
623
624    /// Get payment quote
625    /// Used to get fee and amount required for a payment request
626    #[instrument(skip_all)]
627    async fn get_payment_quote(
628        &self,
629        unit: &CurrencyUnit,
630        options: OutgoingPaymentOptions,
631    ) -> Result<PaymentQuoteResponse, Self::Err> {
632        match options {
633            cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
634                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
635            }
636            OutgoingPaymentOptions::Bolt11(bolt11_options) => {
637                let bolt11 = bolt11_options.bolt11;
638
639                let amount_msat = match bolt11_options.melt_options {
640                    Some(melt_options) => melt_options.amount_msat(),
641                    None => bolt11
642                        .amount_milli_satoshis()
643                        .ok_or(Error::UnknownInvoiceAmount)?
644                        .into(),
645                };
646
647                let amount =
648                    Amount::new(amount_msat.into(), CurrencyUnit::Msat).convert_to(unit)?;
649
650                let relative_fee_reserve =
651                    (self.fee_reserve.percent_fee_reserve * amount.value() as f32) as u64;
652
653                let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
654
655                let fee = match relative_fee_reserve > absolute_fee_reserve {
656                    true => relative_fee_reserve,
657                    false => absolute_fee_reserve,
658                };
659
660                let payment_hash = bolt11.payment_hash().to_string();
661                let payment_hash_bytes = hex::decode(&payment_hash)?
662                    .try_into()
663                    .map_err(|_| Error::InvalidPaymentHashLength)?;
664
665                Ok(PaymentQuoteResponse {
666                    request_lookup_id: Some(PaymentIdentifier::PaymentHash(payment_hash_bytes)),
667                    amount,
668                    fee: Amount::new(fee, unit.clone()),
669                    state: MeltQuoteState::Unpaid,
670                    extra_json: None,
671                    estimated_blocks: None,
672                    fee_options: None,
673                })
674            }
675            OutgoingPaymentOptions::Bolt12(bolt12_options) => {
676                let offer = bolt12_options.offer;
677
678                let amount_msat = match bolt12_options.melt_options {
679                    Some(melt_options) => melt_options.amount_msat(),
680                    None => {
681                        let amount = offer.amount().ok_or(payment::Error::AmountMismatch)?;
682
683                        match amount {
684                            ldk_node::lightning::offers::offer::Amount::Bitcoin {
685                                amount_msats,
686                            } => amount_msats.into(),
687                            _ => return Err(payment::Error::AmountMismatch),
688                        }
689                    }
690                };
691                let amount =
692                    Amount::new(amount_msat.into(), CurrencyUnit::Msat).convert_to(unit)?;
693
694                let relative_fee_reserve =
695                    (self.fee_reserve.percent_fee_reserve * amount.value() as f32) as u64;
696
697                let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
698
699                let fee = match relative_fee_reserve > absolute_fee_reserve {
700                    true => relative_fee_reserve,
701                    false => absolute_fee_reserve,
702                };
703
704                Ok(PaymentQuoteResponse {
705                    request_lookup_id: None,
706                    amount,
707                    fee: Amount::new(fee, unit.clone()),
708                    state: MeltQuoteState::Unpaid,
709                    extra_json: None,
710                    estimated_blocks: None,
711                    fee_options: None,
712                })
713            }
714            OutgoingPaymentOptions::Onchain(_) => {
715                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
716            }
717        }
718    }
719
720    /// Pay request
721    #[instrument(skip(self, options))]
722    async fn make_payment(
723        &self,
724        unit: &CurrencyUnit,
725        options: OutgoingPaymentOptions,
726    ) -> Result<MakePaymentResponse, Self::Err> {
727        match options {
728            cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
729                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
730            }
731            OutgoingPaymentOptions::Bolt11(bolt11_options) => {
732                let bolt11 = bolt11_options.bolt11;
733
734                let send_params = match bolt11_options
735                    .max_fee_amount
736                    .map(|f| {
737                        f.convert_to(&CurrencyUnit::Msat)
738                            .map(|amount_msat| RouteParametersConfig {
739                                max_total_routing_fee_msat: Some(amount_msat.value()),
740                                ..Default::default()
741                            })
742                    })
743                    .transpose()
744                {
745                    Ok(params) => params,
746                    Err(err) => {
747                        tracing::error!("Failed to convert fee amount: {}", err);
748                        return Err(payment::Error::Custom(format!("Invalid fee amount: {err}")));
749                    }
750                };
751
752                let payment_id = match bolt11_options.melt_options {
753                    Some(MeltOptions::Amountless { amountless }) => self
754                        .inner
755                        .bolt11_payment()
756                        .send_using_amount(&bolt11, amountless.amount_msat.into(), send_params)
757                        .map_err(|err| {
758                            tracing::error!("Could not send send amountless bolt11: {}", err);
759                            Error::CouldNotSendBolt11WithoutAmount
760                        })?,
761                    None => self
762                        .inner
763                        .bolt11_payment()
764                        .send(&bolt11, send_params)
765                        .map_err(|err| {
766                            tracing::error!("Could not send bolt11 {}", err);
767                            Error::CouldNotSendBolt11
768                        })?,
769                    _ => return Err(payment::Error::UnsupportedPaymentOption),
770                };
771
772                // Check payment status for up to 10 seconds
773                let start = std::time::Instant::now();
774                let timeout = std::time::Duration::from_secs(10);
775
776                let (status, payment_details) = loop {
777                    let details = self
778                        .inner
779                        .payment(&payment_id)
780                        .ok_or(Error::PaymentNotFound)?;
781
782                    match details.status {
783                        PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
784                        PaymentStatus::Failed => {
785                            tracing::error!("Failed to pay bolt11 payment.");
786                            break (MeltQuoteState::Failed, details);
787                        }
788                        PaymentStatus::Pending => {
789                            if start.elapsed() > timeout {
790                                tracing::warn!(
791                                    "Paying bolt11 exceeded timeout 10 seconds no longer waitning."
792                                );
793                                break (MeltQuoteState::Pending, details);
794                            }
795                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
796                            continue;
797                        }
798                    }
799                };
800
801                let payment_proof = match payment_details.kind {
802                    PaymentKind::Bolt11 {
803                        hash: _,
804                        preimage,
805                        secret: _,
806                    } => preimage.map(|p| p.to_string()),
807                    _ => return Err(Error::UnexpectedPaymentKind.into()),
808                };
809
810                let total_spent = payment_details
811                    .amount_msat
812                    .ok_or(Error::CouldNotGetAmountSpent)?
813                    + payment_details.fee_paid_msat.unwrap_or_default();
814
815                let total_spent = Amount::new(total_spent, CurrencyUnit::Msat).convert_to(unit)?;
816
817                Ok(MakePaymentResponse {
818                    payment_lookup_id: PaymentIdentifier::PaymentHash(
819                        bolt11.payment_hash().to_byte_array(),
820                    ),
821                    payment_proof,
822                    status,
823                    total_spent,
824                })
825            }
826            OutgoingPaymentOptions::Bolt12(bolt12_options) => {
827                let offer = bolt12_options.offer;
828
829                let payment_id = match bolt12_options.melt_options {
830                    Some(MeltOptions::Amountless { amountless }) => self
831                        .inner
832                        .bolt12_payment()
833                        .send_using_amount(&offer, amountless.amount_msat.into(), None, None, None)
834                        .map_err(Error::LdkNode)?,
835                    None => self
836                        .inner
837                        .bolt12_payment()
838                        .send(&offer, None, None, None)
839                        .map_err(Error::LdkNode)?,
840                    _ => return Err(payment::Error::UnsupportedPaymentOption),
841                };
842
843                // Check payment status for up to 10 seconds
844                let start = std::time::Instant::now();
845                let timeout = std::time::Duration::from_secs(10);
846
847                let (status, payment_details) = loop {
848                    let details = self
849                        .inner
850                        .payment(&payment_id)
851                        .ok_or(Error::PaymentNotFound)?;
852
853                    match details.status {
854                        PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
855                        PaymentStatus::Failed => {
856                            tracing::error!("Payment with id {} failed.", payment_id);
857                            break (MeltQuoteState::Failed, details);
858                        }
859                        PaymentStatus::Pending => {
860                            if start.elapsed() > timeout {
861                                tracing::warn!(
862                                    "Payment has been being for 10 seconds. No longer waiting"
863                                );
864                                break (MeltQuoteState::Pending, details);
865                            }
866                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
867                            continue;
868                        }
869                    }
870                };
871
872                let payment_proof = match payment_details.kind {
873                    PaymentKind::Bolt12Offer {
874                        hash: _,
875                        preimage,
876                        secret: _,
877                        offer_id: _,
878                        payer_note: _,
879                        quantity: _,
880                    } => preimage.map(|p| p.to_string()),
881                    _ => return Err(Error::UnexpectedPaymentKind.into()),
882                };
883
884                let total_spent = payment_details
885                    .amount_msat
886                    .ok_or(Error::CouldNotGetAmountSpent)?
887                    + payment_details.fee_paid_msat.unwrap_or_default();
888
889                let total_spent = Amount::new(total_spent, CurrencyUnit::Msat).convert_to(unit)?;
890
891                Ok(MakePaymentResponse {
892                    payment_lookup_id: PaymentIdentifier::PaymentId(payment_id.0),
893                    payment_proof,
894                    status,
895                    total_spent,
896                })
897            }
898            OutgoingPaymentOptions::Onchain(_) => {
899                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
900            }
901        }
902    }
903
904    /// Listen for invoices to be paid to the mint
905    /// Returns a stream of request_lookup_id once invoices are paid
906    #[instrument(skip(self))]
907    async fn wait_payment_event(
908        &self,
909    ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
910        tracing::info!("Starting stream for invoices - wait_any_incoming_payment called");
911
912        // Set active flag to indicate stream is active
913        self.wait_invoice_is_active.store(true, Ordering::SeqCst);
914        tracing::debug!("wait_invoice_is_active set to true");
915
916        let receiver = self.receiver.clone();
917
918        tracing::info!("Receiver obtained successfully, creating response stream");
919
920        // Transform the String stream into a WaitPaymentResponse stream
921        let response_stream = BroadcastStream::new(receiver.resubscribe());
922
923        // Map the stream to handle BroadcastStreamRecvError and wrap in Event
924        let response_stream = response_stream.filter_map(|result| async move {
925            match result {
926                Ok(payment) => Some(cdk_common::payment::Event::PaymentReceived(payment)),
927                Err(err) => {
928                    tracing::warn!("Error in broadcast stream: {}", err);
929                    None
930                }
931            }
932        });
933
934        // Create a combined stream that also handles cancellation
935        let cancel_token = self.wait_invoice_cancel_token.clone();
936        let is_active = self.wait_invoice_is_active.clone();
937
938        let stream = Box::pin(response_stream);
939
940        // Set up a task to clean up when the stream is dropped
941        tokio::spawn(async move {
942            cancel_token.cancelled().await;
943            tracing::info!("wait_invoice stream cancelled");
944            is_active.store(false, Ordering::SeqCst);
945        });
946
947        tracing::info!("wait_any_incoming_payment returning stream");
948        Ok(stream)
949    }
950
951    /// Is payment event stream active
952    fn is_payment_event_stream_active(&self) -> bool {
953        self.wait_invoice_is_active.load(Ordering::SeqCst)
954    }
955
956    /// Cancel payment event stream
957    fn cancel_payment_event_stream(&self) {
958        self.wait_invoice_cancel_token.cancel()
959    }
960
961    /// Check the status of an incoming payment
962    async fn check_incoming_payment_status(
963        &self,
964        payment_identifier: &PaymentIdentifier,
965    ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
966        let payment_id_str = match payment_identifier {
967            PaymentIdentifier::PaymentHash(hash) => hex::encode(hash),
968            PaymentIdentifier::CustomId(id) => id.clone(),
969            _ => return Err(Error::UnsupportedPaymentIdentifierType.into()),
970        };
971
972        let payment_id = PaymentId(
973            hex::decode(&payment_id_str)?
974                .try_into()
975                .map_err(|_| Error::InvalidPaymentIdLength)?,
976        );
977
978        let payment_details = self
979            .inner
980            .payment(&payment_id)
981            .ok_or(Error::PaymentNotFound)?;
982
983        if payment_details.direction == PaymentDirection::Outbound {
984            return Err(Error::InvalidPaymentDirection.into());
985        }
986
987        let amount = if payment_details.status == PaymentStatus::Succeeded {
988            payment_details
989                .amount_msat
990                .ok_or(Error::CouldNotGetPaymentAmount)?
991        } else {
992            return Ok(vec![]);
993        };
994
995        let response = WaitPaymentResponse {
996            payment_identifier: payment_identifier.clone(),
997            payment_amount: Amount::new(amount, CurrencyUnit::Msat),
998            payment_id: payment_id_str,
999        };
1000
1001        Ok(vec![response])
1002    }
1003
1004    /// Check the status of an outgoing payment
1005    async fn check_outgoing_payment(
1006        &self,
1007        request_lookup_id: &PaymentIdentifier,
1008    ) -> Result<MakePaymentResponse, Self::Err> {
1009        let payment_details = match request_lookup_id {
1010            PaymentIdentifier::PaymentHash(id_hash) => self
1011                .inner
1012                .list_payments_with_filter(
1013                    |p| matches!(&p.kind, PaymentKind::Bolt11 { hash, .. } if &hash.0 == id_hash),
1014                )
1015                .first()
1016                .cloned(),
1017            PaymentIdentifier::PaymentId(id) => self.inner.payment(&PaymentId(
1018                hex::decode(id)?
1019                    .try_into()
1020                    .map_err(|_| payment::Error::Custom("Invalid hex".to_string()))?,
1021            )),
1022            _ => {
1023                return Ok(MakePaymentResponse {
1024                    payment_lookup_id: request_lookup_id.clone(),
1025                    payment_proof: None,
1026                    status: MeltQuoteState::Unknown,
1027                    total_spent: Amount::new(0, CurrencyUnit::Msat),
1028                });
1029            }
1030        }
1031        .ok_or(Error::PaymentNotFound)?;
1032
1033        // This check seems reversed in the original code, so I'm fixing it here
1034        if payment_details.direction != PaymentDirection::Outbound {
1035            return Err(Error::InvalidPaymentDirection.into());
1036        }
1037
1038        let status = match payment_details.status {
1039            PaymentStatus::Pending => MeltQuoteState::Pending,
1040            PaymentStatus::Succeeded => MeltQuoteState::Paid,
1041            PaymentStatus::Failed => MeltQuoteState::Failed,
1042        };
1043
1044        let payment_proof = match payment_details.kind {
1045            PaymentKind::Bolt11 {
1046                hash: _,
1047                preimage,
1048                secret: _,
1049            } => preimage.map(|p| p.to_string()),
1050            _ => return Err(Error::UnexpectedPaymentKind.into()),
1051        };
1052
1053        let total_spent = payment_details
1054            .amount_msat
1055            .ok_or(Error::CouldNotGetAmountSpent)?;
1056
1057        Ok(MakePaymentResponse {
1058            payment_lookup_id: request_lookup_id.clone(),
1059            payment_proof,
1060            status,
1061            total_spent: Amount::new(total_spent, CurrencyUnit::Msat),
1062        })
1063    }
1064}
1065
1066impl Drop for CdkLdkNode {
1067    fn drop(&mut self) {
1068        tracing::info!("Drop called on CdkLdkNode");
1069        self.wait_invoice_cancel_token.cancel();
1070        tracing::debug!("Cancelled wait_invoice token in drop");
1071    }
1072}