Skip to main content

cdk_ldk_node/
lib.rs

1//! CDK lightning backend for ldk-node
2
3#![doc = include_str!("../README.md")]
4
5use std::net::SocketAddr;
6use std::pin::Pin;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use bip39::Mnemonic;
12use cdk_common::common::FeeReserve;
13use cdk_common::payment::{self, *};
14use cdk_common::util::{hex, unix_time};
15use cdk_common::{Amount, CurrencyUnit, MeltOptions, MeltQuoteState};
16use futures::{Stream, StreamExt};
17use ldk_node::bitcoin::hashes::Hash;
18use ldk_node::bitcoin::Network;
19use ldk_node::lightning::ln::channelmanager::PaymentId;
20use ldk_node::lightning::ln::msgs::SocketAddress;
21use ldk_node::lightning::routing::router::RouteParametersConfig;
22use ldk_node::lightning_invoice::{Bolt11InvoiceDescription, Description};
23use ldk_node::lightning_types::payment::PaymentHash;
24use ldk_node::logger::{LogLevel, LogWriter};
25use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus};
26use ldk_node::{Builder, Event, Node};
27use tokio_stream::wrappers::BroadcastStream;
28use tokio_util::sync::CancellationToken;
29use tracing::instrument;
30
31use crate::error::Error;
32use crate::log::StdoutLogWriter;
33
34mod error;
35mod log;
36mod web;
37
38/// CDK Lightning backend using LDK Node
39///
40/// Provides Lightning Network functionality for CDK with support for Cashu operations.
41/// Handles payment creation, processing, and event management using the Lightning Development Kit.
42#[derive(Clone)]
43pub struct CdkLdkNode {
44    inner: Arc<Node>,
45    fee_reserve: FeeReserve,
46    wait_invoice_cancel_token: CancellationToken,
47    wait_invoice_is_active: Arc<AtomicBool>,
48    sender: tokio::sync::broadcast::Sender<WaitPaymentResponse>,
49    receiver: Arc<tokio::sync::broadcast::Receiver<WaitPaymentResponse>>,
50    events_cancel_token: CancellationToken,
51    web_addr: Option<SocketAddr>,
52}
53
54impl std::fmt::Debug for CdkLdkNode {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("CdkLdkNode")
57            .field("fee_reserve", &self.fee_reserve)
58            .field("web_addr", &self.web_addr)
59            .finish_non_exhaustive()
60    }
61}
62
63/// Configuration for connecting to Bitcoin RPC
64///
65/// Contains the necessary connection parameters for Bitcoin Core RPC interface.
66#[derive(Debug, Clone)]
67pub struct BitcoinRpcConfig {
68    /// Bitcoin RPC server hostname or IP address
69    pub host: String,
70    /// Bitcoin RPC server port number
71    pub port: u16,
72    /// Username for Bitcoin RPC authentication
73    pub user: String,
74    /// Password for Bitcoin RPC authentication
75    pub password: String,
76}
77
78/// Source of blockchain data for the Lightning node
79///
80/// Specifies how the node should connect to the Bitcoin network to retrieve
81/// blockchain information and broadcast transactions.
82#[derive(Debug, Clone)]
83pub enum ChainSource {
84    /// Use an Esplora server for blockchain data
85    ///
86    /// Contains the URL of the Esplora server endpoint
87    Esplora(String),
88    /// Use Bitcoin Core RPC for blockchain data
89    ///
90    /// Contains the configuration for connecting to Bitcoin Core
91    BitcoinRpc(BitcoinRpcConfig),
92}
93
94/// Source of Lightning network gossip data
95///
96/// Specifies how the node should learn about the Lightning Network topology
97/// and routing information.
98#[derive(Debug, Clone)]
99pub enum GossipSource {
100    /// Learn gossip through peer-to-peer connections
101    ///
102    /// The node will connect to other Lightning nodes and exchange gossip data directly
103    P2P,
104    /// Use Rapid Gossip Sync for efficient gossip updates
105    ///
106    /// Contains the URL of the RGS server for compressed gossip data
107    RapidGossipSync(String),
108}
109/// A builder for an [`CdkLdkNode`] instance.
110#[derive(Debug)]
111pub struct CdkLdkNodeBuilder {
112    network: Network,
113    chain_source: ChainSource,
114    gossip_source: GossipSource,
115    log_dir_path: Option<String>,
116    storage_dir_path: String,
117    fee_reserve: FeeReserve,
118    listening_addresses: Vec<SocketAddress>,
119    seed: Option<Mnemonic>,
120    announcement_addresses: Option<Vec<SocketAddress>>,
121}
122
123impl CdkLdkNodeBuilder {
124    /// Creates a new builder instance.
125    pub fn new(
126        network: Network,
127        chain_source: ChainSource,
128        gossip_source: GossipSource,
129        storage_dir_path: String,
130        fee_reserve: FeeReserve,
131        listening_addresses: Vec<SocketAddress>,
132    ) -> Self {
133        Self {
134            network,
135            chain_source,
136            gossip_source,
137            storage_dir_path,
138            fee_reserve,
139            listening_addresses,
140            seed: None,
141            announcement_addresses: None,
142            log_dir_path: None,
143        }
144    }
145
146    /// Configures the [`CdkLdkNode`] to use the Mnemonic for entropy source configuration
147    pub fn with_seed(mut self, seed: Mnemonic) -> Self {
148        self.seed = Some(seed);
149        self
150    }
151    /// Configures the [`CdkLdkNode`] to use announce this address to the lightning network
152    pub fn with_announcement_address(mut self, announcement_addresses: Vec<SocketAddress>) -> Self {
153        self.announcement_addresses = Some(announcement_addresses);
154        self
155    }
156    /// Configures the [`CdkLdkNode`] to use announce this address to the lightning network
157    pub fn with_log_dir_path(mut self, log_dir_path: String) -> Self {
158        self.log_dir_path = Some(log_dir_path);
159        self
160    }
161
162    /// Builds the [`CdkLdkNode`] instance
163    ///
164    /// # Errors
165    /// Returns an error if the LDK node builder fails to create the node
166    pub fn build(self) -> Result<CdkLdkNode, Error> {
167        let mut ldk = Builder::new();
168        ldk.set_network(self.network);
169        tracing::info!("Storage dir of node is {}", self.storage_dir_path);
170        ldk.set_storage_dir_path(self.storage_dir_path);
171
172        match self.chain_source {
173            ChainSource::Esplora(esplora_url) => {
174                ldk.set_chain_source_esplora(esplora_url, None);
175            }
176            ChainSource::BitcoinRpc(BitcoinRpcConfig {
177                host,
178                port,
179                user,
180                password,
181            }) => {
182                ldk.set_chain_source_bitcoind_rpc(host, port, user, password);
183            }
184        }
185
186        match self.gossip_source {
187            GossipSource::P2P => {
188                ldk.set_gossip_source_p2p();
189            }
190            GossipSource::RapidGossipSync(rgs_url) => {
191                ldk.set_gossip_source_rgs(rgs_url);
192            }
193        }
194
195        ldk.set_listening_addresses(self.listening_addresses)?;
196        if self.log_dir_path.is_some() {
197            ldk.set_filesystem_logger(self.log_dir_path, Some(LogLevel::Info));
198        } else {
199            ldk.set_custom_logger(Arc::new(StdoutLogWriter));
200        }
201
202        ldk.set_node_alias("cdk-ldk-node".to_string())?;
203        // set the seed as bip39 entropy mnemonic
204        if let Some(seed) = self.seed {
205            ldk.set_entropy_bip39_mnemonic(seed, None);
206        }
207        // set the announcement addresses
208        if let Some(announcement_addresses) = self.announcement_addresses {
209            ldk.set_announcement_addresses(announcement_addresses)?;
210        }
211
212        let node = ldk.build()?;
213
214        tracing::info!("Creating tokio channel for payment notifications");
215        let (sender, receiver) = tokio::sync::broadcast::channel(8);
216
217        let id = node.node_id();
218
219        let adr = node.announcement_addresses();
220
221        tracing::info!(
222            "Created node {} with address {:?} on network {}",
223            id,
224            adr,
225            self.network
226        );
227
228        Ok(CdkLdkNode {
229            inner: node.into(),
230            fee_reserve: self.fee_reserve,
231            wait_invoice_cancel_token: CancellationToken::new(),
232            wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
233            sender,
234            receiver: Arc::new(receiver),
235            events_cancel_token: CancellationToken::new(),
236            web_addr: None,
237        })
238    }
239}
240
241impl CdkLdkNode {
242    /// Set the web server address for the LDK node management interface
243    ///
244    /// # Arguments
245    /// * `addr` - Socket address for the web server. If None, no web server will be started.
246    pub fn set_web_addr(&mut self, addr: Option<SocketAddr>) {
247        self.web_addr = addr;
248    }
249
250    /// Get a default web server address using an unused port
251    ///
252    /// Returns a SocketAddr with localhost and port 0, which will cause
253    /// the system to automatically assign an available port
254    pub fn default_web_addr() -> SocketAddr {
255        SocketAddr::from(([127, 0, 0, 1], 8091))
256    }
257
258    /// Start the CDK LDK Node
259    ///
260    /// Starts the underlying LDK node and begins event processing.
261    /// Sets up event handlers to listen for Lightning events like payment received.
262    ///
263    /// # Returns
264    /// Returns `Ok(())` on successful start, error otherwise
265    ///
266    /// # Errors
267    /// Returns an error if the LDK node fails to start or event handling setup fails
268    pub fn start_ldk_node(&self) -> Result<(), Error> {
269        tracing::info!("Starting cdk-ldk node");
270        self.inner.start()?;
271        let node_config = self.inner.config();
272
273        tracing::info!("Starting node with network {}", node_config.network);
274
275        tracing::info!("Node status: {:?}", self.inner.status());
276
277        self.handle_events()?;
278
279        Ok(())
280    }
281
282    /// Start the web server for the LDK node management interface
283    ///
284    /// Starts a web server that provides a user interface for managing the LDK node.
285    /// The web interface allows users to view balances, manage channels, create invoices,
286    /// and send payments.
287    ///
288    /// # Arguments
289    /// * `web_addr` - The socket address to bind the web server to
290    ///
291    /// # Returns
292    /// Returns `Ok(())` on successful start, error otherwise
293    ///
294    /// # Errors
295    /// Returns an error if the web server fails to start
296    pub fn start_web_server(&self, web_addr: SocketAddr) -> Result<(), Error> {
297        let web_server = crate::web::WebServer::new(Arc::new(self.clone()));
298
299        tokio::spawn(async move {
300            if let Err(e) = web_server.serve(web_addr).await {
301                tracing::error!("Web server error: {}", e);
302            }
303        });
304
305        Ok(())
306    }
307
308    /// Stop the CDK LDK Node
309    ///
310    /// Gracefully stops the node by cancelling all active tasks and event handlers.
311    /// This includes:
312    /// - Cancelling the event handler task
313    /// - Cancelling any active wait_invoice streams
314    /// - Stopping the underlying LDK node
315    ///
316    /// # Returns
317    /// Returns `Ok(())` on successful shutdown, error otherwise
318    ///
319    /// # Errors
320    /// Returns an error if the underlying LDK node fails to stop
321    pub fn stop_ldk_node(&self) -> Result<(), Error> {
322        tracing::info!("Stopping CdkLdkNode");
323        // Cancel all tokio tasks
324        tracing::info!("Cancelling event handler");
325        self.events_cancel_token.cancel();
326
327        // Cancel any wait_invoice streams
328        if self.is_wait_invoice_active() {
329            tracing::info!("Cancelling wait_invoice stream");
330            self.wait_invoice_cancel_token.cancel();
331        }
332
333        // Stop the LDK node
334        tracing::info!("Stopping LDK node");
335        self.inner.stop()?;
336        tracing::info!("CdkLdkNode stopped successfully");
337        Ok(())
338    }
339
340    /// Handle payment received event
341    async fn handle_payment_received(
342        node: &Arc<Node>,
343        sender: &tokio::sync::broadcast::Sender<WaitPaymentResponse>,
344        payment_id: Option<PaymentId>,
345        payment_hash: PaymentHash,
346        amount_msat: u64,
347    ) {
348        tracing::info!(
349            "Received payment for hash={} of amount={} msat",
350            payment_hash,
351            amount_msat
352        );
353
354        let payment_id = match payment_id {
355            Some(id) => id,
356            None => {
357                tracing::warn!("Received payment without payment_id");
358                return;
359            }
360        };
361
362        let payment_id_hex = hex::encode(payment_id.0);
363
364        if amount_msat == 0 {
365            tracing::warn!("Payment of no amount");
366            return;
367        }
368
369        tracing::info!(
370            "Processing payment notification: id={}, amount={} msats",
371            payment_id_hex,
372            amount_msat
373        );
374
375        let payment_details = match node.payment(&payment_id) {
376            Some(details) => details,
377            None => {
378                tracing::error!("Could not find payment details for id={}", payment_id_hex);
379                return;
380            }
381        };
382
383        let (payment_identifier, payment_id) = match payment_details.kind {
384            PaymentKind::Bolt11 { hash, .. } => {
385                (PaymentIdentifier::PaymentHash(hash.0), hash.to_string())
386            }
387            PaymentKind::Bolt12Offer { hash, offer_id, .. } => match hash {
388                Some(h) => (
389                    PaymentIdentifier::OfferId(offer_id.to_string()),
390                    h.to_string(),
391                ),
392                None => {
393                    tracing::error!("Bolt12 payment missing hash");
394                    return;
395                }
396            },
397            k => {
398                tracing::warn!("Received payment of kind {:?} which is not supported", k);
399                return;
400            }
401        };
402
403        let wait_payment_response = WaitPaymentResponse {
404            payment_identifier,
405            payment_amount: Amount::new(amount_msat, CurrencyUnit::Msat),
406            payment_id,
407        };
408
409        match sender.send(wait_payment_response) {
410            Ok(_) => tracing::info!("Successfully sent payment notification to stream"),
411            Err(err) => tracing::error!(
412                "Could not send payment received notification on channel: {}",
413                err
414            ),
415        }
416    }
417
418    /// Set up event handling for the node
419    pub fn handle_events(&self) -> Result<(), Error> {
420        let node = self.inner.clone();
421        let sender = self.sender.clone();
422        let cancel_token = self.events_cancel_token.clone();
423
424        tracing::info!("Starting event handler task");
425
426        tokio::spawn(async move {
427            tracing::info!("Event handler loop started");
428            loop {
429                tokio::select! {
430                    _ = cancel_token.cancelled() => {
431                        tracing::info!("Event handler cancelled");
432                        break;
433                    }
434                    event = node.next_event_async() => {
435                        match event {
436                            Event::PaymentReceived {
437                                payment_id,
438                                payment_hash,
439                                amount_msat,
440                                custom_records: _
441                            } => {
442                                Self::handle_payment_received(
443                                    &node,
444                                    &sender,
445                                    payment_id,
446                                    payment_hash,
447                                    amount_msat
448                                ).await;
449                            }
450                            event => {
451                                tracing::debug!("Received other ldk node event: {:?}", event);
452                            }
453                        }
454
455                        if let Err(err) = node.event_handled() {
456                            tracing::error!("Error handling node event: {}", err);
457                        } else {
458                            tracing::debug!("Successfully handled node event");
459                        }
460                    }
461                }
462            }
463            tracing::info!("Event handler loop terminated");
464        });
465
466        tracing::info!("Event handler task spawned");
467        Ok(())
468    }
469
470    /// Get Node used
471    pub fn node(&self) -> Arc<Node> {
472        Arc::clone(&self.inner)
473    }
474}
475
476/// Mint payment trait
477#[async_trait]
478impl MintPayment for CdkLdkNode {
479    type Err = payment::Error;
480
481    /// Start the payment processor
482    /// Starts the LDK node and begins event processing
483    async fn start(&self) -> Result<(), Self::Err> {
484        self.start_ldk_node().map_err(|e| {
485            tracing::error!("Failed to start CdkLdkNode: {}", e);
486            e
487        })?;
488
489        tracing::info!("CdkLdkNode payment processor started successfully");
490
491        // Start web server if configured
492        if let Some(web_addr) = self.web_addr {
493            tracing::info!("Starting LDK Node web interface on {}", web_addr);
494            self.start_web_server(web_addr).map_err(|e| {
495                tracing::error!("Failed to start web server: {}", e);
496                e
497            })?;
498        } else {
499            tracing::info!("No web server address configured, skipping web interface");
500        }
501
502        Ok(())
503    }
504
505    /// Stop the payment processor
506    /// Gracefully stops the LDK node and cancels all background tasks
507    async fn stop(&self) -> Result<(), Self::Err> {
508        self.stop_ldk_node().map_err(|e| {
509            tracing::error!("Failed to stop CdkLdkNode: {}", e);
510            e.into()
511        })
512    }
513
514    /// Base Settings
515    async fn get_settings(&self) -> Result<SettingsResponse, Self::Err> {
516        let settings = SettingsResponse {
517            unit: CurrencyUnit::Msat.to_string(),
518            bolt11: Some(payment::Bolt11Settings {
519                mpp: false,
520                amountless: true,
521                invoice_description: true,
522            }),
523            bolt12: Some(payment::Bolt12Settings { amountless: true }),
524            custom: std::collections::HashMap::new(),
525        };
526        Ok(settings)
527    }
528
529    /// Create a new invoice
530    #[instrument(skip(self))]
531    async fn create_incoming_payment_request(
532        &self,
533        options: IncomingPaymentOptions,
534    ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
535        match options {
536            IncomingPaymentOptions::Bolt11(bolt11_options) => {
537                let amount_msat: Amount = bolt11_options
538                    .amount
539                    .convert_to(&CurrencyUnit::Msat)?
540                    .into();
541                let description = bolt11_options.description.unwrap_or_default();
542                let time = bolt11_options
543                    .unix_expiry
544                    .map(|t| t - unix_time())
545                    .unwrap_or(36000);
546
547                let description = Bolt11InvoiceDescription::Direct(
548                    Description::new(description).map_err(|_| Error::InvalidDescription)?,
549                );
550
551                let payment = self
552                    .inner
553                    .bolt11_payment()
554                    .receive(amount_msat.into(), &description, time as u32)
555                    .map_err(Error::LdkNode)?;
556
557                let payment_hash = payment.payment_hash().to_string();
558                let payment_identifier = PaymentIdentifier::PaymentHash(
559                    hex::decode(&payment_hash)?
560                        .try_into()
561                        .map_err(|_| Error::InvalidPaymentHashLength)?,
562                );
563
564                Ok(CreateIncomingPaymentResponse {
565                    request_lookup_id: payment_identifier,
566                    request: payment.to_string(),
567                    expiry: Some(unix_time() + time),
568                    extra_json: None,
569                })
570            }
571            IncomingPaymentOptions::Bolt12(bolt12_options) => {
572                let Bolt12IncomingPaymentOptions {
573                    description,
574                    amount,
575                    unix_expiry,
576                } = *bolt12_options;
577
578                let time = unix_expiry.map(|t| (t - unix_time()) as u32);
579
580                let offer = match amount {
581                    Some(amount) => {
582                        let amount_msat: Amount = amount.convert_to(&CurrencyUnit::Msat)?.into();
583
584                        self.inner
585                            .bolt12_payment()
586                            .receive(
587                                amount_msat.into(),
588                                &description.unwrap_or("".to_string()),
589                                time,
590                                None,
591                            )
592                            .map_err(Error::LdkNode)?
593                    }
594                    None => self
595                        .inner
596                        .bolt12_payment()
597                        .receive_variable_amount(&description.unwrap_or("".to_string()), time)
598                        .map_err(Error::LdkNode)?,
599                };
600                let payment_identifier = PaymentIdentifier::OfferId(offer.id().to_string());
601
602                Ok(CreateIncomingPaymentResponse {
603                    request_lookup_id: payment_identifier,
604                    request: offer.to_string(),
605                    expiry: time.map(|a| a as u64),
606                    extra_json: None,
607                })
608            }
609            cdk_common::payment::IncomingPaymentOptions::Custom(_) => {
610                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
611            }
612        }
613    }
614
615    /// Get payment quote
616    /// Used to get fee and amount required for a payment request
617    #[instrument(skip_all)]
618    async fn get_payment_quote(
619        &self,
620        unit: &CurrencyUnit,
621        options: OutgoingPaymentOptions,
622    ) -> Result<PaymentQuoteResponse, Self::Err> {
623        match options {
624            cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
625                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
626            }
627            OutgoingPaymentOptions::Bolt11(bolt11_options) => {
628                let bolt11 = bolt11_options.bolt11;
629
630                let amount_msat = match bolt11_options.melt_options {
631                    Some(melt_options) => melt_options.amount_msat(),
632                    None => bolt11
633                        .amount_milli_satoshis()
634                        .ok_or(Error::UnknownInvoiceAmount)?
635                        .into(),
636                };
637
638                let amount =
639                    Amount::new(amount_msat.into(), CurrencyUnit::Msat).convert_to(unit)?;
640
641                let relative_fee_reserve =
642                    (self.fee_reserve.percent_fee_reserve * amount.value() as f32) as u64;
643
644                let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
645
646                let fee = match relative_fee_reserve > absolute_fee_reserve {
647                    true => relative_fee_reserve,
648                    false => absolute_fee_reserve,
649                };
650
651                let payment_hash = bolt11.payment_hash().to_string();
652                let payment_hash_bytes = hex::decode(&payment_hash)?
653                    .try_into()
654                    .map_err(|_| Error::InvalidPaymentHashLength)?;
655
656                Ok(PaymentQuoteResponse {
657                    request_lookup_id: Some(PaymentIdentifier::PaymentHash(payment_hash_bytes)),
658                    amount,
659                    fee: Amount::new(fee, unit.clone()),
660                    state: MeltQuoteState::Unpaid,
661                })
662            }
663            OutgoingPaymentOptions::Bolt12(bolt12_options) => {
664                let offer = bolt12_options.offer;
665
666                let amount_msat = match bolt12_options.melt_options {
667                    Some(melt_options) => melt_options.amount_msat(),
668                    None => {
669                        let amount = offer.amount().ok_or(payment::Error::AmountMismatch)?;
670
671                        match amount {
672                            ldk_node::lightning::offers::offer::Amount::Bitcoin {
673                                amount_msats,
674                            } => amount_msats.into(),
675                            _ => return Err(payment::Error::AmountMismatch),
676                        }
677                    }
678                };
679                let amount =
680                    Amount::new(amount_msat.into(), CurrencyUnit::Msat).convert_to(unit)?;
681
682                let relative_fee_reserve =
683                    (self.fee_reserve.percent_fee_reserve * amount.value() as f32) as u64;
684
685                let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
686
687                let fee = match relative_fee_reserve > absolute_fee_reserve {
688                    true => relative_fee_reserve,
689                    false => absolute_fee_reserve,
690                };
691
692                Ok(PaymentQuoteResponse {
693                    request_lookup_id: None,
694                    amount,
695                    fee: Amount::new(fee, unit.clone()),
696                    state: MeltQuoteState::Unpaid,
697                })
698            }
699        }
700    }
701
702    /// Pay request
703    #[instrument(skip(self, options))]
704    async fn make_payment(
705        &self,
706        unit: &CurrencyUnit,
707        options: OutgoingPaymentOptions,
708    ) -> Result<MakePaymentResponse, Self::Err> {
709        match options {
710            cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
711                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
712            }
713            OutgoingPaymentOptions::Bolt11(bolt11_options) => {
714                let bolt11 = bolt11_options.bolt11;
715
716                let send_params = match bolt11_options
717                    .max_fee_amount
718                    .map(|f| {
719                        f.convert_to(&CurrencyUnit::Msat)
720                            .map(|amount_msat| RouteParametersConfig {
721                                max_total_routing_fee_msat: Some(amount_msat.value()),
722                                ..Default::default()
723                            })
724                    })
725                    .transpose()
726                {
727                    Ok(params) => params,
728                    Err(err) => {
729                        tracing::error!("Failed to convert fee amount: {}", err);
730                        return Err(payment::Error::Custom(format!("Invalid fee amount: {err}")));
731                    }
732                };
733
734                let payment_id = match bolt11_options.melt_options {
735                    Some(MeltOptions::Amountless { amountless }) => self
736                        .inner
737                        .bolt11_payment()
738                        .send_using_amount(&bolt11, amountless.amount_msat.into(), send_params)
739                        .map_err(|err| {
740                            tracing::error!("Could not send send amountless bolt11: {}", err);
741                            Error::CouldNotSendBolt11WithoutAmount
742                        })?,
743                    None => self
744                        .inner
745                        .bolt11_payment()
746                        .send(&bolt11, send_params)
747                        .map_err(|err| {
748                            tracing::error!("Could not send bolt11 {}", err);
749                            Error::CouldNotSendBolt11
750                        })?,
751                    _ => return Err(payment::Error::UnsupportedPaymentOption),
752                };
753
754                // Check payment status for up to 10 seconds
755                let start = std::time::Instant::now();
756                let timeout = std::time::Duration::from_secs(10);
757
758                let (status, payment_details) = loop {
759                    let details = self
760                        .inner
761                        .payment(&payment_id)
762                        .ok_or(Error::PaymentNotFound)?;
763
764                    match details.status {
765                        PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
766                        PaymentStatus::Failed => {
767                            tracing::error!("Failed to pay bolt11 payment.");
768                            break (MeltQuoteState::Failed, details);
769                        }
770                        PaymentStatus::Pending => {
771                            if start.elapsed() > timeout {
772                                tracing::warn!(
773                                    "Paying bolt11 exceeded timeout 10 seconds no longer waitning."
774                                );
775                                break (MeltQuoteState::Pending, details);
776                            }
777                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
778                            continue;
779                        }
780                    }
781                };
782
783                let payment_proof = match payment_details.kind {
784                    PaymentKind::Bolt11 {
785                        hash: _,
786                        preimage,
787                        secret: _,
788                    } => preimage.map(|p| p.to_string()),
789                    _ => return Err(Error::UnexpectedPaymentKind.into()),
790                };
791
792                let total_spent = payment_details
793                    .amount_msat
794                    .ok_or(Error::CouldNotGetAmountSpent)?
795                    + payment_details.fee_paid_msat.unwrap_or_default();
796
797                let total_spent = Amount::new(total_spent, CurrencyUnit::Msat).convert_to(unit)?;
798
799                Ok(MakePaymentResponse {
800                    payment_lookup_id: PaymentIdentifier::PaymentHash(
801                        bolt11.payment_hash().to_byte_array(),
802                    ),
803                    payment_proof,
804                    status,
805                    total_spent,
806                })
807            }
808            OutgoingPaymentOptions::Bolt12(bolt12_options) => {
809                let offer = bolt12_options.offer;
810
811                let payment_id = match bolt12_options.melt_options {
812                    Some(MeltOptions::Amountless { amountless }) => self
813                        .inner
814                        .bolt12_payment()
815                        .send_using_amount(&offer, amountless.amount_msat.into(), None, None, None)
816                        .map_err(Error::LdkNode)?,
817                    None => self
818                        .inner
819                        .bolt12_payment()
820                        .send(&offer, None, None, None)
821                        .map_err(Error::LdkNode)?,
822                    _ => return Err(payment::Error::UnsupportedPaymentOption),
823                };
824
825                // Check payment status for up to 10 seconds
826                let start = std::time::Instant::now();
827                let timeout = std::time::Duration::from_secs(10);
828
829                let (status, payment_details) = loop {
830                    let details = self
831                        .inner
832                        .payment(&payment_id)
833                        .ok_or(Error::PaymentNotFound)?;
834
835                    match details.status {
836                        PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
837                        PaymentStatus::Failed => {
838                            tracing::error!("Payment with id {} failed.", payment_id);
839                            break (MeltQuoteState::Failed, details);
840                        }
841                        PaymentStatus::Pending => {
842                            if start.elapsed() > timeout {
843                                tracing::warn!(
844                                    "Payment has been being for 10 seconds. No longer waiting"
845                                );
846                                break (MeltQuoteState::Pending, details);
847                            }
848                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
849                            continue;
850                        }
851                    }
852                };
853
854                let payment_proof = match payment_details.kind {
855                    PaymentKind::Bolt12Offer {
856                        hash: _,
857                        preimage,
858                        secret: _,
859                        offer_id: _,
860                        payer_note: _,
861                        quantity: _,
862                    } => preimage.map(|p| p.to_string()),
863                    _ => return Err(Error::UnexpectedPaymentKind.into()),
864                };
865
866                let total_spent = payment_details
867                    .amount_msat
868                    .ok_or(Error::CouldNotGetAmountSpent)?
869                    + payment_details.fee_paid_msat.unwrap_or_default();
870
871                let total_spent = Amount::new(total_spent, CurrencyUnit::Msat).convert_to(unit)?;
872
873                Ok(MakePaymentResponse {
874                    payment_lookup_id: PaymentIdentifier::PaymentId(payment_id.0),
875                    payment_proof,
876                    status,
877                    total_spent,
878                })
879            }
880        }
881    }
882
883    /// Listen for invoices to be paid to the mint
884    /// Returns a stream of request_lookup_id once invoices are paid
885    #[instrument(skip(self))]
886    async fn wait_payment_event(
887        &self,
888    ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
889        tracing::info!("Starting stream for invoices - wait_any_incoming_payment called");
890
891        // Set active flag to indicate stream is active
892        self.wait_invoice_is_active.store(true, Ordering::SeqCst);
893        tracing::debug!("wait_invoice_is_active set to true");
894
895        let receiver = self.receiver.clone();
896
897        tracing::info!("Receiver obtained successfully, creating response stream");
898
899        // Transform the String stream into a WaitPaymentResponse stream
900        let response_stream = BroadcastStream::new(receiver.resubscribe());
901
902        // Map the stream to handle BroadcastStreamRecvError and wrap in Event
903        let response_stream = response_stream.filter_map(|result| async move {
904            match result {
905                Ok(payment) => Some(cdk_common::payment::Event::PaymentReceived(payment)),
906                Err(err) => {
907                    tracing::warn!("Error in broadcast stream: {}", err);
908                    None
909                }
910            }
911        });
912
913        // Create a combined stream that also handles cancellation
914        let cancel_token = self.wait_invoice_cancel_token.clone();
915        let is_active = self.wait_invoice_is_active.clone();
916
917        let stream = Box::pin(response_stream);
918
919        // Set up a task to clean up when the stream is dropped
920        tokio::spawn(async move {
921            cancel_token.cancelled().await;
922            tracing::info!("wait_invoice stream cancelled");
923            is_active.store(false, Ordering::SeqCst);
924        });
925
926        tracing::info!("wait_any_incoming_payment returning stream");
927        Ok(stream)
928    }
929
930    /// Is wait invoice active
931    fn is_wait_invoice_active(&self) -> bool {
932        self.wait_invoice_is_active.load(Ordering::SeqCst)
933    }
934
935    /// Cancel wait invoice
936    fn cancel_wait_invoice(&self) {
937        self.wait_invoice_cancel_token.cancel()
938    }
939
940    /// Check the status of an incoming payment
941    async fn check_incoming_payment_status(
942        &self,
943        payment_identifier: &PaymentIdentifier,
944    ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
945        let payment_id_str = match payment_identifier {
946            PaymentIdentifier::PaymentHash(hash) => hex::encode(hash),
947            PaymentIdentifier::CustomId(id) => id.clone(),
948            _ => return Err(Error::UnsupportedPaymentIdentifierType.into()),
949        };
950
951        let payment_id = PaymentId(
952            hex::decode(&payment_id_str)?
953                .try_into()
954                .map_err(|_| Error::InvalidPaymentIdLength)?,
955        );
956
957        let payment_details = self
958            .inner
959            .payment(&payment_id)
960            .ok_or(Error::PaymentNotFound)?;
961
962        if payment_details.direction == PaymentDirection::Outbound {
963            return Err(Error::InvalidPaymentDirection.into());
964        }
965
966        let amount = if payment_details.status == PaymentStatus::Succeeded {
967            payment_details
968                .amount_msat
969                .ok_or(Error::CouldNotGetPaymentAmount)?
970        } else {
971            return Ok(vec![]);
972        };
973
974        let response = WaitPaymentResponse {
975            payment_identifier: payment_identifier.clone(),
976            payment_amount: Amount::new(amount, CurrencyUnit::Msat),
977            payment_id: payment_id_str,
978        };
979
980        Ok(vec![response])
981    }
982
983    /// Check the status of an outgoing payment
984    async fn check_outgoing_payment(
985        &self,
986        request_lookup_id: &PaymentIdentifier,
987    ) -> Result<MakePaymentResponse, Self::Err> {
988        let payment_details = match request_lookup_id {
989            PaymentIdentifier::PaymentHash(id_hash) => self
990                .inner
991                .list_payments_with_filter(
992                    |p| matches!(&p.kind, PaymentKind::Bolt11 { hash, .. } if &hash.0 == id_hash),
993                )
994                .first()
995                .cloned(),
996            PaymentIdentifier::PaymentId(id) => self.inner.payment(&PaymentId(
997                hex::decode(id)?
998                    .try_into()
999                    .map_err(|_| payment::Error::Custom("Invalid hex".to_string()))?,
1000            )),
1001            _ => {
1002                return Ok(MakePaymentResponse {
1003                    payment_lookup_id: request_lookup_id.clone(),
1004                    payment_proof: None,
1005                    status: MeltQuoteState::Unknown,
1006                    total_spent: Amount::new(0, CurrencyUnit::Msat),
1007                });
1008            }
1009        }
1010        .ok_or(Error::PaymentNotFound)?;
1011
1012        // This check seems reversed in the original code, so I'm fixing it here
1013        if payment_details.direction != PaymentDirection::Outbound {
1014            return Err(Error::InvalidPaymentDirection.into());
1015        }
1016
1017        let status = match payment_details.status {
1018            PaymentStatus::Pending => MeltQuoteState::Pending,
1019            PaymentStatus::Succeeded => MeltQuoteState::Paid,
1020            PaymentStatus::Failed => MeltQuoteState::Failed,
1021        };
1022
1023        let payment_proof = match payment_details.kind {
1024            PaymentKind::Bolt11 {
1025                hash: _,
1026                preimage,
1027                secret: _,
1028            } => preimage.map(|p| p.to_string()),
1029            _ => return Err(Error::UnexpectedPaymentKind.into()),
1030        };
1031
1032        let total_spent = payment_details
1033            .amount_msat
1034            .ok_or(Error::CouldNotGetAmountSpent)?;
1035
1036        Ok(MakePaymentResponse {
1037            payment_lookup_id: request_lookup_id.clone(),
1038            payment_proof,
1039            status,
1040            total_spent: Amount::new(total_spent, CurrencyUnit::Msat),
1041        })
1042    }
1043}
1044
1045impl Drop for CdkLdkNode {
1046    fn drop(&mut self) {
1047        tracing::info!("Drop called on CdkLdkNode");
1048        self.wait_invoice_cancel_token.cancel();
1049        tracing::debug!("Cancelled wait_invoice token in drop");
1050    }
1051}