Skip to main content

cdk_ldk_node/
lib.rs

1//! CDK lightning backend for ldk-node
2
3#![doc = include_str!("../README.md")]
4
5use std::net::SocketAddr;
6use std::pin::Pin;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use bip39::Mnemonic;
12use cdk_common::common::FeeReserve;
13use cdk_common::payment::{self, *};
14use cdk_common::util::{hex, unix_time};
15use cdk_common::{Amount, CurrencyUnit, MeltOptions, MeltQuoteState};
16use futures::{Stream, StreamExt};
17use ldk_node::bitcoin::hashes::Hash;
18use ldk_node::bitcoin::Network;
19use ldk_node::lightning::ln::channelmanager::PaymentId;
20use ldk_node::lightning::ln::msgs::SocketAddress;
21use ldk_node::lightning::routing::router::RouteParametersConfig;
22use ldk_node::lightning_invoice::{Bolt11InvoiceDescription, Description};
23use ldk_node::lightning_types::payment::PaymentHash;
24use ldk_node::logger::{LogLevel, LogWriter};
25use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus};
26use ldk_node::{Builder, Event, Node};
27use tokio_stream::wrappers::BroadcastStream;
28use tokio_util::sync::CancellationToken;
29use tracing::instrument;
30
31use crate::error::Error;
32use crate::log::StdoutLogWriter;
33
34mod error;
35mod log;
36mod web;
37
38/// CDK Lightning backend using LDK Node
39///
40/// Provides Lightning Network functionality for CDK with support for Cashu operations.
41/// Handles payment creation, processing, and event management using the Lightning Development Kit.
42#[derive(Clone)]
43pub struct CdkLdkNode {
44    inner: Arc<Node>,
45    fee_reserve: FeeReserve,
46    wait_invoice_cancel_token: CancellationToken,
47    wait_invoice_is_active: Arc<AtomicBool>,
48    sender: tokio::sync::broadcast::Sender<WaitPaymentResponse>,
49    receiver: Arc<tokio::sync::broadcast::Receiver<WaitPaymentResponse>>,
50    events_cancel_token: CancellationToken,
51    web_addr: Option<SocketAddr>,
52}
53
54impl std::fmt::Debug for CdkLdkNode {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("CdkLdkNode")
57            .field("fee_reserve", &self.fee_reserve)
58            .field("web_addr", &self.web_addr)
59            .finish_non_exhaustive()
60    }
61}
62
/// Configuration for connecting to Bitcoin RPC
///
/// Contains the necessary connection parameters for Bitcoin Core RPC interface.
///
/// `Debug` is implemented by hand rather than derived so that the RPC password
/// is never written to logs when this value (or a type containing it) is
/// formatted with `{:?}`.
#[derive(Clone)]
pub struct BitcoinRpcConfig {
    /// Bitcoin RPC server hostname or IP address
    pub host: String,
    /// Bitcoin RPC server port number
    pub port: u16,
    /// Username for Bitcoin RPC authentication
    pub user: String,
    /// Password for Bitcoin RPC authentication
    pub password: String,
}

