Skip to main content

cdk_ldk_node/
lib.rs

1//! CDK lightning backend for ldk-node
2
3#![doc = include_str!("../README.md")]
4
5use std::net::SocketAddr;
6use std::pin::Pin;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use bip39::Mnemonic;
12use cdk_common::common::FeeReserve;
13use cdk_common::payment::{self, *};
14use cdk_common::util::{hex, unix_time};
15use cdk_common::{Amount, CurrencyUnit, MeltOptions, MeltQuoteState};
16use futures::{Stream, StreamExt};
17use ldk_node::bitcoin::hashes::Hash;
18use ldk_node::bitcoin::Network;
19use ldk_node::lightning::ln::channelmanager::PaymentId;
20use ldk_node::lightning::ln::msgs::SocketAddress;
21use ldk_node::lightning::routing::router::RouteParametersConfig;
22use ldk_node::lightning_invoice::{Bolt11InvoiceDescription, Description};
23use ldk_node::lightning_types::payment::PaymentHash;
24use ldk_node::logger::{LogLevel, LogWriter};
25use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus};
26use ldk_node::{Builder, Event, Node};
27use tokio_stream::wrappers::BroadcastStream;
28use tokio_util::sync::CancellationToken;
29use tracing::instrument;
30
31use crate::error::Error;
32use crate::log::StdoutLogWriter;
33
34mod error;
35mod log;
36mod web;
37
38/// CDK Lightning backend using LDK Node
39///
40/// Provides Lightning Network functionality for CDK with support for Cashu operations.
41/// Handles payment creation, processing, and event management using the Lightning Development Kit.
42#[derive(Clone)]
43pub struct CdkLdkNode {
44    inner: Arc<Node>,
45    fee_reserve: FeeReserve,
46    wait_invoice_cancel_token: CancellationToken,
47    wait_invoice_is_active: Arc<AtomicBool>,
48    sender: tokio::sync::broadcast::Sender<WaitPaymentResponse>,
49    receiver: Arc<tokio::sync::broadcast::Receiver<WaitPaymentResponse>>,
50    events_cancel_token: CancellationToken,
51    web_addr: Option<SocketAddr>,
52}
53
54impl std::fmt::Debug for CdkLdkNode {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("CdkLdkNode")
57            .field("fee_reserve", &self.fee_reserve)
58            .field("web_addr", &self.web_addr)
59            .finish_non_exhaustive()
60    }
61}
62
/// Configuration for connecting to Bitcoin RPC
///
/// Contains the necessary connection parameters for Bitcoin Core RPC interface.
///
/// `Debug` is implemented by hand so that the RPC password is never written to
/// logs or error output; every other field is printed as usual.
#[derive(Clone)]
pub struct BitcoinRpcConfig {
    /// Bitcoin RPC server hostname or IP address
    pub host: String,
    /// Bitcoin RPC server port number
    pub port: u16,
    /// Username for Bitcoin RPC authentication
    pub user: String,
    /// Password for Bitcoin RPC authentication
    pub password: String,
}

