Skip to main content

cdk_ldk_node/
lib.rs

1//! CDK lightning backend for ldk-node
2
3#![doc = include_str!("../README.md")]
4
5use std::net::SocketAddr;
6use std::pin::Pin;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use bip39::Mnemonic;
12use cdk_common::common::FeeReserve;
13use cdk_common::payment::{self, *};
14use cdk_common::util::{hex, unix_time};
15use cdk_common::{Amount, CurrencyUnit, MeltOptions, MeltQuoteState};
16use futures::{Stream, StreamExt};
17use ldk_node::bitcoin::hashes::Hash;
18use ldk_node::bitcoin::Network;
19use ldk_node::lightning::ln::channelmanager::PaymentId;
20use ldk_node::lightning::ln::msgs::SocketAddress;
21use ldk_node::lightning::routing::router::RouteParametersConfig;
22use ldk_node::lightning_invoice::{Bolt11InvoiceDescription, Description};
23use ldk_node::lightning_types::payment::PaymentHash;
24use ldk_node::logger::{LogLevel, LogWriter};
25use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus};
26use ldk_node::{Builder, Event, Node};
27use tokio_stream::wrappers::BroadcastStream;
28use tokio_util::sync::CancellationToken;
29use tracing::instrument;
30
31use crate::error::Error;
32use crate::log::StdoutLogWriter;
33
34mod error;
35mod log;
36mod web;
37
38/// CDK Lightning backend using LDK Node
39///
40/// Provides Lightning Network functionality for CDK with support for Cashu operations.
41/// Handles payment creation, processing, and event management using the Lightning Development Kit.
42#[derive(Clone)]
43pub struct CdkLdkNode {
44    inner: Arc<Node>,
45    fee_reserve: FeeReserve,
46    wait_invoice_cancel_token: CancellationToken,
47    wait_invoice_is_active: Arc<AtomicBool>,
48    sender: tokio::sync::broadcast::Sender<WaitPaymentResponse>,
49    receiver: Arc<tokio::sync::broadcast::Receiver<WaitPaymentResponse>>,
50    events_cancel_token: CancellationToken,
51    web_addr: Option<SocketAddr>,
52}
53
54impl std::fmt::Debug for CdkLdkNode {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("CdkLdkNode")
57            .field("fee_reserve", &self.fee_reserve)
58            .field("web_addr", &self.web_addr)
59            .finish_non_exhaustive()
60    }
61}
62
/// Connection parameters for a Bitcoin Core RPC endpoint.
///
/// `Debug` is implemented by hand rather than derived so that the RPC password
/// is never written out when this value (or a type containing it) is logged.
#[derive(Clone)]
pub struct BitcoinRpcConfig {
    /// Bitcoin RPC server hostname or IP address
    pub host: String,
    /// Bitcoin RPC server port number
    pub port: u16,
    /// Username for Bitcoin RPC authentication
    pub user: String,
    /// Password for Bitcoin RPC authentication
    pub password: String,
}