impl std::fmt::Debug for BitcoinRpcConfig {
    /// Formats every field except the password, which is replaced by a placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BitcoinRpcConfig")
            .field("host", &self.host)
            .field("port", &self.port)
            .field("user", &self.user)
            .field("password", &"<redacted>")
            .finish()
    }
}
77
78/// Source of blockchain data for the Lightning node
79///
80/// Specifies how the node should connect to the Bitcoin network to retrieve
81/// blockchain information and broadcast transactions.
82#[derive(Debug, Clone)]
83pub enum ChainSource {
84    /// Use an Esplora server for blockchain data
85    ///
86    /// Contains the URL of the Esplora server endpoint
87    Esplora(String),
88    /// Use Bitcoin Core RPC for blockchain data
89    ///
90    /// Contains the configuration for connecting to Bitcoin Core
91    BitcoinRpc(BitcoinRpcConfig),
92}
93
/// Where the Lightning node obtains network gossip.
///
/// Selects how the node learns about the Lightning Network topology and
/// routing information.
#[derive(Debug, Clone)]
pub enum GossipSource {
    /// Peer-to-peer gossip.
    ///
    /// Gossip data is exchanged directly with connected Lightning peers.
    P2P,
    /// Rapid Gossip Sync.
    ///
    /// Holds the URL of the RGS server that provides compressed gossip data.
    RapidGossipSync(String),
}
109/// A builder for an [`CdkLdkNode`] instance.
110#[derive(Debug)]
111pub struct CdkLdkNodeBuilder {
112    network: Network,
113    chain_source: ChainSource,
114    gossip_source: GossipSource,
115    log_dir_path: Option<String>,
116    storage_dir_path: String,
117    fee_reserve: FeeReserve,
118    listening_addresses: Vec<SocketAddress>,
119    seed: Option<Mnemonic>,
120    announcement_addresses: Option<Vec<SocketAddress>>,
121}
122
123impl CdkLdkNodeBuilder {
124    /// Creates a new builder instance.
125    pub fn new(
126        network: Network,
127        chain_source: ChainSource,
128        gossip_source: GossipSource,
129        storage_dir_path: String,
130        fee_reserve: FeeReserve,
131        listening_addresses: Vec<SocketAddress>,
132    ) -> Self {
133        Self {
134            network,
135            chain_source,
136            gossip_source,
137            storage_dir_path,
138            fee_reserve,
139            listening_addresses,
140            seed: None,
141            announcement_addresses: None,
142            log_dir_path: None,
143        }
144    }
145
146    /// Configures the [`CdkLdkNode`] to use the Mnemonic for entropy source configuration
147    pub fn with_seed(mut self, seed: Mnemonic) -> Self {
148        self.seed = Some(seed);
149        self
150    }
151    /// Configures the [`CdkLdkNode`] to use announce this address to the lightning network
152    pub fn with_announcement_address(mut self, announcement_addresses: Vec<SocketAddress>) -> Self {
153        self.announcement_addresses = Some(announcement_addresses);
154        self
155    }
156    /// Configures the [`CdkLdkNode`] to use announce this address to the lightning network
157    pub fn with_log_dir_path(mut self, log_dir_path: String) -> Self {
158        self.log_dir_path = Some(log_dir_path);
159        self
160    }
161
162    /// Builds the [`CdkLdkNode`] instance
163    ///
164    /// # Errors
165    /// Returns an error if the LDK node builder fails to create the node
166    pub fn build(self) -> Result<CdkLdkNode, Error> {
167        let mut ldk = Builder::new();
168        ldk.set_network(self.network);
169        tracing::info!("Storage dir of node is {}", self.storage_dir_path);
170        ldk.set_storage_dir_path(self.storage_dir_path);
171
172        match self.chain_source {
173            ChainSource::Esplora(esplora_url) => {
174                ldk.set_chain_source_esplora(esplora_url, None);
175            }
176            ChainSource::BitcoinRpc(BitcoinRpcConfig {
177                host,
178                port,
179                user,
180                password,
181            }) => {
182                ldk.set_chain_source_bitcoind_rpc(host, port, user, password);
183            }
184        }
185
186        match self.gossip_source {
187            GossipSource::P2P => {
188                ldk.set_gossip_source_p2p();
189            }
190            GossipSource::RapidGossipSync(rgs_url) => {
191                ldk.set_gossip_source_rgs(rgs_url);
192            }
193        }
194
195        ldk.set_listening_addresses(self.listening_addresses)?;
196        if self.log_dir_path.is_some() {
197            ldk.set_filesystem_logger(self.log_dir_path, Some(LogLevel::Info));
198        } else {
199            ldk.set_custom_logger(Arc::new(StdoutLogWriter));
200        }
201
202        ldk.set_node_alias("cdk-ldk-node".to_string())?;
203        // set the seed as bip39 entropy mnemonic
204        if let Some(seed) = self.seed {
205            ldk.set_entropy_bip39_mnemonic(seed, None);
206        }
207        // set the announcement addresses
208        if let Some(announcement_addresses) = self.announcement_addresses {
209            ldk.set_announcement_addresses(announcement_addresses)?;
210        }
211
212        let node = ldk.build()?;
213
214        tracing::info!("Creating tokio channel for payment notifications");
215        let (sender, receiver) = tokio::sync::broadcast::channel(8);
216
217        let id = node.node_id();
218
219        let adr = node.announcement_addresses();
220
221        tracing::info!(
222            "Created node {} with address {:?} on network {}",
223            id,
224            adr,
225            self.network
226        );
227
228        Ok(CdkLdkNode {
229            inner: node.into(),
230            fee_reserve: self.fee_reserve,
231            wait_invoice_cancel_token: CancellationToken::new(),
232            wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
233            sender,
234            receiver: Arc::new(receiver),
235            events_cancel_token: CancellationToken::new(),
236            web_addr: None,
237        })
238    }
239}
240
241impl CdkLdkNode {
242    /// Set the web server address for the LDK node management interface
243    ///
244    /// # Arguments
245    /// * `addr` - Socket address for the web server. If None, no web server will be started.
246    pub fn set_web_addr(&mut self, addr: Option<SocketAddr>) {
247        self.web_addr = addr;
248    }
249
250    /// Get a default web server address using an unused port
251    ///
252    /// Returns a SocketAddr with localhost and port 0, which will cause
253    /// the system to automatically assign an available port
254    pub fn default_web_addr() -> SocketAddr {
255        SocketAddr::from(([127, 0, 0, 1], 8091))
256    }
257
258    /// Start the CDK LDK Node
259    ///
260    /// Starts the underlying LDK node and begins event processing.
261    /// Sets up event handlers to listen for Lightning events like payment received.
262    ///
263    /// # Returns
264    /// Returns `Ok(())` on successful start, error otherwise
265    ///
266    /// # Errors
267    /// Returns an error if the LDK node fails to start or event handling setup fails
268    pub fn start_ldk_node(&self) -> Result<(), Error> {
269        tracing::info!("Starting cdk-ldk node");
270        self.inner.start()?;
271        let node_config = self.inner.config();
272
273        tracing::info!("Starting node with network {}", node_config.network);
274
275        tracing::info!("Node status: {:?}", self.inner.status());
276
277        self.handle_events()?;
278
279        Ok(())
280    }
281
282    /// Start the web server for the LDK node management interface
283    ///
284    /// Starts a web server that provides a user interface for managing the LDK node.
285    /// The web interface allows users to view balances, manage channels, create invoices,
286    /// and send payments.
287    ///
288    /// # Arguments
289    /// * `web_addr` - The socket address to bind the web server to
290    ///
291    /// # Returns
292    /// Returns `Ok(())` on successful start, error otherwise
293    ///
294    /// # Errors
295    /// Returns an error if the web server fails to start
296    pub fn start_web_server(&self, web_addr: SocketAddr) -> Result<(), Error> {
297        let web_server = crate::web::WebServer::new(Arc::new(self.clone()));
298
299        tokio::spawn(async move {
300            if let Err(e) = web_server.serve(web_addr).await {
301                tracing::error!("Web server error: {}", e);
302            }
303        });
304
305        Ok(())
306    }
307
308    /// Stop the CDK LDK Node
309    ///
310    /// Gracefully stops the node by cancelling all active tasks and event handlers.
311    /// This includes:
312    /// - Cancelling the event handler task
313    /// - Cancelling any active wait_invoice streams
314    /// - Stopping the underlying LDK node
315    ///
316    /// # Returns
317    /// Returns `Ok(())` on successful shutdown, error otherwise
318    ///
319    /// # Errors
320    /// Returns an error if the underlying LDK node fails to stop
321    pub fn stop_ldk_node(&self) -> Result<(), Error> {
322        tracing::info!("Stopping CdkLdkNode");
323        // Cancel all tokio tasks
324        tracing::info!("Cancelling event handler");
325        self.events_cancel_token.cancel();
326
327        // Cancel any wait_invoice streams
328        if self.is_wait_invoice_active() {
329            tracing::info!("Cancelling wait_invoice stream");
330            self.wait_invoice_cancel_token.cancel();
331        }
332
333        // Stop the LDK node
334        tracing::info!("Stopping LDK node");
335        self.inner.stop()?;
336        tracing::info!("CdkLdkNode stopped successfully");
337        Ok(())
338    }
339
340    /// Handle payment received event
341    async fn handle_payment_received(
342        node: &Arc<Node>,
343        sender: &tokio::sync::broadcast::Sender<WaitPaymentResponse>,
344        payment_id: Option<PaymentId>,
345        payment_hash: PaymentHash,
346        amount_msat: u64,
347    ) {
348        tracing::info!(
349            "Received payment for hash={} of amount={} msat",
350            payment_hash,
351            amount_msat
352        );
353
354        let payment_id = match payment_id {
355            Some(id) => id,
356            None => {
357                tracing::warn!("Received payment without payment_id");
358                return;
359            }
360        };
361
362        let payment_id_hex = hex::encode(payment_id.0);
363
364        if amount_msat == 0 {
365            tracing::warn!("Payment of no amount");
366            return;
367        }
368
369        tracing::info!(
370            "Processing payment notification: id={}, amount={} msats",
371            payment_id_hex,
372            amount_msat
373        );
374
375        let payment_details = match node.payment(&payment_id) {
376            Some(details) => details,
377            None => {
378                tracing::error!("Could not find payment details for id={}", payment_id_hex);
379                return;
380            }
381        };
382
383        let (payment_identifier, payment_id) = match payment_details.kind {
384            PaymentKind::Bolt11 { hash, .. } => {
385                (PaymentIdentifier::PaymentHash(hash.0), hash.to_string())
386            }
387            PaymentKind::Bolt12Offer { hash, offer_id, .. } => match hash {
388                Some(h) => (
389                    PaymentIdentifier::OfferId(offer_id.to_string()),
390                    h.to_string(),
391                ),
392                None => {
393                    tracing::error!("Bolt12 payment missing hash");
394                    return;
395                }
396            },
397            k => {
398                tracing::warn!("Received payment of kind {:?} which is not supported", k);
399                return;
400            }
401        };
402
403        let wait_payment_response = WaitPaymentResponse {
404            payment_identifier,
405            payment_amount: Amount::new(amount_msat, CurrencyUnit::Msat),
406            payment_id,
407        };
408
409        match sender.send(wait_payment_response) {
410            Ok(_) => tracing::info!("Successfully sent payment notification to stream"),
411            Err(err) => tracing::error!(
412                "Could not send payment received notification on channel: {}",
413                err
414            ),
415        }
416    }
417
418    /// Set up event handling for the node
419    pub fn handle_events(&self) -> Result<(), Error> {
420        let node = self.inner.clone();
421        let sender = self.sender.clone();
422        let cancel_token = self.events_cancel_token.clone();
423
424        tracing::info!("Starting event handler task");
425
426        tokio::spawn(async move {
427            tracing::info!("Event handler loop started");
428            loop {
429                tokio::select! {
430                    _ = cancel_token.cancelled() => {
431                        tracing::info!("Event handler cancelled");
432                        break;
433                    }
434                    event = node.next_event_async() => {
435                        match event {
436                            Event::PaymentReceived {
437                                payment_id,
438                                payment_hash,
439                                amount_msat,
440                                custom_records: _
441                            } => {
442                                Self::handle_payment_received(
443                                    &node,
444                                    &sender,
445                                    payment_id,
446                                    payment_hash,
447                                    amount_msat
448                                ).await;
449                            }
450                            event => {
451                                tracing::debug!("Received other ldk node event: {:?}", event);
452                            }
453                        }
454
455                        if let Err(err) = node.event_handled() {
456                            tracing::error!("Error handling node event: {}", err);
457                        } else {
458                            tracing::debug!("Successfully handled node event");
459                        }
460                    }
461                }
462            }
463            tracing::info!("Event handler loop terminated");
464        });
465
466        tracing::info!("Event handler task spawned");
467        Ok(())
468    }
469
470    /// Get Node used
471    pub fn node(&self) -> Arc<Node> {
472        Arc::clone(&self.inner)
473    }
474}
475
476/// Mint payment trait
477#[async_trait]
478impl MintPayment for CdkLdkNode {
479    type Err = payment::Error;
480
481    /// Start the payment processor
482    /// Starts the LDK node and begins event processing
483    async fn start(&self) -> Result<(), Self::Err> {
484        self.start_ldk_node().map_err(|e| {
485            tracing::error!("Failed to start CdkLdkNode: {}", e);
486            e
487        })?;
488
489        tracing::info!("CdkLdkNode payment processor started successfully");
490
491        // Start web server if configured
492        if let Some(web_addr) = self.web_addr {
493            tracing::info!("Starting LDK Node web interface on {}", web_addr);
494            self.start_web_server(web_addr).map_err(|e| {
495                tracing::error!("Failed to start web server: {}", e);
496                e
497            })?;
498        } else {
499            tracing::info!("No web server address configured, skipping web interface");
500        }
501
502        Ok(())
503    }
504
505    /// Stop the payment processor
506    /// Gracefully stops the LDK node and cancels all background tasks
507    async fn stop(&self) -> Result<(), Self::Err> {
508        self.stop_ldk_node().map_err(|e| {
509            tracing::error!("Failed to stop CdkLdkNode: {}", e);
510            e.into()
511        })
512    }
513
514    /// Base Settings
515    async fn get_settings(&self) -> Result<SettingsResponse, Self::Err> {
516        let settings = SettingsResponse {
517            unit: CurrencyUnit::Msat.to_string(),
518            bolt11: Some(payment::Bolt11Settings {
519                mpp: false,
520                amountless: true,
521                invoice_description: true,
522            }),
523            bolt12: Some(payment::Bolt12Settings { amountless: true }),
524            custom: std::collections::HashMap::new(),
525        };
526        Ok(settings)
527    }
528
529    /// Create a new invoice
530    #[instrument(skip(self))]
531    async fn create_incoming_payment_request(
532        &self,
533        unit: &CurrencyUnit,
534        options: IncomingPaymentOptions,
535    ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
536        match options {
537            IncomingPaymentOptions::Bolt11(bolt11_options) => {
538                let amount_msat: Amount = Amount::new(bolt11_options.amount.into(), unit.clone())
539                    .convert_to(&CurrencyUnit::Msat)?
540                    .into();
541                let description = bolt11_options.description.unwrap_or_default();
542                let time = bolt11_options
543                    .unix_expiry
544                    .map(|t| t - unix_time())
545                    .unwrap_or(36000);
546
547                let description = Bolt11InvoiceDescription::Direct(
548                    Description::new(description).map_err(|_| Error::InvalidDescription)?,
549                );
550
551                let payment = self
552                    .inner
553                    .bolt11_payment()
554                    .receive(amount_msat.into(), &description, time as u32)
555                    .map_err(Error::LdkNode)?;
556
557                let payment_hash = payment.payment_hash().to_string();
558                let payment_identifier = PaymentIdentifier::PaymentHash(
559                    hex::decode(&payment_hash)?
560                        .try_into()
561                        .map_err(|_| Error::InvalidPaymentHashLength)?,
562                );
563
564                Ok(CreateIncomingPaymentResponse {
565                    request_lookup_id: payment_identifier,
566                    request: payment.to_string(),
567                    expiry: Some(unix_time() + time),
568                    extra_json: None,
569                })
570            }
571            IncomingPaymentOptions::Bolt12(bolt12_options) => {
572                let Bolt12IncomingPaymentOptions {
573                    description,
574                    amount,
575                    unix_expiry,
576                } = *bolt12_options;
577
578                let time = unix_expiry.map(|t| (t - unix_time()) as u32);
579
580                let offer = match amount {
581                    Some(amount) => {
582                        let amount_msat: Amount = Amount::new(amount.into(), unit.clone())
583                            .convert_to(&CurrencyUnit::Msat)?
584                            .into();
585
586                        self.inner
587                            .bolt12_payment()
588                            .receive(
589                                amount_msat.into(),
590                                &description.unwrap_or("".to_string()),
591                                time,
592                                None,
593                            )
594                            .map_err(Error::LdkNode)?
595                    }
596                    None => self
597                        .inner
598                        .bolt12_payment()
599                        .receive_variable_amount(&description.unwrap_or("".to_string()), time)
600                        .map_err(Error::LdkNode)?,
601                };
602                let payment_identifier = PaymentIdentifier::OfferId(offer.id().to_string());
603
604                Ok(CreateIncomingPaymentResponse {
605                    request_lookup_id: payment_identifier,
606                    request: offer.to_string(),
607                    expiry: time.map(|a| a as u64),
608                    extra_json: None,
609                })
610            }
611            cdk_common::payment::IncomingPaymentOptions::Custom(_) => {
612                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
613            }
614        }
615    }
616
617    /// Get payment quote
618    /// Used to get fee and amount required for a payment request
619    #[instrument(skip_all)]
620    async fn get_payment_quote(
621        &self,
622        unit: &CurrencyUnit,
623        options: OutgoingPaymentOptions,
624    ) -> Result<PaymentQuoteResponse, Self::Err> {
625        match options {
626            cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
627                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
628            }
629            OutgoingPaymentOptions::Bolt11(bolt11_options) => {
630                let bolt11 = bolt11_options.bolt11;
631
632                let amount_msat = match bolt11_options.melt_options {
633                    Some(melt_options) => melt_options.amount_msat(),
634                    None => bolt11
635                        .amount_milli_satoshis()
636                        .ok_or(Error::UnknownInvoiceAmount)?
637                        .into(),
638                };
639
640                let amount =
641                    Amount::new(amount_msat.into(), CurrencyUnit::Msat).convert_to(unit)?;
642
643                let relative_fee_reserve =
644                    (self.fee_reserve.percent_fee_reserve * amount.value() as f32) as u64;
645
646                let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
647
648                let fee = match relative_fee_reserve > absolute_fee_reserve {
649                    true => relative_fee_reserve,
650                    false => absolute_fee_reserve,
651                };
652
653                let payment_hash = bolt11.payment_hash().to_string();
654                let payment_hash_bytes = hex::decode(&payment_hash)?
655                    .try_into()
656                    .map_err(|_| Error::InvalidPaymentHashLength)?;
657
658                Ok(PaymentQuoteResponse {
659                    request_lookup_id: Some(PaymentIdentifier::PaymentHash(payment_hash_bytes)),
660                    amount,
661                    fee: Amount::new(fee, unit.clone()),
662                    state: MeltQuoteState::Unpaid,
663                })
664            }
665            OutgoingPaymentOptions::Bolt12(bolt12_options) => {
666                let offer = bolt12_options.offer;
667
668                let amount_msat = match bolt12_options.melt_options {
669                    Some(melt_options) => melt_options.amount_msat(),
670                    None => {
671                        let amount = offer.amount().ok_or(payment::Error::AmountMismatch)?;
672
673                        match amount {
674                            ldk_node::lightning::offers::offer::Amount::Bitcoin {
675                                amount_msats,
676                            } => amount_msats.into(),
677                            _ => return Err(payment::Error::AmountMismatch),
678                        }
679                    }
680                };
681                let amount =
682                    Amount::new(amount_msat.into(), CurrencyUnit::Msat).convert_to(unit)?;
683
684                let relative_fee_reserve =
685                    (self.fee_reserve.percent_fee_reserve * amount.value() as f32) as u64;
686
687                let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
688
689                let fee = match relative_fee_reserve > absolute_fee_reserve {
690                    true => relative_fee_reserve,
691                    false => absolute_fee_reserve,
692                };
693
694                Ok(PaymentQuoteResponse {
695                    request_lookup_id: None,
696                    amount,
697                    fee: Amount::new(fee, unit.clone()),
698                    state: MeltQuoteState::Unpaid,
699                })
700            }
701        }
702    }
703
704    /// Pay request
705    #[instrument(skip(self, options))]
706    async fn make_payment(
707        &self,
708        unit: &CurrencyUnit,
709        options: OutgoingPaymentOptions,
710    ) -> Result<MakePaymentResponse, Self::Err> {
711        match options {
712            cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
713                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
714            }
715            OutgoingPaymentOptions::Bolt11(bolt11_options) => {
716                let bolt11 = bolt11_options.bolt11;
717
718                let send_params = match bolt11_options
719                    .max_fee_amount
720                    .map(|f| {
721                        Amount::new(f.into(), unit.clone())
722                            .convert_to(&CurrencyUnit::Msat)
723                            .map(|amount_msat| RouteParametersConfig {
724                                max_total_routing_fee_msat: Some(amount_msat.value()),
725                                ..Default::default()
726                            })
727                    })
728                    .transpose()
729                {
730                    Ok(params) => params,
731                    Err(err) => {
732                        tracing::error!("Failed to convert fee amount: {}", err);
733                        return Err(payment::Error::Custom(format!("Invalid fee amount: {err}")));
734                    }
735                };
736
737                let payment_id = match bolt11_options.melt_options {
738                    Some(MeltOptions::Amountless { amountless }) => self
739                        .inner
740                        .bolt11_payment()
741                        .send_using_amount(&bolt11, amountless.amount_msat.into(), send_params)
742                        .map_err(|err| {
743                            tracing::error!("Could not send send amountless bolt11: {}", err);
744                            Error::CouldNotSendBolt11WithoutAmount
745                        })?,
746                    None => self
747                        .inner
748                        .bolt11_payment()
749                        .send(&bolt11, send_params)
750                        .map_err(|err| {
751                            tracing::error!("Could not send bolt11 {}", err);
752                            Error::CouldNotSendBolt11
753                        })?,
754                    _ => return Err(payment::Error::UnsupportedPaymentOption),
755                };
756
757                // Check payment status for up to 10 seconds
758                let start = std::time::Instant::now();
759                let timeout = std::time::Duration::from_secs(10);
760
761                let (status, payment_details) = loop {
762                    let details = self
763                        .inner
764                        .payment(&payment_id)
765                        .ok_or(Error::PaymentNotFound)?;
766
767                    match details.status {
768                        PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
769                        PaymentStatus::Failed => {
770                            tracing::error!("Failed to pay bolt11 payment.");
771                            break (MeltQuoteState::Failed, details);
772                        }
773                        PaymentStatus::Pending => {
774                            if start.elapsed() > timeout {
775                                tracing::warn!(
776                                    "Paying bolt11 exceeded timeout 10 seconds no longer waitning."
777                                );
778                                break (MeltQuoteState::Pending, details);
779                            }
780                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
781                            continue;
782                        }
783                    }
784                };
785
786                let payment_proof = match payment_details.kind {
787                    PaymentKind::Bolt11 {
788                        hash: _,
789                        preimage,
790                        secret: _,
791                    } => preimage.map(|p| p.to_string()),
792                    _ => return Err(Error::UnexpectedPaymentKind.into()),
793                };
794
795                let total_spent = payment_details
796                    .amount_msat
797                    .ok_or(Error::CouldNotGetAmountSpent)?
798                    + payment_details.fee_paid_msat.unwrap_or_default();
799
800                let total_spent = Amount::new(total_spent, CurrencyUnit::Msat).convert_to(unit)?;
801
802                Ok(MakePaymentResponse {
803                    payment_lookup_id: PaymentIdentifier::PaymentHash(
804                        bolt11.payment_hash().to_byte_array(),
805                    ),
806                    payment_proof,
807                    status,
808                    total_spent,
809                })
810            }
811            OutgoingPaymentOptions::Bolt12(bolt12_options) => {
812                let offer = bolt12_options.offer;
813
814                let payment_id = match bolt12_options.melt_options {
815                    Some(MeltOptions::Amountless { amountless }) => self
816                        .inner
817                        .bolt12_payment()
818                        .send_using_amount(&offer, amountless.amount_msat.into(), None, None, None)
819                        .map_err(Error::LdkNode)?,
820                    None => self
821                        .inner
822                        .bolt12_payment()
823                        .send(&offer, None, None, None)
824                        .map_err(Error::LdkNode)?,
825                    _ => return Err(payment::Error::UnsupportedPaymentOption),
826                };
827
828                // Check payment status for up to 10 seconds
829                let start = std::time::Instant::now();
830                let timeout = std::time::Duration::from_secs(10);
831
832                let (status, payment_details) = loop {
833                    let details = self
834                        .inner
835                        .payment(&payment_id)
836                        .ok_or(Error::PaymentNotFound)?;
837
838                    match details.status {
839                        PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
840                        PaymentStatus::Failed => {
841                            tracing::error!("Payment with id {} failed.", payment_id);
842                            break (MeltQuoteState::Failed, details);
843                        }
844                        PaymentStatus::Pending => {
845                            if start.elapsed() > timeout {
846                                tracing::warn!(
847                                    "Payment has been being for 10 seconds. No longer waiting"
848                                );
849                                break (MeltQuoteState::Pending, details);
850                            }
851                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
852                            continue;
853                        }
854                    }
855                };
856
857                let payment_proof = match payment_details.kind {
858                    PaymentKind::Bolt12Offer {
859                        hash: _,
860                        preimage,
861                        secret: _,
862                        offer_id: _,
863                        payer_note: _,
864                        quantity: _,
865                    } => preimage.map(|p| p.to_string()),
866                    _ => return Err(Error::UnexpectedPaymentKind.into()),
867                };
868
869                let total_spent = payment_details
870                    .amount_msat
871                    .ok_or(Error::CouldNotGetAmountSpent)?
872                    + payment_details.fee_paid_msat.unwrap_or_default();
873
874                let total_spent = Amount::new(total_spent, CurrencyUnit::Msat).convert_to(unit)?;
875
876                Ok(MakePaymentResponse {
877                    payment_lookup_id: PaymentIdentifier::PaymentId(payment_id.0),
878                    payment_proof,
879                    status,
880                    total_spent,
881                })
882            }
883        }
884    }
885
886    /// Listen for invoices to be paid to the mint
887    /// Returns a stream of request_lookup_id once invoices are paid
888    #[instrument(skip(self))]
889    async fn wait_payment_event(
890        &self,
891    ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
892        tracing::info!("Starting stream for invoices - wait_any_incoming_payment called");
893
894        // Set active flag to indicate stream is active
895        self.wait_invoice_is_active.store(true, Ordering::SeqCst);
896        tracing::debug!("wait_invoice_is_active set to true");
897
898        let receiver = self.receiver.clone();
899
900        tracing::info!("Receiver obtained successfully, creating response stream");
901
902        // Transform the String stream into a WaitPaymentResponse stream
903        let response_stream = BroadcastStream::new(receiver.resubscribe());
904
905        // Map the stream to handle BroadcastStreamRecvError and wrap in Event
906        let response_stream = response_stream.filter_map(|result| async move {
907            match result {
908                Ok(payment) => Some(cdk_common::payment::Event::PaymentReceived(payment)),
909                Err(err) => {
910                    tracing::warn!("Error in broadcast stream: {}", err);
911                    None
912                }
913            }
914        });
915
916        // Create a combined stream that also handles cancellation
917        let cancel_token = self.wait_invoice_cancel_token.clone();
918        let is_active = self.wait_invoice_is_active.clone();
919
920        let stream = Box::pin(response_stream);
921
922        // Set up a task to clean up when the stream is dropped
923        tokio::spawn(async move {
924            cancel_token.cancelled().await;
925            tracing::info!("wait_invoice stream cancelled");
926            is_active.store(false, Ordering::SeqCst);
927        });
928
929        tracing::info!("wait_any_incoming_payment returning stream");
930        Ok(stream)
931    }
932
933    /// Is wait invoice active
934    fn is_wait_invoice_active(&self) -> bool {
935        self.wait_invoice_is_active.load(Ordering::SeqCst)
936    }
937
938    /// Cancel wait invoice
939    fn cancel_wait_invoice(&self) {
940        self.wait_invoice_cancel_token.cancel()
941    }
942
943    /// Check the status of an incoming payment
944    async fn check_incoming_payment_status(
945        &self,
946        payment_identifier: &PaymentIdentifier,
947    ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
948        let payment_id_str = match payment_identifier {
949            PaymentIdentifier::PaymentHash(hash) => hex::encode(hash),
950            PaymentIdentifier::CustomId(id) => id.clone(),
951            _ => return Err(Error::UnsupportedPaymentIdentifierType.into()),
952        };
953
954        let payment_id = PaymentId(
955            hex::decode(&payment_id_str)?
956                .try_into()
957                .map_err(|_| Error::InvalidPaymentIdLength)?,
958        );
959
960        let payment_details = self
961            .inner
962            .payment(&payment_id)
963            .ok_or(Error::PaymentNotFound)?;
964
965        if payment_details.direction == PaymentDirection::Outbound {
966            return Err(Error::InvalidPaymentDirection.into());
967        }
968
969        let amount = if payment_details.status == PaymentStatus::Succeeded {
970            payment_details
971                .amount_msat
972                .ok_or(Error::CouldNotGetPaymentAmount)?
973        } else {
974            return Ok(vec![]);
975        };
976
977        let response = WaitPaymentResponse {
978            payment_identifier: payment_identifier.clone(),
979            payment_amount: Amount::new(amount, CurrencyUnit::Msat),
980            payment_id: payment_id_str,
981        };
982
983        Ok(vec![response])
984    }
985
986    /// Check the status of an outgoing payment
987    async fn check_outgoing_payment(
988        &self,
989        request_lookup_id: &PaymentIdentifier,
990    ) -> Result<MakePaymentResponse, Self::Err> {
991        let payment_details = match request_lookup_id {
992            PaymentIdentifier::PaymentHash(id_hash) => self
993                .inner
994                .list_payments_with_filter(
995                    |p| matches!(&p.kind, PaymentKind::Bolt11 { hash, .. } if &hash.0 == id_hash),
996                )
997                .first()
998                .cloned(),
999            PaymentIdentifier::PaymentId(id) => self.inner.payment(&PaymentId(
1000                hex::decode(id)?
1001                    .try_into()
1002                    .map_err(|_| payment::Error::Custom("Invalid hex".to_string()))?,
1003            )),
1004            _ => {
1005                return Ok(MakePaymentResponse {
1006                    payment_lookup_id: request_lookup_id.clone(),
1007                    payment_proof: None,
1008                    status: MeltQuoteState::Unknown,
1009                    total_spent: Amount::new(0, CurrencyUnit::Msat),
1010                });
1011            }
1012        }
1013        .ok_or(Error::PaymentNotFound)?;
1014
1015        // This check seems reversed in the original code, so I'm fixing it here
1016        if payment_details.direction != PaymentDirection::Outbound {
1017            return Err(Error::InvalidPaymentDirection.into());
1018        }
1019
1020        let status = match payment_details.status {
1021            PaymentStatus::Pending => MeltQuoteState::Pending,
1022            PaymentStatus::Succeeded => MeltQuoteState::Paid,
1023            PaymentStatus::Failed => MeltQuoteState::Failed,
1024        };
1025
1026        let payment_proof = match payment_details.kind {
1027            PaymentKind::Bolt11 {
1028                hash: _,
1029                preimage,
1030                secret: _,
1031            } => preimage.map(|p| p.to_string()),
1032            _ => return Err(Error::UnexpectedPaymentKind.into()),
1033        };
1034
1035        let total_spent = payment_details
1036            .amount_msat
1037            .ok_or(Error::CouldNotGetAmountSpent)?;
1038
1039        Ok(MakePaymentResponse {
1040            payment_lookup_id: request_lookup_id.clone(),
1041            payment_proof,
1042            status,
1043            total_spent: Amount::new(total_spent, CurrencyUnit::Msat),
1044        })
1045    }
1046}
1047
1048impl Drop for CdkLdkNode {
1049    fn drop(&mut self) {
1050        tracing::info!("Drop called on CdkLdkNode");
1051        self.wait_invoice_cancel_token.cancel();
1052        tracing::debug!("Cancelled wait_invoice token in drop");
1053    }
1054}