impl std::fmt::Debug for BitcoinRpcConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BitcoinRpcConfig")
            .field("host", &self.host)
            .field("port", &self.port)
            .field("user", &self.user)
            // The credential itself must not leak through `{:?}` formatting.
            .field("password", &format_args!("<redacted>"))
            .finish()
    }
}
77
78/// Source of blockchain data for the Lightning node
79///
80/// Specifies how the node should connect to the Bitcoin network to retrieve
81/// blockchain information and broadcast transactions.
82#[derive(Debug, Clone)]
83pub enum ChainSource {
84    /// Use an Esplora server for blockchain data
85    ///
86    /// Contains the URL of the Esplora server endpoint
87    Esplora(String),
88    /// Use Bitcoin Core RPC for blockchain data
89    ///
90    /// Contains the configuration for connecting to Bitcoin Core
91    BitcoinRpc(BitcoinRpcConfig),
92}
93
/// Where the Lightning node obtains network gossip
///
/// Selects how the node is configured to learn about the Lightning Network
/// topology and routing information.
#[derive(Debug, Clone)]
pub enum GossipSource {
    /// Peer-to-peer gossip
    ///
    /// Gossip data is exchanged directly with connected Lightning peers
    P2P,
    /// Rapid Gossip Sync
    ///
    /// Holds the URL of the RGS server that provides compressed gossip data
    RapidGossipSync(String),
}
109/// A builder for an [`CdkLdkNode`] instance.
110#[derive(Debug)]
111pub struct CdkLdkNodeBuilder {
112    network: Network,
113    chain_source: ChainSource,
114    gossip_source: GossipSource,
115    log_dir_path: Option<String>,
116    storage_dir_path: String,
117    fee_reserve: FeeReserve,
118    listening_addresses: Vec<SocketAddress>,
119    seed: Option<Mnemonic>,
120    announcement_addresses: Option<Vec<SocketAddress>>,
121}
122
123impl CdkLdkNodeBuilder {
124    /// Creates a new builder instance.
125    pub fn new(
126        network: Network,
127        chain_source: ChainSource,
128        gossip_source: GossipSource,
129        storage_dir_path: String,
130        fee_reserve: FeeReserve,
131        listening_addresses: Vec<SocketAddress>,
132    ) -> Self {
133        Self {
134            network,
135            chain_source,
136            gossip_source,
137            storage_dir_path,
138            fee_reserve,
139            listening_addresses,
140            seed: None,
141            announcement_addresses: None,
142            log_dir_path: None,
143        }
144    }
145
146    /// Configures the [`CdkLdkNode`] to use the Mnemonic for entropy source configuration
147    pub fn with_seed(mut self, seed: Mnemonic) -> Self {
148        self.seed = Some(seed);
149        self
150    }
151    /// Configures the [`CdkLdkNode`] to use announce this address to the lightning network
152    pub fn with_announcement_address(mut self, announcement_addresses: Vec<SocketAddress>) -> Self {
153        self.announcement_addresses = Some(announcement_addresses);
154        self
155    }
156    /// Configures the [`CdkLdkNode`] to use announce this address to the lightning network
157    pub fn with_log_dir_path(mut self, log_dir_path: String) -> Self {
158        self.log_dir_path = Some(log_dir_path);
159        self
160    }
161
162    /// Builds the [`CdkLdkNode`] instance
163    ///
164    /// # Errors
165    /// Returns an error if the LDK node builder fails to create the node
166    pub fn build(self) -> Result<CdkLdkNode, Error> {
167        let mut ldk = Builder::new();
168        ldk.set_network(self.network);
169        tracing::info!("Storage dir of node is {}", self.storage_dir_path);
170        ldk.set_storage_dir_path(self.storage_dir_path);
171
172        match self.chain_source {
173            ChainSource::Esplora(esplora_url) => {
174                ldk.set_chain_source_esplora(esplora_url, None);
175            }
176            ChainSource::BitcoinRpc(BitcoinRpcConfig {
177                host,
178                port,
179                user,
180                password,
181            }) => {
182                ldk.set_chain_source_bitcoind_rpc(host, port, user, password);
183            }
184        }
185
186        match self.gossip_source {
187            GossipSource::P2P => {
188                ldk.set_gossip_source_p2p();
189            }
190            GossipSource::RapidGossipSync(rgs_url) => {
191                ldk.set_gossip_source_rgs(rgs_url);
192            }
193        }
194
195        ldk.set_listening_addresses(self.listening_addresses)?;
196        if self.log_dir_path.is_some() {
197            ldk.set_filesystem_logger(self.log_dir_path, Some(LogLevel::Info));
198        } else {
199            ldk.set_custom_logger(Arc::new(StdoutLogWriter));
200        }
201
202        ldk.set_node_alias("cdk-ldk-node".to_string())?;
203        // set the seed as bip39 entropy mnemonic
204        if let Some(seed) = self.seed {
205            ldk.set_entropy_bip39_mnemonic(seed, None);
206        }
207        // set the announcement addresses
208        if let Some(announcement_addresses) = self.announcement_addresses {
209            ldk.set_announcement_addresses(announcement_addresses)?;
210        }
211
212        let node = ldk.build()?;
213
214        tracing::info!("Creating tokio channel for payment notifications");
215        let (sender, receiver) = tokio::sync::broadcast::channel(8);
216
217        let id = node.node_id();
218
219        let adr = node.announcement_addresses();
220
221        tracing::info!(
222            "Created node {} with address {:?} on network {}",
223            id,
224            adr,
225            self.network
226        );
227
228        Ok(CdkLdkNode {
229            inner: node.into(),
230            fee_reserve: self.fee_reserve,
231            wait_invoice_cancel_token: CancellationToken::new(),
232            wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
233            sender,
234            receiver: Arc::new(receiver),
235            events_cancel_token: CancellationToken::new(),
236            web_addr: None,
237        })
238    }
239}
240
241impl CdkLdkNode {
242    /// Set the web server address for the LDK node management interface
243    ///
244    /// # Arguments
245    /// * `addr` - Socket address for the web server. If None, no web server will be started.
246    pub fn set_web_addr(&mut self, addr: Option<SocketAddr>) {
247        self.web_addr = addr;
248    }
249
250    /// Get a default web server address using an unused port
251    ///
252    /// Returns a SocketAddr with localhost and port 0, which will cause
253    /// the system to automatically assign an available port
254    pub fn default_web_addr() -> SocketAddr {
255        SocketAddr::from(([127, 0, 0, 1], 8091))
256    }
257
258    /// Start the CDK LDK Node
259    ///
260    /// Starts the underlying LDK node and begins event processing.
261    /// Sets up event handlers to listen for Lightning events like payment received.
262    ///
263    /// # Returns
264    /// Returns `Ok(())` on successful start, error otherwise
265    ///
266    /// # Errors
267    /// Returns an error if the LDK node fails to start or event handling setup fails
268    pub fn start_ldk_node(&self) -> Result<(), Error> {
269        tracing::info!("Starting cdk-ldk node");
270        self.inner.start()?;
271        let node_config = self.inner.config();
272
273        tracing::info!("Starting node with network {}", node_config.network);
274
275        tracing::info!("Node status: {:?}", self.inner.status());
276
277        self.handle_events()?;
278
279        Ok(())
280    }
281
282    /// Start the web server for the LDK node management interface
283    ///
284    /// Starts a web server that provides a user interface for managing the LDK node.
285    /// The web interface allows users to view balances, manage channels, create invoices,
286    /// and send payments.
287    ///
288    /// # Arguments
289    /// * `web_addr` - The socket address to bind the web server to
290    ///
291    /// # Returns
292    /// Returns `Ok(())` on successful start, error otherwise
293    ///
294    /// # Errors
295    /// Returns an error if the web server fails to start
296    pub fn start_web_server(&self, web_addr: SocketAddr) -> Result<(), Error> {
297        let web_server = crate::web::WebServer::new(Arc::new(self.clone()));
298
299        tokio::spawn(async move {
300            if let Err(e) = web_server.serve(web_addr).await {
301                tracing::error!("Web server error: {}", e);
302            }
303        });
304
305        Ok(())
306    }
307
308    /// Stop the CDK LDK Node
309    ///
310    /// Gracefully stops the node by cancelling all active tasks and event handlers.
311    /// This includes:
312    /// - Cancelling the event handler task
313    /// - Cancelling any active wait_invoice streams
314    /// - Stopping the underlying LDK node
315    ///
316    /// # Returns
317    /// Returns `Ok(())` on successful shutdown, error otherwise
318    ///
319    /// # Errors
320    /// Returns an error if the underlying LDK node fails to stop
321    pub fn stop_ldk_node(&self) -> Result<(), Error> {
322        tracing::info!("Stopping CdkLdkNode");
323        // Cancel all tokio tasks
324        tracing::info!("Cancelling event handler");
325        self.events_cancel_token.cancel();
326
327        // Cancel any payment event streams
328        if self.is_payment_event_stream_active() {
329            tracing::info!("Cancelling payment event stream");
330            self.wait_invoice_cancel_token.cancel();
331        }
332
333        // Stop the LDK node
334        tracing::info!("Stopping LDK node");
335        self.inner.stop()?;
336        tracing::info!("CdkLdkNode stopped successfully");
337        Ok(())
338    }
339
340    /// Handle payment received event
341    async fn handle_payment_received(
342        node: &Arc<Node>,
343        sender: &tokio::sync::broadcast::Sender<WaitPaymentResponse>,
344        payment_id: Option<PaymentId>,
345        payment_hash: PaymentHash,
346        amount_msat: u64,
347    ) {
348        tracing::info!(
349            "Received payment for hash={} of amount={} msat",
350            payment_hash,
351            amount_msat
352        );
353
354        let payment_id = match payment_id {
355            Some(id) => id,
356            None => {
357                tracing::warn!("Received payment without payment_id");
358                return;
359            }
360        };
361
362        let payment_id_hex = hex::encode(payment_id.0);
363
364        if amount_msat == 0 {
365            tracing::warn!("Payment of no amount");
366            return;
367        }
368
369        tracing::info!(
370            "Processing payment notification: id={}, amount={} msats",
371            payment_id_hex,
372            amount_msat
373        );
374
375        let payment_details = match node.payment(&payment_id) {
376            Some(details) => details,
377            None => {
378                tracing::error!("Could not find payment details for id={}", payment_id_hex);
379                return;
380            }
381        };
382
383        let (payment_identifier, payment_id) = match payment_details.kind {
384            PaymentKind::Bolt11 { hash, .. } => {
385                (PaymentIdentifier::PaymentHash(hash.0), hash.to_string())
386            }
387            PaymentKind::Bolt12Offer { hash, offer_id, .. } => match hash {
388                Some(h) => (
389                    PaymentIdentifier::OfferId(offer_id.to_string()),
390                    h.to_string(),
391                ),
392                None => {
393                    tracing::error!("Bolt12 payment missing hash");
394                    return;
395                }
396            },
397            k => {
398                tracing::warn!("Received payment of kind {:?} which is not supported", k);
399                return;
400            }
401        };
402
403        let wait_payment_response = WaitPaymentResponse {
404            payment_identifier,
405            payment_amount: Amount::new(amount_msat, CurrencyUnit::Msat),
406            payment_id,
407        };
408
409        match sender.send(wait_payment_response) {
410            Ok(_) => tracing::info!("Successfully sent payment notification to stream"),
411            Err(err) => tracing::error!(
412                "Could not send payment received notification on channel: {}",
413                err
414            ),
415        }
416    }
417
418    /// Set up event handling for the node
419    pub fn handle_events(&self) -> Result<(), Error> {
420        let node = self.inner.clone();
421        let sender = self.sender.clone();
422        let cancel_token = self.events_cancel_token.clone();
423
424        tracing::info!("Starting event handler task");
425
426        tokio::spawn(async move {
427            tracing::info!("Event handler loop started");
428            loop {
429                tokio::select! {
430                    _ = cancel_token.cancelled() => {
431                        tracing::info!("Event handler cancelled");
432                        break;
433                    }
434                    event = node.next_event_async() => {
435                        match event {
436                            Event::PaymentReceived {
437                                payment_id,
438                                payment_hash,
439                                amount_msat,
440                                custom_records: _
441                            } => {
442                                Self::handle_payment_received(
443                                    &node,
444                                    &sender,
445                                    payment_id,
446                                    payment_hash,
447                                    amount_msat
448                                ).await;
449                            }
450                            event => {
451                                tracing::debug!("Received other ldk node event: {:?}", event);
452                            }
453                        }
454
455                        if let Err(err) = node.event_handled() {
456                            tracing::error!("Error handling node event: {}", err);
457                        } else {
458                            tracing::debug!("Successfully handled node event");
459                        }
460                    }
461                }
462            }
463            tracing::info!("Event handler loop terminated");
464        });
465
466        tracing::info!("Event handler task spawned");
467        Ok(())
468    }
469
470    /// Get Node used
471    pub fn node(&self) -> Arc<Node> {
472        Arc::clone(&self.inner)
473    }
474}
475
476/// Mint payment trait
477#[async_trait]
478impl MintPayment for CdkLdkNode {
479    type Err = payment::Error;
480
481    /// Start the payment processor
482    /// Starts the LDK node and begins event processing
483    async fn start(&self) -> Result<(), Self::Err> {
484        self.start_ldk_node().map_err(|e| {
485            tracing::error!("Failed to start CdkLdkNode: {}", e);
486            e
487        })?;
488
489        tracing::info!("CdkLdkNode payment processor started successfully");
490
491        // Start web server if configured
492        if let Some(web_addr) = self.web_addr {
493            tracing::info!("Starting LDK Node web interface on {}", web_addr);
494            self.start_web_server(web_addr).map_err(|e| {
495                tracing::error!("Failed to start web server: {}", e);
496                e
497            })?;
498        } else {
499            tracing::info!("No web server address configured, skipping web interface");
500        }
501
502        Ok(())
503    }
504
505    /// Stop the payment processor
506    /// Gracefully stops the LDK node and cancels all background tasks
507    async fn stop(&self) -> Result<(), Self::Err> {
508        self.stop_ldk_node().map_err(|e| {
509            tracing::error!("Failed to stop CdkLdkNode: {}", e);
510            e.into()
511        })
512    }
513
514    /// Base Settings
515    async fn get_settings(&self) -> Result<SettingsResponse, Self::Err> {
516        let settings = SettingsResponse {
517            unit: CurrencyUnit::Msat.to_string(),
518            bolt11: Some(payment::Bolt11Settings {
519                mpp: false,
520                amountless: true,
521                invoice_description: true,
522            }),
523            bolt12: Some(payment::Bolt12Settings { amountless: true }),
524            onchain: None,
525            custom: std::collections::HashMap::new(),
526        };
527        Ok(settings)
528    }
529
530    /// Create a new invoice
531    #[instrument(skip(self))]
532    async fn create_incoming_payment_request(
533        &self,
534        options: IncomingPaymentOptions,
535    ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
536        match options {
537            IncomingPaymentOptions::Bolt11(bolt11_options) => {
538                let amount_msat: Amount = bolt11_options
539                    .amount
540                    .convert_to(&CurrencyUnit::Msat)?
541                    .into();
542                let description = bolt11_options.description.unwrap_or_default();
543                let time = match bolt11_options.unix_expiry {
544                    Some(t) => t
545                        .checked_sub(unix_time())
546                        .ok_or(payment::Error::InvalidExpiry)?,
547                    None => 36000,
548                };
549
550                let description = Bolt11InvoiceDescription::Direct(
551                    Description::new(description).map_err(|_| Error::InvalidDescription)?,
552                );
553
554                let payment = self
555                    .inner
556                    .bolt11_payment()
557                    .receive(amount_msat.into(), &description, time as u32)
558                    .map_err(Error::LdkNode)?;
559
560                let payment_hash = payment.payment_hash().to_string();
561                let payment_identifier = PaymentIdentifier::PaymentHash(
562                    hex::decode(&payment_hash)?
563                        .try_into()
564                        .map_err(|_| Error::InvalidPaymentHashLength)?,
565                );
566
567                Ok(CreateIncomingPaymentResponse {
568                    request_lookup_id: payment_identifier,
569                    request: payment.to_string(),
570                    expiry: Some(unix_time() + time),
571                    extra_json: None,
572                })
573            }
574            IncomingPaymentOptions::Bolt12(bolt12_options) => {
575                let Bolt12IncomingPaymentOptions {
576                    description,
577                    amount,
578                    unix_expiry,
579                } = *bolt12_options;
580
581                let time = unix_expiry
582                    .map(|t| {
583                        t.checked_sub(unix_time())
584                            .ok_or(payment::Error::InvalidExpiry)
585                            .map(|t| t as u32)
586                    })
587                    .transpose()?;
588
589                let offer = match amount {
590                    Some(amount) => {
591                        let amount_msat: Amount = amount.convert_to(&CurrencyUnit::Msat)?.into();
592
593                        self.inner
594                            .bolt12_payment()
595                            .receive(
596                                amount_msat.into(),
597                                &description.unwrap_or("".to_string()),
598                                time,
599                                None,
600                            )
601                            .map_err(Error::LdkNode)?
602                    }
603                    None => self
604                        .inner
605                        .bolt12_payment()
606                        .receive_variable_amount(&description.unwrap_or("".to_string()), time)
607                        .map_err(Error::LdkNode)?,
608                };
609                let payment_identifier = PaymentIdentifier::OfferId(offer.id().to_string());
610
611                Ok(CreateIncomingPaymentResponse {
612                    request_lookup_id: payment_identifier,
613                    request: offer.to_string(),
614                    expiry: time.map(|a| a as u64),
615                    extra_json: None,
616                })
617            }
618            IncomingPaymentOptions::Custom(_) | IncomingPaymentOptions::Onchain(_) => {
619                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
620            }
621        }
622    }
623
624    /// Get payment quote
625    /// Used to get fee and amount required for a payment request
626    #[instrument(skip_all)]
627    async fn get_payment_quote(
628        &self,
629        unit: &CurrencyUnit,
630        options: OutgoingPaymentOptions,
631    ) -> Result<PaymentQuoteResponse, Self::Err> {
632        match options {
633            cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
634                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
635            }
636            OutgoingPaymentOptions::Bolt11(bolt11_options) => {
637                let bolt11 = bolt11_options.bolt11;
638
639                let amount_msat = match bolt11_options.melt_options {
640                    Some(MeltOptions::Amountless { amountless }) => {
641                        let amount_msat = amountless.amount_msat;
642
643                        if let Some(invoice_amount) = bolt11.amount_milli_satoshis() {
644                            if invoice_amount != u64::from(amount_msat) {
645                                return Err(payment::Error::AmountMismatch);
646                            }
647                        }
648
649                        amount_msat
650                    }
651                    Some(MeltOptions::Mpp { mpp }) => mpp.amount,
652                    None => bolt11
653                        .amount_milli_satoshis()
654                        .ok_or(Error::UnknownInvoiceAmount)?
655                        .into(),
656                };
657
658                let amount =
659                    Amount::new(amount_msat.into(), CurrencyUnit::Msat).convert_to(unit)?;
660
661                let relative_fee_reserve =
662                    (self.fee_reserve.percent_fee_reserve * amount.value() as f32) as u64;
663
664                let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
665
666                let fee = match relative_fee_reserve > absolute_fee_reserve {
667                    true => relative_fee_reserve,
668                    false => absolute_fee_reserve,
669                };
670
671                let payment_hash = bolt11.payment_hash().to_string();
672                let payment_hash_bytes = hex::decode(&payment_hash)?
673                    .try_into()
674                    .map_err(|_| Error::InvalidPaymentHashLength)?;
675
676                Ok(PaymentQuoteResponse {
677                    request_lookup_id: Some(PaymentIdentifier::PaymentHash(payment_hash_bytes)),
678                    amount,
679                    fee: Amount::new(fee, unit.clone()),
680                    state: MeltQuoteState::Unpaid,
681                    extra_json: None,
682                    estimated_blocks: None,
683                    fee_options: None,
684                })
685            }
686            OutgoingPaymentOptions::Bolt12(bolt12_options) => {
687                let offer = bolt12_options.offer;
688
689                let amount_msat = match bolt12_options.melt_options {
690                    Some(melt_options) => melt_options.amount_msat(),
691                    None => {
692                        let amount = offer.amount().ok_or(payment::Error::AmountMismatch)?;
693
694                        match amount {
695                            ldk_node::lightning::offers::offer::Amount::Bitcoin {
696                                amount_msats,
697                            } => amount_msats.into(),
698                            _ => return Err(payment::Error::AmountMismatch),
699                        }
700                    }
701                };
702                let amount =
703                    Amount::new(amount_msat.into(), CurrencyUnit::Msat).convert_to(unit)?;
704
705                let relative_fee_reserve =
706                    (self.fee_reserve.percent_fee_reserve * amount.value() as f32) as u64;
707
708                let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
709
710                let fee = match relative_fee_reserve > absolute_fee_reserve {
711                    true => relative_fee_reserve,
712                    false => absolute_fee_reserve,
713                };
714
715                Ok(PaymentQuoteResponse {
716                    request_lookup_id: None,
717                    amount,
718                    fee: Amount::new(fee, unit.clone()),
719                    state: MeltQuoteState::Unpaid,
720                    extra_json: None,
721                    estimated_blocks: None,
722                    fee_options: None,
723                })
724            }
725            OutgoingPaymentOptions::Onchain(_) => {
726                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
727            }
728        }
729    }
730
731    /// Pay request
732    #[instrument(skip(self, options))]
733    async fn make_payment(
734        &self,
735        unit: &CurrencyUnit,
736        options: OutgoingPaymentOptions,
737    ) -> Result<MakePaymentResponse, Self::Err> {
738        match options {
739            cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
740                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
741            }
742            OutgoingPaymentOptions::Bolt11(bolt11_options) => {
743                let bolt11 = bolt11_options.bolt11;
744
745                let send_params = match bolt11_options
746                    .max_fee_amount
747                    .map(|f| {
748                        f.convert_to(&CurrencyUnit::Msat)
749                            .map(|amount_msat| RouteParametersConfig {
750                                max_total_routing_fee_msat: Some(amount_msat.value()),
751                                ..Default::default()
752                            })
753                    })
754                    .transpose()
755                {
756                    Ok(params) => params,
757                    Err(err) => {
758                        tracing::error!("Failed to convert fee amount: {}", err);
759                        return Err(payment::Error::Custom(format!("Invalid fee amount: {err}")));
760                    }
761                };
762
763                let payment_id = match bolt11_options.melt_options {
764                    Some(MeltOptions::Amountless { amountless }) => {
765                        if let Some(invoice_amount) = bolt11.amount_milli_satoshis() {
766                            if invoice_amount != u64::from(amountless.amount_msat) {
767                                return Err(payment::Error::AmountMismatch);
768                            }
769                        }
770
771                        self.inner
772                            .bolt11_payment()
773                            .send_using_amount(&bolt11, amountless.amount_msat.into(), send_params)
774                            .map_err(|err| {
775                                tracing::error!("Could not send send amountless bolt11: {}", err);
776                                Error::CouldNotSendBolt11WithoutAmount
777                            })?
778                    }
779                    None => self
780                        .inner
781                        .bolt11_payment()
782                        .send(&bolt11, send_params)
783                        .map_err(|err| {
784                            tracing::error!("Could not send bolt11 {}", err);
785                            Error::CouldNotSendBolt11
786                        })?,
787                    _ => return Err(payment::Error::UnsupportedPaymentOption),
788                };
789
790                // Check payment status for up to 10 seconds
791                let start = std::time::Instant::now();
792                let timeout = std::time::Duration::from_secs(10);
793
794                let (status, payment_details) = loop {
795                    let details = self
796                        .inner
797                        .payment(&payment_id)
798                        .ok_or(Error::PaymentNotFound)?;
799
800                    match details.status {
801                        PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
802                        PaymentStatus::Failed => {
803                            tracing::error!("Failed to pay bolt11 payment.");
804                            break (MeltQuoteState::Failed, details);
805                        }
806                        PaymentStatus::Pending => {
807                            if start.elapsed() > timeout {
808                                tracing::warn!(
809                                    "Paying bolt11 exceeded timeout 10 seconds no longer waitning."
810                                );
811                                break (MeltQuoteState::Pending, details);
812                            }
813                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
814                            continue;
815                        }
816                    }
817                };
818
819                let payment_proof = match payment_details.kind {
820                    PaymentKind::Bolt11 {
821                        hash: _,
822                        preimage,
823                        secret: _,
824                    } => preimage.map(|p| p.to_string()),
825                    _ => return Err(Error::UnexpectedPaymentKind.into()),
826                };
827
828                let total_spent = payment_details
829                    .amount_msat
830                    .ok_or(Error::CouldNotGetAmountSpent)?
831                    + payment_details.fee_paid_msat.unwrap_or_default();
832
833                let total_spent = Amount::new(total_spent, CurrencyUnit::Msat).convert_to(unit)?;
834
835                Ok(MakePaymentResponse {
836                    payment_lookup_id: PaymentIdentifier::PaymentHash(
837                        bolt11.payment_hash().to_byte_array(),
838                    ),
839                    payment_proof,
840                    status,
841                    total_spent,
842                })
843            }
844            OutgoingPaymentOptions::Bolt12(bolt12_options) => {
845                let offer = bolt12_options.offer;
846
847                let payment_id = match bolt12_options.melt_options {
848                    Some(MeltOptions::Amountless { amountless }) => self
849                        .inner
850                        .bolt12_payment()
851                        .send_using_amount(&offer, amountless.amount_msat.into(), None, None, None)
852                        .map_err(Error::LdkNode)?,
853                    None => self
854                        .inner
855                        .bolt12_payment()
856                        .send(&offer, None, None, None)
857                        .map_err(Error::LdkNode)?,
858                    _ => return Err(payment::Error::UnsupportedPaymentOption),
859                };
860
861                // Check payment status for up to 10 seconds
862                let start = std::time::Instant::now();
863                let timeout = std::time::Duration::from_secs(10);
864
865                let (status, payment_details) = loop {
866                    let details = self
867                        .inner
868                        .payment(&payment_id)
869                        .ok_or(Error::PaymentNotFound)?;
870
871                    match details.status {
872                        PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
873                        PaymentStatus::Failed => {
874                            tracing::error!("Payment with id {} failed.", payment_id);
875                            break (MeltQuoteState::Failed, details);
876                        }
877                        PaymentStatus::Pending => {
878                            if start.elapsed() > timeout {
879                                tracing::warn!(
880                                    "Payment has been being for 10 seconds. No longer waiting"
881                                );
882                                break (MeltQuoteState::Pending, details);
883                            }
884                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
885                            continue;
886                        }
887                    }
888                };
889
890                let payment_proof = match payment_details.kind {
891                    PaymentKind::Bolt12Offer {
892                        hash: _,
893                        preimage,
894                        secret: _,
895                        offer_id: _,
896                        payer_note: _,
897                        quantity: _,
898                    } => preimage.map(|p| p.to_string()),
899                    _ => return Err(Error::UnexpectedPaymentKind.into()),
900                };
901
902                let total_spent = payment_details
903                    .amount_msat
904                    .ok_or(Error::CouldNotGetAmountSpent)?
905                    + payment_details.fee_paid_msat.unwrap_or_default();
906
907                let total_spent = Amount::new(total_spent, CurrencyUnit::Msat).convert_to(unit)?;
908
909                Ok(MakePaymentResponse {
910                    payment_lookup_id: PaymentIdentifier::PaymentId(payment_id.0),
911                    payment_proof,
912                    status,
913                    total_spent,
914                })
915            }
916            OutgoingPaymentOptions::Onchain(_) => {
917                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
918            }
919        }
920    }
921
922    /// Listen for invoices to be paid to the mint
923    /// Returns a stream of request_lookup_id once invoices are paid
924    #[instrument(skip(self))]
925    async fn wait_payment_event(
926        &self,
927    ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
928        tracing::info!("Starting stream for invoices - wait_any_incoming_payment called");
929
930        // Set active flag to indicate stream is active
931        self.wait_invoice_is_active.store(true, Ordering::SeqCst);
932        tracing::debug!("wait_invoice_is_active set to true");
933
934        let receiver = self.receiver.clone();
935
936        tracing::info!("Receiver obtained successfully, creating response stream");
937
938        // Transform the String stream into a WaitPaymentResponse stream
939        let response_stream = BroadcastStream::new(receiver.resubscribe());
940
941        // Map the stream to handle BroadcastStreamRecvError and wrap in Event
942        let response_stream = response_stream.filter_map(|result| async move {
943            match result {
944                Ok(payment) => Some(cdk_common::payment::Event::PaymentReceived(payment)),
945                Err(err) => {
946                    tracing::warn!("Error in broadcast stream: {}", err);
947                    None
948                }
949            }
950        });
951
952        // Create a combined stream that also handles cancellation
953        let cancel_token = self.wait_invoice_cancel_token.clone();
954        let is_active = self.wait_invoice_is_active.clone();
955
956        let stream = Box::pin(response_stream);
957
958        // Set up a task to clean up when the stream is dropped
959        tokio::spawn(async move {
960            cancel_token.cancelled().await;
961            tracing::info!("wait_invoice stream cancelled");
962            is_active.store(false, Ordering::SeqCst);
963        });
964
965        tracing::info!("wait_any_incoming_payment returning stream");
966        Ok(stream)
967    }
968
969    /// Is payment event stream active
970    fn is_payment_event_stream_active(&self) -> bool {
971        self.wait_invoice_is_active.load(Ordering::SeqCst)
972    }
973
974    /// Cancel payment event stream
975    fn cancel_payment_event_stream(&self) {
976        self.wait_invoice_cancel_token.cancel()
977    }
978
979    /// Check the status of an incoming payment
980    async fn check_incoming_payment_status(
981        &self,
982        payment_identifier: &PaymentIdentifier,
983    ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
984        let payment_id_str = match payment_identifier {
985            PaymentIdentifier::PaymentHash(hash) => hex::encode(hash),
986            PaymentIdentifier::CustomId(id) => id.clone(),
987            _ => return Err(Error::UnsupportedPaymentIdentifierType.into()),
988        };
989
990        let payment_id = PaymentId(
991            hex::decode(&payment_id_str)?
992                .try_into()
993                .map_err(|_| Error::InvalidPaymentIdLength)?,
994        );
995
996        let payment_details = self
997            .inner
998            .payment(&payment_id)
999            .ok_or(Error::PaymentNotFound)?;
1000
1001        if payment_details.direction == PaymentDirection::Outbound {
1002            return Err(Error::InvalidPaymentDirection.into());
1003        }
1004
1005        let amount = if payment_details.status == PaymentStatus::Succeeded {
1006            payment_details
1007                .amount_msat
1008                .ok_or(Error::CouldNotGetPaymentAmount)?
1009        } else {
1010            return Ok(vec![]);
1011        };
1012
1013        let response = WaitPaymentResponse {
1014            payment_identifier: payment_identifier.clone(),
1015            payment_amount: Amount::new(amount, CurrencyUnit::Msat),
1016            payment_id: payment_id_str,
1017        };
1018
1019        Ok(vec![response])
1020    }
1021
1022    /// Check the status of an outgoing payment
1023    async fn check_outgoing_payment(
1024        &self,
1025        request_lookup_id: &PaymentIdentifier,
1026    ) -> Result<MakePaymentResponse, Self::Err> {
1027        let payment_details = match request_lookup_id {
1028            PaymentIdentifier::PaymentHash(id_hash) => self
1029                .inner
1030                .list_payments_with_filter(
1031                    |p| matches!(&p.kind, PaymentKind::Bolt11 { hash, .. } if &hash.0 == id_hash),
1032                )
1033                .first()
1034                .cloned(),
1035            PaymentIdentifier::PaymentId(id) => self.inner.payment(&PaymentId(
1036                hex::decode(id)?
1037                    .try_into()
1038                    .map_err(|_| payment::Error::Custom("Invalid hex".to_string()))?,
1039            )),
1040            _ => {
1041                return Ok(MakePaymentResponse {
1042                    payment_lookup_id: request_lookup_id.clone(),
1043                    payment_proof: None,
1044                    status: MeltQuoteState::Unknown,
1045                    total_spent: Amount::new(0, CurrencyUnit::Msat),
1046                });
1047            }
1048        }
1049        .ok_or(Error::PaymentNotFound)?;
1050
1051        // This check seems reversed in the original code, so I'm fixing it here
1052        if payment_details.direction != PaymentDirection::Outbound {
1053            return Err(Error::InvalidPaymentDirection.into());
1054        }
1055
1056        let status = match payment_details.status {
1057            PaymentStatus::Pending => MeltQuoteState::Pending,
1058            PaymentStatus::Succeeded => MeltQuoteState::Paid,
1059            PaymentStatus::Failed => MeltQuoteState::Failed,
1060        };
1061
1062        let payment_proof = match payment_details.kind {
1063            PaymentKind::Bolt11 {
1064                hash: _,
1065                preimage,
1066                secret: _,
1067            } => preimage.map(|p| p.to_string()),
1068            _ => return Err(Error::UnexpectedPaymentKind.into()),
1069        };
1070
1071        let total_spent = payment_details
1072            .amount_msat
1073            .ok_or(Error::CouldNotGetAmountSpent)?;
1074
1075        Ok(MakePaymentResponse {
1076            payment_lookup_id: request_lookup_id.clone(),
1077            payment_proof,
1078            status,
1079            total_spent: Amount::new(total_spent, CurrencyUnit::Msat),
1080        })
1081    }
1082}
1083
1084impl Drop for CdkLdkNode {
1085    fn drop(&mut self) {
1086        tracing::info!("Drop called on CdkLdkNode");
1087        self.wait_invoice_cancel_token.cancel();
1088        tracing::debug!("Cancelled wait_invoice token in drop");
1089    }
1090}