impl std::fmt::Debug for BitcoinRpcConfig {
    /// Formats every field except `password`, which is printed as `<redacted>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BitcoinRpcConfig")
            .field("host", &self.host)
            .field("port", &self.port)
            .field("user", &self.user)
            .field("password", &format_args!("<redacted>"))
            .finish()
    }
}
77
78/// Source of blockchain data for the Lightning node
79///
80/// Specifies how the node should connect to the Bitcoin network to retrieve
81/// blockchain information and broadcast transactions.
82#[derive(Debug, Clone)]
83pub enum ChainSource {
84    /// Use an Esplora server for blockchain data
85    ///
86    /// Contains the URL of the Esplora server endpoint
87    Esplora(String),
88    /// Use Bitcoin Core RPC for blockchain data
89    ///
90    /// Contains the configuration for connecting to Bitcoin Core
91    BitcoinRpc(BitcoinRpcConfig),
92}
93
/// Where the Lightning node gets its network gossip from.
///
/// [`CdkLdkNodeBuilder::build`] maps each variant onto the matching LDK
/// gossip-source setter.
#[derive(Debug, Clone)]
pub enum GossipSource {
    /// Gossip is exchanged directly with peers.
    P2P,
    /// Gossip is fetched from a Rapid Gossip Sync server at the given URL.
    RapidGossipSync(String),
}
109/// A builder for an [`CdkLdkNode`] instance.
110#[derive(Debug)]
111pub struct CdkLdkNodeBuilder {
112    network: Network,
113    chain_source: ChainSource,
114    gossip_source: GossipSource,
115    log_dir_path: Option<String>,
116    storage_dir_path: String,
117    fee_reserve: FeeReserve,
118    listening_addresses: Vec<SocketAddress>,
119    seed: Option<Mnemonic>,
120    announcement_addresses: Option<Vec<SocketAddress>>,
121}
122
123impl CdkLdkNodeBuilder {
124    /// Creates a new builder instance.
125    pub fn new(
126        network: Network,
127        chain_source: ChainSource,
128        gossip_source: GossipSource,
129        storage_dir_path: String,
130        fee_reserve: FeeReserve,
131        listening_addresses: Vec<SocketAddress>,
132    ) -> Self {
133        Self {
134            network,
135            chain_source,
136            gossip_source,
137            storage_dir_path,
138            fee_reserve,
139            listening_addresses,
140            seed: None,
141            announcement_addresses: None,
142            log_dir_path: None,
143        }
144    }
145
146    /// Configures the [`CdkLdkNode`] to use the Mnemonic for entropy source configuration
147    pub fn with_seed(mut self, seed: Mnemonic) -> Self {
148        self.seed = Some(seed);
149        self
150    }
151    /// Configures the [`CdkLdkNode`] to use announce this address to the lightning network
152    pub fn with_announcement_address(mut self, announcement_addresses: Vec<SocketAddress>) -> Self {
153        self.announcement_addresses = Some(announcement_addresses);
154        self
155    }
156    /// Configures the [`CdkLdkNode`] to use announce this address to the lightning network
157    pub fn with_log_dir_path(mut self, log_dir_path: String) -> Self {
158        self.log_dir_path = Some(log_dir_path);
159        self
160    }
161
162    /// Builds the [`CdkLdkNode`] instance
163    ///
164    /// # Errors
165    /// Returns an error if the LDK node builder fails to create the node
166    pub fn build(self) -> Result<CdkLdkNode, Error> {
167        let mut ldk = Builder::new();
168        ldk.set_network(self.network);
169        tracing::info!("Storage dir of node is {}", self.storage_dir_path);
170        ldk.set_storage_dir_path(self.storage_dir_path);
171
172        match self.chain_source {
173            ChainSource::Esplora(esplora_url) => {
174                ldk.set_chain_source_esplora(esplora_url, None);
175            }
176            ChainSource::BitcoinRpc(BitcoinRpcConfig {
177                host,
178                port,
179                user,
180                password,
181            }) => {
182                ldk.set_chain_source_bitcoind_rpc(host, port, user, password);
183            }
184        }
185
186        match self.gossip_source {
187            GossipSource::P2P => {
188                ldk.set_gossip_source_p2p();
189            }
190            GossipSource::RapidGossipSync(rgs_url) => {
191                ldk.set_gossip_source_rgs(rgs_url);
192            }
193        }
194
195        ldk.set_listening_addresses(self.listening_addresses)?;
196        if self.log_dir_path.is_some() {
197            ldk.set_filesystem_logger(self.log_dir_path, Some(LogLevel::Info));
198        } else {
199            ldk.set_custom_logger(Arc::new(StdoutLogWriter));
200        }
201
202        ldk.set_node_alias("cdk-ldk-node".to_string())?;
203        // set the seed as bip39 entropy mnemonic
204        if let Some(seed) = self.seed {
205            ldk.set_entropy_bip39_mnemonic(seed, None);
206        }
207        // set the announcement addresses
208        if let Some(announcement_addresses) = self.announcement_addresses {
209            ldk.set_announcement_addresses(announcement_addresses)?;
210        }
211
212        let node = ldk.build()?;
213
214        tracing::info!("Creating tokio channel for payment notifications");
215        let (sender, receiver) = tokio::sync::broadcast::channel(8);
216
217        let id = node.node_id();
218
219        let adr = node.announcement_addresses();
220
221        tracing::info!(
222            "Created node {} with address {:?} on network {}",
223            id,
224            adr,
225            self.network
226        );
227
228        Ok(CdkLdkNode {
229            inner: node.into(),
230            fee_reserve: self.fee_reserve,
231            wait_invoice_cancel_token: CancellationToken::new(),
232            wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
233            sender,
234            receiver: Arc::new(receiver),
235            events_cancel_token: CancellationToken::new(),
236            web_addr: None,
237        })
238    }
239}
240
241impl CdkLdkNode {
242    /// Set the web server address for the LDK node management interface
243    ///
244    /// # Arguments
245    /// * `addr` - Socket address for the web server. If None, no web server will be started.
246    pub fn set_web_addr(&mut self, addr: Option<SocketAddr>) {
247        self.web_addr = addr;
248    }
249
250    /// Get a default web server address using an unused port
251    ///
252    /// Returns a SocketAddr with localhost and port 0, which will cause
253    /// the system to automatically assign an available port
254    pub fn default_web_addr() -> SocketAddr {
255        SocketAddr::from(([127, 0, 0, 1], 8091))
256    }
257
258    /// Start the CDK LDK Node
259    ///
260    /// Starts the underlying LDK node and begins event processing.
261    /// Sets up event handlers to listen for Lightning events like payment received.
262    ///
263    /// # Returns
264    /// Returns `Ok(())` on successful start, error otherwise
265    ///
266    /// # Errors
267    /// Returns an error if the LDK node fails to start or event handling setup fails
268    pub fn start_ldk_node(&self) -> Result<(), Error> {
269        tracing::info!("Starting cdk-ldk node");
270        self.inner.start()?;
271        let node_config = self.inner.config();
272
273        tracing::info!("Starting node with network {}", node_config.network);
274
275        tracing::info!("Node status: {:?}", self.inner.status());
276
277        self.handle_events()?;
278
279        Ok(())
280    }
281
282    /// Start the web server for the LDK node management interface
283    ///
284    /// Starts a web server that provides a user interface for managing the LDK node.
285    /// The web interface allows users to view balances, manage channels, create invoices,
286    /// and send payments.
287    ///
288    /// # Arguments
289    /// * `web_addr` - The socket address to bind the web server to
290    ///
291    /// # Returns
292    /// Returns `Ok(())` on successful start, error otherwise
293    ///
294    /// # Errors
295    /// Returns an error if the web server fails to start
296    pub fn start_web_server(&self, web_addr: SocketAddr) -> Result<(), Error> {
297        let web_server = crate::web::WebServer::new(Arc::new(self.clone()));
298
299        tokio::spawn(async move {
300            if let Err(e) = web_server.serve(web_addr).await {
301                tracing::error!("Web server error: {}", e);
302            }
303        });
304
305        Ok(())
306    }
307
308    /// Stop the CDK LDK Node
309    ///
310    /// Gracefully stops the node by cancelling all active tasks and event handlers.
311    /// This includes:
312    /// - Cancelling the event handler task
313    /// - Cancelling any active wait_invoice streams
314    /// - Stopping the underlying LDK node
315    ///
316    /// # Returns
317    /// Returns `Ok(())` on successful shutdown, error otherwise
318    ///
319    /// # Errors
320    /// Returns an error if the underlying LDK node fails to stop
321    pub fn stop_ldk_node(&self) -> Result<(), Error> {
322        tracing::info!("Stopping CdkLdkNode");
323        // Cancel all tokio tasks
324        tracing::info!("Cancelling event handler");
325        self.events_cancel_token.cancel();
326
327        // Cancel any payment event streams
328        if self.is_payment_event_stream_active() {
329            tracing::info!("Cancelling payment event stream");
330            self.wait_invoice_cancel_token.cancel();
331        }
332
333        // Stop the LDK node
334        tracing::info!("Stopping LDK node");
335        self.inner.stop()?;
336        tracing::info!("CdkLdkNode stopped successfully");
337        Ok(())
338    }
339
340    /// Handle payment received event
341    async fn handle_payment_received(
342        node: &Arc<Node>,
343        sender: &tokio::sync::broadcast::Sender<WaitPaymentResponse>,
344        payment_id: Option<PaymentId>,
345        payment_hash: PaymentHash,
346        amount_msat: u64,
347    ) {
348        tracing::info!(
349            "Received payment for hash={} of amount={} msat",
350            payment_hash,
351            amount_msat
352        );
353
354        let payment_id = match payment_id {
355            Some(id) => id,
356            None => {
357                tracing::warn!("Received payment without payment_id");
358                return;
359            }
360        };
361
362        let payment_id_hex = hex::encode(payment_id.0);
363
364        if amount_msat == 0 {
365            tracing::warn!("Payment of no amount");
366            return;
367        }
368
369        tracing::info!(
370            "Processing payment notification: id={}, amount={} msats",
371            payment_id_hex,
372            amount_msat
373        );
374
375        let payment_details = match node.payment(&payment_id) {
376            Some(details) => details,
377            None => {
378                tracing::error!("Could not find payment details for id={}", payment_id_hex);
379                return;
380            }
381        };
382
383        let (payment_identifier, payment_id) = match payment_details.kind {
384            PaymentKind::Bolt11 { hash, .. } => {
385                (PaymentIdentifier::PaymentHash(hash.0), hash.to_string())
386            }
387            PaymentKind::Bolt12Offer { hash, offer_id, .. } => match hash {
388                Some(h) => (
389                    PaymentIdentifier::OfferId(offer_id.to_string()),
390                    h.to_string(),
391                ),
392                None => {
393                    tracing::error!("Bolt12 payment missing hash");
394                    return;
395                }
396            },
397            k => {
398                tracing::warn!("Received payment of kind {:?} which is not supported", k);
399                return;
400            }
401        };
402
403        let wait_payment_response = WaitPaymentResponse {
404            payment_identifier,
405            payment_amount: Amount::new(amount_msat, CurrencyUnit::Msat),
406            payment_id,
407        };
408
409        match sender.send(wait_payment_response) {
410            Ok(_) => tracing::info!("Successfully sent payment notification to stream"),
411            Err(err) => tracing::error!(
412                "Could not send payment received notification on channel: {}",
413                err
414            ),
415        }
416    }
417
418    /// Set up event handling for the node
419    pub fn handle_events(&self) -> Result<(), Error> {
420        let node = self.inner.clone();
421        let sender = self.sender.clone();
422        let cancel_token = self.events_cancel_token.clone();
423
424        tracing::info!("Starting event handler task");
425
426        tokio::spawn(async move {
427            tracing::info!("Event handler loop started");
428            loop {
429                tokio::select! {
430                    _ = cancel_token.cancelled() => {
431                        tracing::info!("Event handler cancelled");
432                        break;
433                    }
434                    event = node.next_event_async() => {
435                        match event {
436                            Event::PaymentReceived {
437                                payment_id,
438                                payment_hash,
439                                amount_msat,
440                                custom_records: _
441                            } => {
442                                Self::handle_payment_received(
443                                    &node,
444                                    &sender,
445                                    payment_id,
446                                    payment_hash,
447                                    amount_msat
448                                ).await;
449                            }
450                            Event::PaymentFailed {
451                                payment_id,
452                                payment_hash,
453                                reason,
454                            } => {
455                                tracing::error!(
456                                    payment_id = ?payment_id,
457                                    payment_hash = ?payment_hash,
458                                    reason = ?reason,
459                                    "LDK node payment failed"
460                                );
461                            }
462                            event => {
463                                tracing::debug!("Received other ldk node event: {:?}", event);
464                            }
465                        }
466
467                        if let Err(err) = node.event_handled() {
468                            tracing::error!("Error handling node event: {}", err);
469                        } else {
470                            tracing::debug!("Successfully handled node event");
471                        }
472                    }
473                }
474            }
475            tracing::info!("Event handler loop terminated");
476        });
477
478        tracing::info!("Event handler task spawned");
479        Ok(())
480    }
481
482    /// Get Node used
483    pub fn node(&self) -> Arc<Node> {
484        Arc::clone(&self.inner)
485    }
486}
487
488/// Mint payment trait
489#[async_trait]
490impl MintPayment for CdkLdkNode {
491    type Err = payment::Error;
492
493    /// Start the payment processor
494    /// Starts the LDK node and begins event processing
495    async fn start(&self) -> Result<(), Self::Err> {
496        self.start_ldk_node().map_err(|e| {
497            tracing::error!("Failed to start CdkLdkNode: {}", e);
498            e
499        })?;
500
501        tracing::info!("CdkLdkNode payment processor started successfully");
502
503        // Start web server if configured
504        if let Some(web_addr) = self.web_addr {
505            tracing::info!("Starting LDK Node web interface on {}", web_addr);
506            self.start_web_server(web_addr).map_err(|e| {
507                tracing::error!("Failed to start web server: {}", e);
508                e
509            })?;
510        } else {
511            tracing::info!("No web server address configured, skipping web interface");
512        }
513
514        Ok(())
515    }
516
517    /// Stop the payment processor
518    /// Gracefully stops the LDK node and cancels all background tasks
519    async fn stop(&self) -> Result<(), Self::Err> {
520        self.stop_ldk_node().map_err(|e| {
521            tracing::error!("Failed to stop CdkLdkNode: {}", e);
522            e.into()
523        })
524    }
525
526    /// Base Settings
527    async fn get_settings(&self) -> Result<SettingsResponse, Self::Err> {
528        let settings = SettingsResponse {
529            unit: CurrencyUnit::Msat.to_string(),
530            bolt11: Some(payment::Bolt11Settings {
531                mpp: false,
532                amountless: true,
533                invoice_description: true,
534            }),
535            bolt12: Some(payment::Bolt12Settings { amountless: true }),
536            onchain: None,
537            custom: std::collections::HashMap::new(),
538        };
539        Ok(settings)
540    }
541
542    /// Create a new invoice
543    #[instrument(skip(self))]
544    async fn create_incoming_payment_request(
545        &self,
546        options: IncomingPaymentOptions,
547    ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
548        match options {
549            IncomingPaymentOptions::Bolt11(bolt11_options) => {
550                let amount_msat: Amount = bolt11_options
551                    .amount
552                    .convert_to(&CurrencyUnit::Msat)?
553                    .into();
554                let description = bolt11_options.description.unwrap_or_default();
555                let time = match bolt11_options.unix_expiry {
556                    Some(t) => t
557                        .checked_sub(unix_time())
558                        .ok_or(payment::Error::InvalidExpiry)?,
559                    None => 36000,
560                };
561
562                let description = Bolt11InvoiceDescription::Direct(
563                    Description::new(description).map_err(|_| Error::InvalidDescription)?,
564                );
565
566                let payment = self
567                    .inner
568                    .bolt11_payment()
569                    .receive(amount_msat.into(), &description, time as u32)
570                    .map_err(Error::LdkNode)?;
571
572                let payment_hash = payment.payment_hash().to_string();
573                let payment_identifier = PaymentIdentifier::PaymentHash(
574                    hex::decode(&payment_hash)?
575                        .try_into()
576                        .map_err(|_| Error::InvalidPaymentHashLength)?,
577                );
578
579                Ok(CreateIncomingPaymentResponse {
580                    request_lookup_id: payment_identifier,
581                    request: payment.to_string(),
582                    expiry: Some(unix_time() + time),
583                    extra_json: None,
584                })
585            }
586            IncomingPaymentOptions::Bolt12(bolt12_options) => {
587                let Bolt12IncomingPaymentOptions {
588                    description,
589                    amount,
590                    unix_expiry,
591                } = *bolt12_options;
592
593                let time = unix_expiry
594                    .map(|t| {
595                        t.checked_sub(unix_time())
596                            .ok_or(payment::Error::InvalidExpiry)
597                            .map(|t| t as u32)
598                    })
599                    .transpose()?;
600
601                let offer = match amount {
602                    Some(amount) => {
603                        let amount_msat: Amount = amount.convert_to(&CurrencyUnit::Msat)?.into();
604
605                        self.inner
606                            .bolt12_payment()
607                            .receive(
608                                amount_msat.into(),
609                                &description.unwrap_or("".to_string()),
610                                time,
611                                None,
612                            )
613                            .map_err(Error::LdkNode)?
614                    }
615                    None => self
616                        .inner
617                        .bolt12_payment()
618                        .receive_variable_amount(&description.unwrap_or("".to_string()), time)
619                        .map_err(Error::LdkNode)?,
620                };
621                let payment_identifier = PaymentIdentifier::OfferId(offer.id().to_string());
622
623                Ok(CreateIncomingPaymentResponse {
624                    request_lookup_id: payment_identifier,
625                    request: offer.to_string(),
626                    expiry: unix_expiry,
627                    extra_json: None,
628                })
629            }
630            IncomingPaymentOptions::Custom(_) | IncomingPaymentOptions::Onchain(_) => {
631                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
632            }
633        }
634    }
635
636    /// Get payment quote
637    /// Used to get fee and amount required for a payment request
638    #[instrument(skip_all)]
639    async fn get_payment_quote(
640        &self,
641        unit: &CurrencyUnit,
642        options: OutgoingPaymentOptions,
643    ) -> Result<PaymentQuoteResponse, Self::Err> {
644        match options {
645            cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
646                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
647            }
648            OutgoingPaymentOptions::Bolt11(bolt11_options) => {
649                let bolt11 = bolt11_options.bolt11;
650
651                let amount_msat = match bolt11_options.melt_options {
652                    Some(MeltOptions::Amountless { amountless }) => {
653                        let amount_msat = amountless.amount_msat;
654
655                        if let Some(invoice_amount) = bolt11.amount_milli_satoshis() {
656                            if invoice_amount != u64::from(amount_msat) {
657                                return Err(payment::Error::AmountMismatch);
658                            }
659                        }
660
661                        amount_msat
662                    }
663                    Some(MeltOptions::Mpp { mpp }) => mpp.amount,
664                    None => bolt11
665                        .amount_milli_satoshis()
666                        .ok_or(Error::UnknownInvoiceAmount)?
667                        .into(),
668                };
669
670                let amount =
671                    Amount::new(amount_msat.into(), CurrencyUnit::Msat).convert_to(unit)?;
672
673                let relative_fee_reserve =
674                    (self.fee_reserve.percent_fee_reserve * amount.value() as f32) as u64;
675
676                let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
677
678                let fee = match relative_fee_reserve > absolute_fee_reserve {
679                    true => relative_fee_reserve,
680                    false => absolute_fee_reserve,
681                };
682
683                let payment_hash = bolt11.payment_hash().to_string();
684                let payment_hash_bytes = hex::decode(&payment_hash)?
685                    .try_into()
686                    .map_err(|_| Error::InvalidPaymentHashLength)?;
687
688                Ok(PaymentQuoteResponse {
689                    request_lookup_id: Some(PaymentIdentifier::PaymentHash(payment_hash_bytes)),
690                    amount,
691                    fee: Amount::new(fee, unit.clone()),
692                    state: MeltQuoteState::Unpaid,
693                    extra_json: None,
694                    estimated_blocks: None,
695                    fee_options: None,
696                })
697            }
698            OutgoingPaymentOptions::Bolt12(bolt12_options) => {
699                let offer = bolt12_options.offer;
700
701                let amount_msat = match bolt12_options.melt_options {
702                    Some(melt_options) => melt_options.amount_msat(),
703                    None => {
704                        let amount = offer.amount().ok_or(payment::Error::AmountMismatch)?;
705
706                        match amount {
707                            ldk_node::lightning::offers::offer::Amount::Bitcoin {
708                                amount_msats,
709                            } => amount_msats.into(),
710                            _ => return Err(payment::Error::AmountMismatch),
711                        }
712                    }
713                };
714                let amount =
715                    Amount::new(amount_msat.into(), CurrencyUnit::Msat).convert_to(unit)?;
716
717                let relative_fee_reserve =
718                    (self.fee_reserve.percent_fee_reserve * amount.value() as f32) as u64;
719
720                let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
721
722                let fee = match relative_fee_reserve > absolute_fee_reserve {
723                    true => relative_fee_reserve,
724                    false => absolute_fee_reserve,
725                };
726
727                Ok(PaymentQuoteResponse {
728                    request_lookup_id: None,
729                    amount,
730                    fee: Amount::new(fee, unit.clone()),
731                    state: MeltQuoteState::Unpaid,
732                    extra_json: None,
733                    estimated_blocks: None,
734                    fee_options: None,
735                })
736            }
737            OutgoingPaymentOptions::Onchain(_) => {
738                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
739            }
740        }
741    }
742
743    /// Pay request
744    #[instrument(skip(self, options))]
745    async fn make_payment(
746        &self,
747        unit: &CurrencyUnit,
748        options: OutgoingPaymentOptions,
749    ) -> Result<MakePaymentResponse, Self::Err> {
750        match options {
751            cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
752                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
753            }
754            OutgoingPaymentOptions::Bolt11(bolt11_options) => {
755                let bolt11 = bolt11_options.bolt11;
756
757                let send_params = match bolt11_options
758                    .max_fee_amount
759                    .map(|f| {
760                        f.convert_to(&CurrencyUnit::Msat)
761                            .map(|amount_msat| RouteParametersConfig {
762                                max_total_routing_fee_msat: Some(amount_msat.value()),
763                                ..Default::default()
764                            })
765                    })
766                    .transpose()
767                {
768                    Ok(params) => params,
769                    Err(err) => {
770                        tracing::error!("Failed to convert fee amount: {}", err);
771                        return Err(payment::Error::Custom(format!("Invalid fee amount: {err}")));
772                    }
773                };
774
775                let payment_id = match bolt11_options.melt_options {
776                    Some(MeltOptions::Amountless { amountless }) => {
777                        if let Some(invoice_amount) = bolt11.amount_milli_satoshis() {
778                            if invoice_amount != u64::from(amountless.amount_msat) {
779                                return Err(payment::Error::AmountMismatch);
780                            }
781                        }
782
783                        self.inner
784                            .bolt11_payment()
785                            .send_using_amount(&bolt11, amountless.amount_msat.into(), send_params)
786                            .map_err(|err| {
787                                tracing::error!("Could not send send amountless bolt11: {}", err);
788                                Error::CouldNotSendBolt11WithoutAmount
789                            })?
790                    }
791                    None => self
792                        .inner
793                        .bolt11_payment()
794                        .send(&bolt11, send_params)
795                        .map_err(|err| {
796                            tracing::error!("Could not send bolt11 {}", err);
797                            Error::CouldNotSendBolt11
798                        })?,
799                    _ => return Err(payment::Error::UnsupportedPaymentOption),
800                };
801
802                // Check payment status for up to 10 seconds
803                let start = std::time::Instant::now();
804                let timeout = std::time::Duration::from_secs(10);
805
806                let (status, payment_details) = loop {
807                    let details = self
808                        .inner
809                        .payment(&payment_id)
810                        .ok_or(Error::PaymentNotFound)?;
811
812                    match details.status {
813                        PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
814                        PaymentStatus::Failed => {
815                            tracing::error!("Failed to pay bolt11 payment.");
816                            break (MeltQuoteState::Failed, details);
817                        }
818                        PaymentStatus::Pending => {
819                            if start.elapsed() > timeout {
820                                tracing::warn!(
821                                    "Paying bolt11 exceeded timeout 10 seconds no longer waitning."
822                                );
823                                break (MeltQuoteState::Pending, details);
824                            }
825                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
826                            continue;
827                        }
828                    }
829                };
830
831                let payment_proof = match payment_details.kind {
832                    PaymentKind::Bolt11 {
833                        hash: _,
834                        preimage,
835                        secret: _,
836                    } => preimage.map(|p| p.to_string()),
837                    _ => return Err(Error::UnexpectedPaymentKind.into()),
838                };
839
840                let total_spent = payment_details
841                    .amount_msat
842                    .ok_or(Error::CouldNotGetAmountSpent)?
843                    + payment_details.fee_paid_msat.unwrap_or_default();
844
845                let total_spent = Amount::new(total_spent, CurrencyUnit::Msat).convert_to(unit)?;
846
847                Ok(MakePaymentResponse {
848                    payment_lookup_id: PaymentIdentifier::PaymentHash(
849                        bolt11.payment_hash().to_byte_array(),
850                    ),
851                    payment_proof,
852                    status,
853                    total_spent,
854                })
855            }
856            OutgoingPaymentOptions::Bolt12(bolt12_options) => {
857                let offer = bolt12_options.offer;
858
859                let send_params = match bolt12_options
860                    .max_fee_amount
861                    .map(|f| {
862                        f.convert_to(&CurrencyUnit::Msat)
863                            .map(|amount_msat| RouteParametersConfig {
864                                max_total_routing_fee_msat: Some(amount_msat.value()),
865                                ..Default::default()
866                            })
867                    })
868                    .transpose()
869                {
870                    Ok(params) => params,
871                    Err(err) => {
872                        tracing::error!("Failed to convert fee amount: {}", err);
873                        return Err(payment::Error::Custom(format!("Invalid fee amount: {err}")));
874                    }
875                };
876
877                let payment_id = match bolt12_options.melt_options {
878                    Some(MeltOptions::Amountless { amountless }) => self
879                        .inner
880                        .bolt12_payment()
881                        .send_using_amount(
882                            &offer,
883                            amountless.amount_msat.into(),
884                            None,
885                            None,
886                            send_params,
887                        )
888                        .map_err(Error::LdkNode)?,
889                    None => self
890                        .inner
891                        .bolt12_payment()
892                        .send(&offer, None, None, send_params)
893                        .map_err(Error::LdkNode)?,
894                    _ => return Err(payment::Error::UnsupportedPaymentOption),
895                };
896
897                // Check payment status for up to 10 seconds
898                let start = std::time::Instant::now();
899                let timeout = std::time::Duration::from_secs(10);
900
901                let (status, payment_details) = loop {
902                    let details = self
903                        .inner
904                        .payment(&payment_id)
905                        .ok_or(Error::PaymentNotFound)?;
906
907                    match details.status {
908                        PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
909                        PaymentStatus::Failed => {
910                            tracing::error!(
911                                payment_id = %payment_id,
912                                amount_msat = ?details.amount_msat,
913                                fee_paid_msat = ?details.fee_paid_msat,
914                                payment_kind = ?details.kind,
915                                "Bolt12 payment failed"
916                            );
917                            break (MeltQuoteState::Failed, details);
918                        }
919                        PaymentStatus::Pending => {
920                            if start.elapsed() > timeout {
921                                tracing::warn!(
922                                    "Payment has been being for 10 seconds. No longer waiting"
923                                );
924                                break (MeltQuoteState::Pending, details);
925                            }
926                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
927                            continue;
928                        }
929                    }
930                };
931
932                let payment_proof = match payment_details.kind {
933                    PaymentKind::Bolt12Offer {
934                        hash: _,
935                        preimage,
936                        secret: _,
937                        offer_id: _,
938                        payer_note: _,
939                        quantity: _,
940                    } => preimage.map(|p| p.to_string()),
941                    _ => return Err(Error::UnexpectedPaymentKind.into()),
942                };
943
944                let total_spent = payment_details
945                    .amount_msat
946                    .ok_or(Error::CouldNotGetAmountSpent)?
947                    + payment_details.fee_paid_msat.unwrap_or_default();
948
949                let total_spent = Amount::new(total_spent, CurrencyUnit::Msat).convert_to(unit)?;
950
951                Ok(MakePaymentResponse {
952                    payment_lookup_id: PaymentIdentifier::PaymentId(payment_id.0),
953                    payment_proof,
954                    status,
955                    total_spent,
956                })
957            }
958            OutgoingPaymentOptions::Onchain(_) => {
959                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
960            }
961        }
962    }
963
964    /// Listen for invoices to be paid to the mint
965    /// Returns a stream of request_lookup_id once invoices are paid
966    #[instrument(skip(self))]
967    async fn wait_payment_event(
968        &self,
969    ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
970        tracing::info!("Starting stream for invoices - wait_any_incoming_payment called");
971
972        // Set active flag to indicate stream is active
973        self.wait_invoice_is_active.store(true, Ordering::SeqCst);
974        tracing::debug!("wait_invoice_is_active set to true");
975
976        let receiver = self.receiver.clone();
977
978        tracing::info!("Receiver obtained successfully, creating response stream");
979
980        // Transform the String stream into a WaitPaymentResponse stream
981        let response_stream = BroadcastStream::new(receiver.resubscribe());
982
983        // Map the stream to handle BroadcastStreamRecvError and wrap in Event
984        let response_stream = response_stream.filter_map(|result| async move {
985            match result {
986                Ok(payment) => Some(cdk_common::payment::Event::PaymentReceived(payment)),
987                Err(err) => {
988                    tracing::warn!("Error in broadcast stream: {}", err);
989                    None
990                }
991            }
992        });
993
994        // Create a combined stream that also handles cancellation
995        let cancel_token = self.wait_invoice_cancel_token.clone();
996        let is_active = self.wait_invoice_is_active.clone();
997
998        let stream = Box::pin(response_stream);
999
1000        // Set up a task to clean up when the stream is dropped
1001        tokio::spawn(async move {
1002            cancel_token.cancelled().await;
1003            tracing::info!("wait_invoice stream cancelled");
1004            is_active.store(false, Ordering::SeqCst);
1005        });
1006
1007        tracing::info!("wait_any_incoming_payment returning stream");
1008        Ok(stream)
1009    }
1010
1011    /// Is payment event stream active
1012    fn is_payment_event_stream_active(&self) -> bool {
1013        self.wait_invoice_is_active.load(Ordering::SeqCst)
1014    }
1015
1016    /// Cancel payment event stream
1017    fn cancel_payment_event_stream(&self) {
1018        self.wait_invoice_cancel_token.cancel()
1019    }
1020
1021    /// Check the status of an incoming payment
1022    async fn check_incoming_payment_status(
1023        &self,
1024        payment_identifier: &PaymentIdentifier,
1025    ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
1026        // Bolt12 offers are identified by offer id and can be paid more than
1027        // once, so collect every settled inbound payment for the offer.
1028        if let PaymentIdentifier::OfferId(offer_id) = payment_identifier {
1029            let payments = self.inner.list_payments_with_filter(|p| {
1030                p.direction == PaymentDirection::Inbound
1031                    && p.status == PaymentStatus::Succeeded
1032                    && matches!(
1033                        &p.kind,
1034                        PaymentKind::Bolt12Offer { offer_id: oid, .. } if oid.to_string() == *offer_id
1035                    )
1036            });
1037
1038            return Ok(payments
1039                .into_iter()
1040                .filter_map(|p| {
1041                    let payment_id = match &p.kind {
1042                        PaymentKind::Bolt12Offer {
1043                            hash: Some(hash), ..
1044                        } => hash.to_string(),
1045                        _ => {
1046                            tracing::warn!("Bolt12 payment for offer {} missing hash", offer_id);
1047                            return None;
1048                        }
1049                    };
1050
1051                    Some(WaitPaymentResponse {
1052                        payment_identifier: payment_identifier.clone(),
1053                        payment_amount: Amount::new(p.amount_msat?, CurrencyUnit::Msat),
1054                        payment_id,
1055                    })
1056                })
1057                .collect());
1058        }
1059
1060        let payment_id_str = match payment_identifier {
1061            PaymentIdentifier::PaymentHash(hash) => hex::encode(hash),
1062            PaymentIdentifier::CustomId(id) => id.clone(),
1063            _ => return Err(Error::UnsupportedPaymentIdentifierType.into()),
1064        };
1065
1066        let payment_id = PaymentId(
1067            hex::decode(&payment_id_str)?
1068                .try_into()
1069                .map_err(|_| Error::InvalidPaymentIdLength)?,
1070        );
1071
1072        let payment_details = self
1073            .inner
1074            .payment(&payment_id)
1075            .ok_or(Error::PaymentNotFound)?;
1076
1077        if payment_details.direction == PaymentDirection::Outbound {
1078            return Err(Error::InvalidPaymentDirection.into());
1079        }
1080
1081        let amount = if payment_details.status == PaymentStatus::Succeeded {
1082            payment_details
1083                .amount_msat
1084                .ok_or(Error::CouldNotGetPaymentAmount)?
1085        } else {
1086            return Ok(vec![]);
1087        };
1088
1089        let response = WaitPaymentResponse {
1090            payment_identifier: payment_identifier.clone(),
1091            payment_amount: Amount::new(amount, CurrencyUnit::Msat),
1092            payment_id: payment_id_str,
1093        };
1094
1095        Ok(vec![response])
1096    }
1097
1098    /// Check the status of an outgoing payment
1099    async fn check_outgoing_payment(
1100        &self,
1101        request_lookup_id: &PaymentIdentifier,
1102    ) -> Result<MakePaymentResponse, Self::Err> {
1103        let payment_details = match request_lookup_id {
1104            PaymentIdentifier::PaymentHash(id_hash) => self
1105                .inner
1106                .list_payments_with_filter(
1107                    |p| matches!(&p.kind, PaymentKind::Bolt11 { hash, .. } if &hash.0 == id_hash),
1108                )
1109                .first()
1110                .cloned(),
1111            PaymentIdentifier::PaymentId(id) => self.inner.payment(&PaymentId(*id)),
1112            _ => {
1113                return Ok(MakePaymentResponse {
1114                    payment_lookup_id: request_lookup_id.clone(),
1115                    payment_proof: None,
1116                    status: MeltQuoteState::Unknown,
1117                    total_spent: Amount::new(0, CurrencyUnit::Msat),
1118                });
1119            }
1120        }
1121        .ok_or(Error::PaymentNotFound)?;
1122
1123        if payment_details.direction != PaymentDirection::Outbound {
1124            return Err(Error::InvalidPaymentDirection.into());
1125        }
1126
1127        let status = match payment_details.status {
1128            PaymentStatus::Pending => MeltQuoteState::Pending,
1129            PaymentStatus::Succeeded => MeltQuoteState::Paid,
1130            PaymentStatus::Failed => MeltQuoteState::Failed,
1131        };
1132
1133        let payment_proof = match payment_details.kind {
1134            PaymentKind::Bolt11 { preimage, .. } => preimage.map(|p| p.to_string()),
1135            PaymentKind::Bolt12Offer { preimage, .. } => preimage.map(|p| p.to_string()),
1136            _ => return Err(Error::UnexpectedPaymentKind.into()),
1137        };
1138
1139        let total_spent = payment_details
1140            .amount_msat
1141            .ok_or(Error::CouldNotGetAmountSpent)?
1142            + payment_details.fee_paid_msat.unwrap_or_default();
1143
1144        Ok(MakePaymentResponse {
1145            payment_lookup_id: request_lookup_id.clone(),
1146            payment_proof,
1147            status,
1148            total_spent: Amount::new(total_spent, CurrencyUnit::Msat),
1149        })
1150    }
1151}
1152
1153impl Drop for CdkLdkNode {
1154    fn drop(&mut self) {
1155        tracing::info!("Drop called on CdkLdkNode");
1156        self.wait_invoice_cancel_token.cancel();
1157        tracing::debug!("Cancelled wait_invoice token in drop");
1158    }
1159}