cdk_ldk_node/
lib.rs

1//! CDK lightning backend for ldk-node
2
3#![doc = include_str!("../README.md")]
4#![warn(missing_docs)]
5#![warn(rustdoc::bare_urls)]
6
7use std::net::SocketAddr;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use cdk_common::amount::to_unit;
14use cdk_common::common::FeeReserve;
15use cdk_common::payment::{self, *};
16use cdk_common::util::{hex, unix_time};
17use cdk_common::{Amount, CurrencyUnit, MeltOptions, MeltQuoteState};
18use futures::{Stream, StreamExt};
19use ldk_node::bitcoin::hashes::Hash;
20use ldk_node::bitcoin::Network;
21use ldk_node::lightning::ln::channelmanager::PaymentId;
22use ldk_node::lightning::ln::msgs::SocketAddress;
23use ldk_node::lightning_invoice::{Bolt11InvoiceDescription, Description};
24use ldk_node::lightning_types::payment::PaymentHash;
25use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus, SendingParameters};
26use ldk_node::{Builder, Event, Node};
27use tokio::runtime::Runtime;
28use tokio_stream::wrappers::BroadcastStream;
29use tokio_util::sync::CancellationToken;
30use tracing::instrument;
31
32use crate::error::Error;
33
34mod error;
35mod web;
36
/// CDK Lightning backend using LDK Node
///
/// Provides Lightning Network functionality for CDK with support for Cashu operations.
/// Handles payment creation, processing, and event management using the Lightning Development Kit.
///
/// The type derives `Clone`; the node handle, the active flag and the receiver are
/// held behind `Arc`, so clones refer to the same underlying values.
#[derive(Clone)]
pub struct CdkLdkNode {
    // Handle to the underlying LDK node; every Lightning operation goes through it.
    inner: Arc<Node>,
    // Fee reserve settings read when quoting outgoing payments.
    fee_reserve: FeeReserve,
    // Cancelled by `stop_ldk_node` when a wait-invoice stream is reported active.
    wait_invoice_cancel_token: CancellationToken,
    // Set to `true` once `wait_payment_event` has been called.
    wait_invoice_is_active: Arc<AtomicBool>,
    // Sending half of the broadcast channel fed by the event handler task.
    sender: tokio::sync::broadcast::Sender<WaitPaymentResponse>,
    // Receiving half of the same channel; `wait_payment_event` resubscribes from it.
    receiver: Arc<tokio::sync::broadcast::Receiver<WaitPaymentResponse>>,
    // Cancelled by `stop_ldk_node` to end the event handler task.
    events_cancel_token: CancellationToken,
    // Optional caller-supplied Tokio runtime handed to the node on start.
    runtime: Option<Arc<Runtime>>,
    // Address of the management web interface; `None` means it is not started.
    web_addr: Option<SocketAddr>,
}
53
/// Configuration for connecting to Bitcoin RPC
///
/// Contains the necessary connection parameters for Bitcoin Core RPC interface.
///
/// Note: the type derives `Debug`, so formatting it with `{:?}` includes the
/// `password` field in the output. Avoid logging whole values of this type.
#[derive(Debug, Clone)]
pub struct BitcoinRpcConfig {
    /// Bitcoin RPC server hostname or IP address
    pub host: String,
    /// Bitcoin RPC server port number
    pub port: u16,
    /// Username for Bitcoin RPC authentication
    pub user: String,
    /// Password for Bitcoin RPC authentication
    pub password: String,
}
68
/// Source of blockchain data for the Lightning node
///
/// Specifies how the node should connect to the Bitcoin network to retrieve
/// blockchain information and broadcast transactions. The selected variant is
/// applied to the LDK node builder in [`CdkLdkNode::new`].
#[derive(Debug, Clone)]
pub enum ChainSource {
    /// Use an Esplora server for blockchain data
    ///
    /// Contains the URL of the Esplora server endpoint
    Esplora(String),
    /// Use Bitcoin Core RPC for blockchain data
    ///
    /// Contains the configuration for connecting to Bitcoin Core
    BitcoinRpc(BitcoinRpcConfig),
}
84
/// Source of Lightning network gossip data
///
/// Specifies how the node should learn about the Lightning Network topology
/// and routing information. The selected variant is applied to the LDK node
/// builder in [`CdkLdkNode::new`].
#[derive(Debug, Clone)]
pub enum GossipSource {
    /// Learn gossip through peer-to-peer connections
    ///
    /// The node will connect to other Lightning nodes and exchange gossip data directly
    P2P,
    /// Use Rapid Gossip Sync for efficient gossip updates
    ///
    /// Contains the URL of the RGS server for compressed gossip data
    RapidGossipSync(String),
}
100
101impl CdkLdkNode {
102    /// Create a new CDK LDK Node instance
103    ///
104    /// # Arguments
105    /// * `network` - Bitcoin network (mainnet, testnet, regtest, signet)
106    /// * `chain_source` - Source of blockchain data (Esplora or Bitcoin RPC)
107    /// * `gossip_source` - Source of Lightning network gossip data
108    /// * `storage_dir_path` - Directory path for node data storage
109    /// * `fee_reserve` - Fee reserve configuration for payments
110    /// * `listening_address` - Socket addresses for peer connections
111    /// * `runtime` - Optional Tokio runtime to use for starting the node
112    ///
113    /// # Returns
114    /// A new `CdkLdkNode` instance ready to be started
115    ///
116    /// # Errors
117    /// Returns an error if the LDK node builder fails to create the node
118    pub fn new(
119        network: Network,
120        chain_source: ChainSource,
121        gossip_source: GossipSource,
122        storage_dir_path: String,
123        fee_reserve: FeeReserve,
124        listening_address: Vec<SocketAddress>,
125        runtime: Option<Arc<Runtime>>,
126    ) -> Result<Self, Error> {
127        let mut builder = Builder::new();
128        builder.set_network(network);
129        tracing::info!("Storage dir of node is {}", storage_dir_path);
130        builder.set_storage_dir_path(storage_dir_path);
131
132        match chain_source {
133            ChainSource::Esplora(esplora_url) => {
134                builder.set_chain_source_esplora(esplora_url, None);
135            }
136            ChainSource::BitcoinRpc(BitcoinRpcConfig {
137                host,
138                port,
139                user,
140                password,
141            }) => {
142                builder.set_chain_source_bitcoind_rpc(host, port, user, password);
143            }
144        }
145
146        match gossip_source {
147            GossipSource::P2P => {
148                builder.set_gossip_source_p2p();
149            }
150            GossipSource::RapidGossipSync(rgs_url) => {
151                builder.set_gossip_source_rgs(rgs_url);
152            }
153        }
154
155        builder.set_listening_addresses(listening_address)?;
156
157        builder.set_node_alias("cdk-ldk-node".to_string())?;
158
159        let node = builder.build()?;
160
161        tracing::info!("Creating tokio channel for payment notifications");
162        let (sender, receiver) = tokio::sync::broadcast::channel(8);
163
164        let id = node.node_id();
165
166        let adr = node.announcement_addresses();
167
168        tracing::info!(
169            "Created node {} with address {:?} on network {}",
170            id,
171            adr,
172            network
173        );
174
175        Ok(Self {
176            inner: node.into(),
177            fee_reserve,
178            wait_invoice_cancel_token: CancellationToken::new(),
179            wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
180            sender,
181            receiver: Arc::new(receiver),
182            events_cancel_token: CancellationToken::new(),
183            runtime,
184            web_addr: None,
185        })
186    }
187
    /// Set the web server address for the LDK node management interface
    ///
    /// The stored value is read by `MintPayment::start`, which starts the web
    /// server only when an address is present.
    ///
    /// # Arguments
    /// * `addr` - Socket address for the web server. If None, no web server will be started.
    pub fn set_web_addr(&mut self, addr: Option<SocketAddr>) {
        self.web_addr = addr;
    }
195
196    /// Get a default web server address using an unused port
197    ///
198    /// Returns a SocketAddr with localhost and port 0, which will cause
199    /// the system to automatically assign an available port
200    pub fn default_web_addr() -> SocketAddr {
201        SocketAddr::from(([127, 0, 0, 1], 8091))
202    }
203
204    /// Start the CDK LDK Node
205    ///
206    /// Starts the underlying LDK node and begins event processing.
207    /// Sets up event handlers to listen for Lightning events like payment received.
208    ///
209    /// # Returns
210    /// Returns `Ok(())` on successful start, error otherwise
211    ///
212    /// # Errors
213    /// Returns an error if the LDK node fails to start or event handling setup fails
214    pub fn start_ldk_node(&self) -> Result<(), Error> {
215        match &self.runtime {
216            Some(runtime) => {
217                tracing::info!("Starting cdk-ldk node with existing runtime");
218                self.inner.start_with_runtime(Arc::clone(runtime))?
219            }
220            None => {
221                tracing::info!("Starting cdk-ldk-node with new runtime");
222                self.inner.start()?
223            }
224        };
225        let node_config = self.inner.config();
226
227        tracing::info!("Starting node with network {}", node_config.network);
228
229        tracing::info!("Node status: {:?}", self.inner.status());
230
231        self.handle_events()?;
232
233        Ok(())
234    }
235
236    /// Start the web server for the LDK node management interface
237    ///
238    /// Starts a web server that provides a user interface for managing the LDK node.
239    /// The web interface allows users to view balances, manage channels, create invoices,
240    /// and send payments.
241    ///
242    /// # Arguments
243    /// * `web_addr` - The socket address to bind the web server to
244    ///
245    /// # Returns
246    /// Returns `Ok(())` on successful start, error otherwise
247    ///
248    /// # Errors
249    /// Returns an error if the web server fails to start
250    pub fn start_web_server(&self, web_addr: SocketAddr) -> Result<(), Error> {
251        let web_server = crate::web::WebServer::new(Arc::new(self.clone()));
252
253        tokio::spawn(async move {
254            if let Err(e) = web_server.serve(web_addr).await {
255                tracing::error!("Web server error: {}", e);
256            }
257        });
258
259        Ok(())
260    }
261
262    /// Stop the CDK LDK Node
263    ///
264    /// Gracefully stops the node by cancelling all active tasks and event handlers.
265    /// This includes:
266    /// - Cancelling the event handler task
267    /// - Cancelling any active wait_invoice streams
268    /// - Stopping the underlying LDK node
269    ///
270    /// # Returns
271    /// Returns `Ok(())` on successful shutdown, error otherwise
272    ///
273    /// # Errors
274    /// Returns an error if the underlying LDK node fails to stop
275    pub fn stop_ldk_node(&self) -> Result<(), Error> {
276        tracing::info!("Stopping CdkLdkNode");
277        // Cancel all tokio tasks
278        tracing::info!("Cancelling event handler");
279        self.events_cancel_token.cancel();
280
281        // Cancel any wait_invoice streams
282        if self.is_wait_invoice_active() {
283            tracing::info!("Cancelling wait_invoice stream");
284            self.wait_invoice_cancel_token.cancel();
285        }
286
287        // Stop the LDK node
288        tracing::info!("Stopping LDK node");
289        self.inner.stop()?;
290        tracing::info!("CdkLdkNode stopped successfully");
291        Ok(())
292    }
293
294    /// Handle payment received event
295    async fn handle_payment_received(
296        node: &Arc<Node>,
297        sender: &tokio::sync::broadcast::Sender<WaitPaymentResponse>,
298        payment_id: Option<PaymentId>,
299        payment_hash: PaymentHash,
300        amount_msat: u64,
301    ) {
302        tracing::info!(
303            "Received payment for hash={} of amount={} msat",
304            payment_hash,
305            amount_msat
306        );
307
308        let payment_id = match payment_id {
309            Some(id) => id,
310            None => {
311                tracing::warn!("Received payment without payment_id");
312                return;
313            }
314        };
315
316        let payment_id_hex = hex::encode(payment_id.0);
317
318        if amount_msat == 0 {
319            tracing::warn!("Payment of no amount");
320            return;
321        }
322
323        tracing::info!(
324            "Processing payment notification: id={}, amount={} msats",
325            payment_id_hex,
326            amount_msat
327        );
328
329        let payment_details = match node.payment(&payment_id) {
330            Some(details) => details,
331            None => {
332                tracing::error!("Could not find payment details for id={}", payment_id_hex);
333                return;
334            }
335        };
336
337        let (payment_identifier, payment_id) = match payment_details.kind {
338            PaymentKind::Bolt11 { hash, .. } => {
339                (PaymentIdentifier::PaymentHash(hash.0), hash.to_string())
340            }
341            PaymentKind::Bolt12Offer { hash, offer_id, .. } => match hash {
342                Some(h) => (
343                    PaymentIdentifier::OfferId(offer_id.to_string()),
344                    h.to_string(),
345                ),
346                None => {
347                    tracing::error!("Bolt12 payment missing hash");
348                    return;
349                }
350            },
351            k => {
352                tracing::warn!("Received payment of kind {:?} which is not supported", k);
353                return;
354            }
355        };
356
357        let wait_payment_response = WaitPaymentResponse {
358            payment_identifier,
359            payment_amount: amount_msat.into(),
360            unit: CurrencyUnit::Msat,
361            payment_id,
362        };
363
364        match sender.send(wait_payment_response) {
365            Ok(_) => tracing::info!("Successfully sent payment notification to stream"),
366            Err(err) => tracing::error!(
367                "Could not send payment received notification on channel: {}",
368                err
369            ),
370        }
371    }
372
373    /// Set up event handling for the node
374    pub fn handle_events(&self) -> Result<(), Error> {
375        let node = self.inner.clone();
376        let sender = self.sender.clone();
377        let cancel_token = self.events_cancel_token.clone();
378
379        tracing::info!("Starting event handler task");
380
381        tokio::spawn(async move {
382            tracing::info!("Event handler loop started");
383            loop {
384                tokio::select! {
385                    _ = cancel_token.cancelled() => {
386                        tracing::info!("Event handler cancelled");
387                        break;
388                    }
389                    event = node.next_event_async() => {
390                        match event {
391                            Event::PaymentReceived {
392                                payment_id,
393                                payment_hash,
394                                amount_msat,
395                                custom_records: _
396                            } => {
397                                Self::handle_payment_received(
398                                    &node,
399                                    &sender,
400                                    payment_id,
401                                    payment_hash,
402                                    amount_msat
403                                ).await;
404                            }
405                            event => {
406                                tracing::debug!("Received other ldk node event: {:?}", event);
407                            }
408                        }
409
410                        if let Err(err) = node.event_handled() {
411                            tracing::error!("Error handling node event: {}", err);
412                        } else {
413                            tracing::debug!("Successfully handled node event");
414                        }
415                    }
416                }
417            }
418            tracing::info!("Event handler loop terminated");
419        });
420
421        tracing::info!("Event handler task spawned");
422        Ok(())
423    }
424
    /// Get Node used
    ///
    /// Returns a new `Arc` handle to the underlying LDK node; the node itself is
    /// shared, not copied.
    pub fn node(&self) -> Arc<Node> {
        Arc::clone(&self.inner)
    }
429}
430
431/// Mint payment trait
432#[async_trait]
433impl MintPayment for CdkLdkNode {
434    type Err = payment::Error;
435
436    /// Start the payment processor
437    /// Starts the LDK node and begins event processing
438    async fn start(&self) -> Result<(), Self::Err> {
439        self.start_ldk_node().map_err(|e| {
440            tracing::error!("Failed to start CdkLdkNode: {}", e);
441            e
442        })?;
443
444        tracing::info!("CdkLdkNode payment processor started successfully");
445
446        // Start web server if configured
447        if let Some(web_addr) = self.web_addr {
448            tracing::info!("Starting LDK Node web interface on {}", web_addr);
449            self.start_web_server(web_addr).map_err(|e| {
450                tracing::error!("Failed to start web server: {}", e);
451                e
452            })?;
453        } else {
454            tracing::info!("No web server address configured, skipping web interface");
455        }
456
457        Ok(())
458    }
459
460    /// Stop the payment processor
461    /// Gracefully stops the LDK node and cancels all background tasks
462    async fn stop(&self) -> Result<(), Self::Err> {
463        self.stop_ldk_node().map_err(|e| {
464            tracing::error!("Failed to stop CdkLdkNode: {}", e);
465            e.into()
466        })
467    }
468
469    /// Base Settings
470    async fn get_settings(&self) -> Result<serde_json::Value, Self::Err> {
471        let settings = Bolt11Settings {
472            mpp: false,
473            unit: CurrencyUnit::Msat,
474            invoice_description: true,
475            amountless: true,
476            bolt12: true,
477        };
478        Ok(serde_json::to_value(settings)?)
479    }
480
481    /// Create a new invoice
482    #[instrument(skip(self))]
483    async fn create_incoming_payment_request(
484        &self,
485        unit: &CurrencyUnit,
486        options: IncomingPaymentOptions,
487    ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
488        match options {
489            IncomingPaymentOptions::Bolt11(bolt11_options) => {
490                let amount_msat = to_unit(bolt11_options.amount, unit, &CurrencyUnit::Msat)?;
491                let description = bolt11_options.description.unwrap_or_default();
492                let time = bolt11_options
493                    .unix_expiry
494                    .map(|t| t - unix_time())
495                    .unwrap_or(36000);
496
497                let description = Bolt11InvoiceDescription::Direct(
498                    Description::new(description).map_err(|_| Error::InvalidDescription)?,
499                );
500
501                let payment = self
502                    .inner
503                    .bolt11_payment()
504                    .receive(amount_msat.into(), &description, time as u32)
505                    .map_err(Error::LdkNode)?;
506
507                let payment_hash = payment.payment_hash().to_string();
508                let payment_identifier = PaymentIdentifier::PaymentHash(
509                    hex::decode(&payment_hash)?
510                        .try_into()
511                        .map_err(|_| Error::InvalidPaymentHashLength)?,
512                );
513
514                Ok(CreateIncomingPaymentResponse {
515                    request_lookup_id: payment_identifier,
516                    request: payment.to_string(),
517                    expiry: Some(unix_time() + time),
518                })
519            }
520            IncomingPaymentOptions::Bolt12(bolt12_options) => {
521                let Bolt12IncomingPaymentOptions {
522                    description,
523                    amount,
524                    unix_expiry,
525                } = *bolt12_options;
526
527                let time = unix_expiry.map(|t| (t - unix_time()) as u32);
528
529                let offer = match amount {
530                    Some(amount) => {
531                        let amount_msat = to_unit(amount, unit, &CurrencyUnit::Msat)?;
532
533                        self.inner
534                            .bolt12_payment()
535                            .receive(
536                                amount_msat.into(),
537                                &description.unwrap_or("".to_string()),
538                                time,
539                                None,
540                            )
541                            .map_err(Error::LdkNode)?
542                    }
543                    None => self
544                        .inner
545                        .bolt12_payment()
546                        .receive_variable_amount(&description.unwrap_or("".to_string()), time)
547                        .map_err(Error::LdkNode)?,
548                };
549                let payment_identifier = PaymentIdentifier::OfferId(offer.id().to_string());
550
551                Ok(CreateIncomingPaymentResponse {
552                    request_lookup_id: payment_identifier,
553                    request: offer.to_string(),
554                    expiry: time.map(|a| a as u64),
555                })
556            }
557        }
558    }
559
560    /// Get payment quote
561    /// Used to get fee and amount required for a payment request
562    #[instrument(skip_all)]
563    async fn get_payment_quote(
564        &self,
565        unit: &CurrencyUnit,
566        options: OutgoingPaymentOptions,
567    ) -> Result<PaymentQuoteResponse, Self::Err> {
568        match options {
569            OutgoingPaymentOptions::Bolt11(bolt11_options) => {
570                let bolt11 = bolt11_options.bolt11;
571
572                let amount_msat = match bolt11_options.melt_options {
573                    Some(melt_options) => melt_options.amount_msat(),
574                    None => bolt11
575                        .amount_milli_satoshis()
576                        .ok_or(Error::UnknownInvoiceAmount)?
577                        .into(),
578                };
579
580                let amount = to_unit(amount_msat, &CurrencyUnit::Msat, unit)?;
581
582                let relative_fee_reserve =
583                    (self.fee_reserve.percent_fee_reserve * u64::from(amount) as f32) as u64;
584
585                let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
586
587                let fee = match relative_fee_reserve > absolute_fee_reserve {
588                    true => relative_fee_reserve,
589                    false => absolute_fee_reserve,
590                };
591
592                let payment_hash = bolt11.payment_hash().to_string();
593                let payment_hash_bytes = hex::decode(&payment_hash)?
594                    .try_into()
595                    .map_err(|_| Error::InvalidPaymentHashLength)?;
596
597                Ok(PaymentQuoteResponse {
598                    request_lookup_id: Some(PaymentIdentifier::PaymentHash(payment_hash_bytes)),
599                    amount,
600                    fee: fee.into(),
601                    state: MeltQuoteState::Unpaid,
602                    unit: unit.clone(),
603                })
604            }
605            OutgoingPaymentOptions::Bolt12(bolt12_options) => {
606                let offer = bolt12_options.offer;
607
608                let amount_msat = match bolt12_options.melt_options {
609                    Some(melt_options) => melt_options.amount_msat(),
610                    None => {
611                        let amount = offer.amount().ok_or(payment::Error::AmountMismatch)?;
612
613                        match amount {
614                            ldk_node::lightning::offers::offer::Amount::Bitcoin {
615                                amount_msats,
616                            } => amount_msats.into(),
617                            _ => return Err(payment::Error::AmountMismatch),
618                        }
619                    }
620                };
621                let amount = to_unit(amount_msat, &CurrencyUnit::Msat, unit)?;
622
623                let relative_fee_reserve =
624                    (self.fee_reserve.percent_fee_reserve * u64::from(amount) as f32) as u64;
625
626                let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
627
628                let fee = match relative_fee_reserve > absolute_fee_reserve {
629                    true => relative_fee_reserve,
630                    false => absolute_fee_reserve,
631                };
632
633                Ok(PaymentQuoteResponse {
634                    request_lookup_id: None,
635                    amount,
636                    fee: fee.into(),
637                    state: MeltQuoteState::Unpaid,
638                    unit: unit.clone(),
639                })
640            }
641        }
642    }
643
644    /// Pay request
645    #[instrument(skip(self, options))]
646    async fn make_payment(
647        &self,
648        unit: &CurrencyUnit,
649        options: OutgoingPaymentOptions,
650    ) -> Result<MakePaymentResponse, Self::Err> {
651        match options {
652            OutgoingPaymentOptions::Bolt11(bolt11_options) => {
653                let bolt11 = bolt11_options.bolt11;
654
655                let send_params = match bolt11_options
656                    .max_fee_amount
657                    .map(|f| {
658                        to_unit(f, unit, &CurrencyUnit::Msat).map(|amount_msat| SendingParameters {
659                            max_total_routing_fee_msat: Some(Some(amount_msat.into())),
660                            max_channel_saturation_power_of_half: None,
661                            max_total_cltv_expiry_delta: None,
662                            max_path_count: None,
663                        })
664                    })
665                    .transpose()
666                {
667                    Ok(params) => params,
668                    Err(err) => {
669                        tracing::error!("Failed to convert fee amount: {}", err);
670                        return Err(payment::Error::Custom(format!("Invalid fee amount: {err}")));
671                    }
672                };
673
674                let payment_id = match bolt11_options.melt_options {
675                    Some(MeltOptions::Amountless { amountless }) => self
676                        .inner
677                        .bolt11_payment()
678                        .send_using_amount(&bolt11, amountless.amount_msat.into(), send_params)
679                        .map_err(|err| {
680                            tracing::error!("Could not send send amountless bolt11: {}", err);
681                            Error::CouldNotSendBolt11WithoutAmount
682                        })?,
683                    None => self
684                        .inner
685                        .bolt11_payment()
686                        .send(&bolt11, send_params)
687                        .map_err(|err| {
688                            tracing::error!("Could not send bolt11 {}", err);
689                            Error::CouldNotSendBolt11
690                        })?,
691                    _ => return Err(payment::Error::UnsupportedPaymentOption),
692                };
693
694                // Check payment status for up to 10 seconds
695                let start = std::time::Instant::now();
696                let timeout = std::time::Duration::from_secs(10);
697
698                let (status, payment_details) = loop {
699                    let details = self
700                        .inner
701                        .payment(&payment_id)
702                        .ok_or(Error::PaymentNotFound)?;
703
704                    match details.status {
705                        PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
706                        PaymentStatus::Failed => {
707                            tracing::error!("Failed to pay bolt11 payment.");
708                            break (MeltQuoteState::Failed, details);
709                        }
710                        PaymentStatus::Pending => {
711                            if start.elapsed() > timeout {
712                                tracing::warn!(
713                                    "Paying bolt11 exceeded timeout 10 seconds no longer waitning."
714                                );
715                                break (MeltQuoteState::Pending, details);
716                            }
717                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
718                            continue;
719                        }
720                    }
721                };
722
723                let payment_proof = match payment_details.kind {
724                    PaymentKind::Bolt11 {
725                        hash: _,
726                        preimage,
727                        secret: _,
728                    } => preimage.map(|p| p.to_string()),
729                    _ => return Err(Error::UnexpectedPaymentKind.into()),
730                };
731
732                let total_spent = payment_details
733                    .amount_msat
734                    .ok_or(Error::CouldNotGetAmountSpent)?;
735
736                let total_spent = to_unit(total_spent, &CurrencyUnit::Msat, unit)?;
737
738                Ok(MakePaymentResponse {
739                    payment_lookup_id: PaymentIdentifier::PaymentHash(
740                        bolt11.payment_hash().to_byte_array(),
741                    ),
742                    payment_proof,
743                    status,
744                    total_spent,
745                    unit: unit.clone(),
746                })
747            }
748            OutgoingPaymentOptions::Bolt12(bolt12_options) => {
749                let offer = bolt12_options.offer;
750
751                let payment_id = match bolt12_options.melt_options {
752                    Some(MeltOptions::Amountless { amountless }) => self
753                        .inner
754                        .bolt12_payment()
755                        .send_using_amount(&offer, amountless.amount_msat.into(), None, None)
756                        .map_err(Error::LdkNode)?,
757                    None => self
758                        .inner
759                        .bolt12_payment()
760                        .send(&offer, None, None)
761                        .map_err(Error::LdkNode)?,
762                    _ => return Err(payment::Error::UnsupportedPaymentOption),
763                };
764
765                // Check payment status for up to 10 seconds
766                let start = std::time::Instant::now();
767                let timeout = std::time::Duration::from_secs(10);
768
769                let (status, payment_details) = loop {
770                    let details = self
771                        .inner
772                        .payment(&payment_id)
773                        .ok_or(Error::PaymentNotFound)?;
774
775                    match details.status {
776                        PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
777                        PaymentStatus::Failed => {
778                            tracing::error!("Payment with id {} failed.", payment_id);
779                            break (MeltQuoteState::Failed, details);
780                        }
781                        PaymentStatus::Pending => {
782                            if start.elapsed() > timeout {
783                                tracing::warn!(
784                                    "Payment has been being for 10 seconds. No longer waiting"
785                                );
786                                break (MeltQuoteState::Pending, details);
787                            }
788                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
789                            continue;
790                        }
791                    }
792                };
793
794                let payment_proof = match payment_details.kind {
795                    PaymentKind::Bolt12Offer {
796                        hash: _,
797                        preimage,
798                        secret: _,
799                        offer_id: _,
800                        payer_note: _,
801                        quantity: _,
802                    } => preimage.map(|p| p.to_string()),
803                    _ => return Err(Error::UnexpectedPaymentKind.into()),
804                };
805
806                let total_spent = payment_details
807                    .amount_msat
808                    .ok_or(Error::CouldNotGetAmountSpent)?;
809
810                let total_spent = to_unit(total_spent, &CurrencyUnit::Msat, unit)?;
811
812                Ok(MakePaymentResponse {
813                    payment_lookup_id: PaymentIdentifier::PaymentId(payment_id.0),
814                    payment_proof,
815                    status,
816                    total_spent,
817                    unit: unit.clone(),
818                })
819            }
820        }
821    }
822
823    /// Listen for invoices to be paid to the mint
824    /// Returns a stream of request_lookup_id once invoices are paid
825    #[instrument(skip(self))]
826    async fn wait_payment_event(
827        &self,
828    ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
829        tracing::info!("Starting stream for invoices - wait_any_incoming_payment called");
830
831        // Set active flag to indicate stream is active
832        self.wait_invoice_is_active.store(true, Ordering::SeqCst);
833        tracing::debug!("wait_invoice_is_active set to true");
834
835        let receiver = self.receiver.clone();
836
837        tracing::info!("Receiver obtained successfully, creating response stream");
838
839        // Transform the String stream into a WaitPaymentResponse stream
840        let response_stream = BroadcastStream::new(receiver.resubscribe());
841
842        // Map the stream to handle BroadcastStreamRecvError and wrap in Event
843        let response_stream = response_stream.filter_map(|result| async move {
844            match result {
845                Ok(payment) => Some(cdk_common::payment::Event::PaymentReceived(payment)),
846                Err(err) => {
847                    tracing::warn!("Error in broadcast stream: {}", err);
848                    None
849                }
850            }
851        });
852
853        // Create a combined stream that also handles cancellation
854        let cancel_token = self.wait_invoice_cancel_token.clone();
855        let is_active = self.wait_invoice_is_active.clone();
856
857        let stream = Box::pin(response_stream);
858
859        // Set up a task to clean up when the stream is dropped
860        tokio::spawn(async move {
861            cancel_token.cancelled().await;
862            tracing::info!("wait_invoice stream cancelled");
863            is_active.store(false, Ordering::SeqCst);
864        });
865
866        tracing::info!("wait_any_incoming_payment returning stream");
867        Ok(stream)
868    }
869
870    /// Is wait invoice active
871    fn is_wait_invoice_active(&self) -> bool {
872        self.wait_invoice_is_active.load(Ordering::SeqCst)
873    }
874
875    /// Cancel wait invoice
876    fn cancel_wait_invoice(&self) {
877        self.wait_invoice_cancel_token.cancel()
878    }
879
880    /// Check the status of an incoming payment
881    async fn check_incoming_payment_status(
882        &self,
883        payment_identifier: &PaymentIdentifier,
884    ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
885        let payment_id_str = match payment_identifier {
886            PaymentIdentifier::PaymentHash(hash) => hex::encode(hash),
887            PaymentIdentifier::CustomId(id) => id.clone(),
888            _ => return Err(Error::UnsupportedPaymentIdentifierType.into()),
889        };
890
891        let payment_id = PaymentId(
892            hex::decode(&payment_id_str)?
893                .try_into()
894                .map_err(|_| Error::InvalidPaymentIdLength)?,
895        );
896
897        let payment_details = self
898            .inner
899            .payment(&payment_id)
900            .ok_or(Error::PaymentNotFound)?;
901
902        if payment_details.direction == PaymentDirection::Outbound {
903            return Err(Error::InvalidPaymentDirection.into());
904        }
905
906        let amount = if payment_details.status == PaymentStatus::Succeeded {
907            payment_details
908                .amount_msat
909                .ok_or(Error::CouldNotGetPaymentAmount)?
910        } else {
911            return Ok(vec![]);
912        };
913
914        let response = WaitPaymentResponse {
915            payment_identifier: payment_identifier.clone(),
916            payment_amount: amount.into(),
917            unit: CurrencyUnit::Msat,
918            payment_id: payment_id_str,
919        };
920
921        Ok(vec![response])
922    }
923
924    /// Check the status of an outgoing payment
925    async fn check_outgoing_payment(
926        &self,
927        request_lookup_id: &PaymentIdentifier,
928    ) -> Result<MakePaymentResponse, Self::Err> {
929        let payment_details = match request_lookup_id {
930            PaymentIdentifier::PaymentHash(id_hash) => self
931                .inner
932                .list_payments_with_filter(
933                    |p| matches!(&p.kind, PaymentKind::Bolt11 { hash, .. } if &hash.0 == id_hash),
934                )
935                .first()
936                .cloned(),
937            PaymentIdentifier::PaymentId(id) => self.inner.payment(&PaymentId(
938                hex::decode(id)?
939                    .try_into()
940                    .map_err(|_| payment::Error::Custom("Invalid hex".to_string()))?,
941            )),
942            _ => {
943                return Ok(MakePaymentResponse {
944                    payment_lookup_id: request_lookup_id.clone(),
945                    status: MeltQuoteState::Unknown,
946                    payment_proof: None,
947                    total_spent: Amount::ZERO,
948                    unit: CurrencyUnit::Msat,
949                });
950            }
951        }
952        .ok_or(Error::PaymentNotFound)?;
953
954        // This check seems reversed in the original code, so I'm fixing it here
955        if payment_details.direction != PaymentDirection::Outbound {
956            return Err(Error::InvalidPaymentDirection.into());
957        }
958
959        let status = match payment_details.status {
960            PaymentStatus::Pending => MeltQuoteState::Pending,
961            PaymentStatus::Succeeded => MeltQuoteState::Paid,
962            PaymentStatus::Failed => MeltQuoteState::Failed,
963        };
964
965        let payment_proof = match payment_details.kind {
966            PaymentKind::Bolt11 {
967                hash: _,
968                preimage,
969                secret: _,
970            } => preimage.map(|p| p.to_string()),
971            _ => return Err(Error::UnexpectedPaymentKind.into()),
972        };
973
974        let total_spent = payment_details
975            .amount_msat
976            .ok_or(Error::CouldNotGetAmountSpent)?;
977
978        Ok(MakePaymentResponse {
979            payment_lookup_id: request_lookup_id.clone(),
980            payment_proof,
981            status,
982            total_spent: total_spent.into(),
983            unit: CurrencyUnit::Msat,
984        })
985    }
986}
987
988impl Drop for CdkLdkNode {
989    fn drop(&mut self) {
990        tracing::info!("Drop called on CdkLdkNode");
991        self.wait_invoice_cancel_token.cancel();
992        tracing::debug!("Cancelled wait_invoice token in drop");
993    }
994}