Skip to main content

cdk_ldk_node/
lib.rs

1//! CDK lightning backend for ldk-node
2
3#![doc = include_str!("../README.md")]
4
5use std::net::SocketAddr;
6use std::pin::Pin;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use bip39::Mnemonic;
12use cdk_common::common::FeeReserve;
13use cdk_common::payment::{self, *};
14use cdk_common::util::{hex, unix_time};
15use cdk_common::{Amount, CurrencyUnit, MeltOptions, MeltQuoteState};
16use futures::{Stream, StreamExt};
17use ldk_node::bitcoin::hashes::Hash;
18use ldk_node::bitcoin::Network;
19use ldk_node::lightning::ln::channelmanager::PaymentId;
20use ldk_node::lightning::ln::msgs::SocketAddress;
21use ldk_node::lightning::routing::router::RouteParametersConfig;
22use ldk_node::lightning_invoice::{Bolt11InvoiceDescription, Description};
23use ldk_node::lightning_types::payment::PaymentHash;
24use ldk_node::logger::{LogLevel, LogWriter};
25use ldk_node::payment::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus};
26use ldk_node::{Builder, Event, Node};
27use tokio_stream::wrappers::BroadcastStream;
28use tokio_util::sync::CancellationToken;
29use tracing::instrument;
30
31use crate::error::Error;
32use crate::log::StdoutLogWriter;
33
34mod error;
35mod log;
36mod web;
37
38/// CDK Lightning backend using LDK Node
39///
40/// Provides Lightning Network functionality for CDK with support for Cashu operations.
41/// Handles payment creation, processing, and event management using the Lightning Development Kit.
42#[derive(Clone)]
43pub struct CdkLdkNode {
44    inner: Arc<Node>,
45    fee_reserve: FeeReserve,
46    wait_invoice_cancel_token: CancellationToken,
47    wait_invoice_is_active: Arc<AtomicBool>,
48    sender: tokio::sync::broadcast::Sender<WaitPaymentResponse>,
49    receiver: Arc<tokio::sync::broadcast::Receiver<WaitPaymentResponse>>,
50    events_cancel_token: CancellationToken,
51    web_addr: Option<SocketAddr>,
52}
53
54impl std::fmt::Debug for CdkLdkNode {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("CdkLdkNode")
57            .field("fee_reserve", &self.fee_reserve)
58            .field("web_addr", &self.web_addr)
59            .finish_non_exhaustive()
60    }
61}
62
/// Configuration for connecting to Bitcoin RPC
///
/// Contains the necessary connection parameters for Bitcoin Core RPC interface.
///
/// The `Debug` representation redacts [`BitcoinRpcConfig::password`] so that the
/// configuration (and types embedding it) can be logged without leaking the credential.
#[derive(Clone)]
pub struct BitcoinRpcConfig {
    /// Bitcoin RPC server hostname or IP address
    pub host: String,
    /// Bitcoin RPC server port number
    pub port: u16,
    /// Username for Bitcoin RPC authentication
    pub user: String,
    /// Password for Bitcoin RPC authentication
    pub password: String,
}

impl std::fmt::Debug for BitcoinRpcConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BitcoinRpcConfig")
            .field("host", &self.host)
            .field("port", &self.port)
            .field("user", &self.user)
            // Never print the secret itself; only show that the field exists.
            .field("password", &format_args!("<redacted>"))
            .finish()
    }
}
77
78/// Source of blockchain data for the Lightning node
79///
80/// Specifies how the node should connect to the Bitcoin network to retrieve
81/// blockchain information and broadcast transactions.
82#[derive(Debug, Clone)]
83pub enum ChainSource {
84    /// Use an Esplora server for blockchain data
85    ///
86    /// Contains the URL of the Esplora server endpoint
87    Esplora(String),
88    /// Use Bitcoin Core RPC for blockchain data
89    ///
90    /// Contains the configuration for connecting to Bitcoin Core
91    BitcoinRpc(BitcoinRpcConfig),
92}
93
/// Where the Lightning node obtains network gossip
///
/// Selects how the node learns about the Lightning Network topology and routing
/// information.
#[derive(Debug, Clone)]
pub enum GossipSource {
    /// Gossip is exchanged directly with connected peers
    P2P,
    /// Gossip is fetched from a Rapid Gossip Sync server, identified by its URL
    RapidGossipSync(String),
}
109/// A builder for an [`CdkLdkNode`] instance.
110#[derive(Debug)]
111pub struct CdkLdkNodeBuilder {
112    network: Network,
113    chain_source: ChainSource,
114    gossip_source: GossipSource,
115    log_dir_path: Option<String>,
116    storage_dir_path: String,
117    fee_reserve: FeeReserve,
118    listening_addresses: Vec<SocketAddress>,
119    seed: Option<Mnemonic>,
120    announcement_addresses: Option<Vec<SocketAddress>>,
121}
122
123impl CdkLdkNodeBuilder {
124    /// Creates a new builder instance.
125    pub fn new(
126        network: Network,
127        chain_source: ChainSource,
128        gossip_source: GossipSource,
129        storage_dir_path: String,
130        fee_reserve: FeeReserve,
131        listening_addresses: Vec<SocketAddress>,
132    ) -> Self {
133        Self {
134            network,
135            chain_source,
136            gossip_source,
137            storage_dir_path,
138            fee_reserve,
139            listening_addresses,
140            seed: None,
141            announcement_addresses: None,
142            log_dir_path: None,
143        }
144    }
145
146    /// Configures the [`CdkLdkNode`] to use the Mnemonic for entropy source configuration
147    pub fn with_seed(mut self, seed: Mnemonic) -> Self {
148        self.seed = Some(seed);
149        self
150    }
151    /// Configures the [`CdkLdkNode`] to use announce this address to the lightning network
152    pub fn with_announcement_address(mut self, announcement_addresses: Vec<SocketAddress>) -> Self {
153        self.announcement_addresses = Some(announcement_addresses);
154        self
155    }
156    /// Configures the [`CdkLdkNode`] to use announce this address to the lightning network
157    pub fn with_log_dir_path(mut self, log_dir_path: String) -> Self {
158        self.log_dir_path = Some(log_dir_path);
159        self
160    }
161
162    /// Builds the [`CdkLdkNode`] instance
163    ///
164    /// # Errors
165    /// Returns an error if the LDK node builder fails to create the node
166    pub fn build(self) -> Result<CdkLdkNode, Error> {
167        let mut ldk = Builder::new();
168        ldk.set_network(self.network);
169        tracing::info!("Storage dir of node is {}", self.storage_dir_path);
170        ldk.set_storage_dir_path(self.storage_dir_path);
171
172        match self.chain_source {
173            ChainSource::Esplora(esplora_url) => {
174                ldk.set_chain_source_esplora(esplora_url, None);
175            }
176            ChainSource::BitcoinRpc(BitcoinRpcConfig {
177                host,
178                port,
179                user,
180                password,
181            }) => {
182                ldk.set_chain_source_bitcoind_rpc(host, port, user, password);
183            }
184        }
185
186        match self.gossip_source {
187            GossipSource::P2P => {
188                ldk.set_gossip_source_p2p();
189            }
190            GossipSource::RapidGossipSync(rgs_url) => {
191                ldk.set_gossip_source_rgs(rgs_url);
192            }
193        }
194
195        ldk.set_listening_addresses(self.listening_addresses)?;
196        if self.log_dir_path.is_some() {
197            ldk.set_filesystem_logger(self.log_dir_path, Some(LogLevel::Info));
198        } else {
199            ldk.set_custom_logger(Arc::new(StdoutLogWriter));
200        }
201
202        ldk.set_node_alias("cdk-ldk-node".to_string())?;
203        // set the seed as bip39 entropy mnemonic
204        if let Some(seed) = self.seed {
205            ldk.set_entropy_bip39_mnemonic(seed, None);
206        }
207        // set the announcement addresses
208        if let Some(announcement_addresses) = self.announcement_addresses {
209            ldk.set_announcement_addresses(announcement_addresses)?;
210        }
211
212        let node = ldk.build()?;
213
214        tracing::info!("Creating tokio channel for payment notifications");
215        let (sender, receiver) = tokio::sync::broadcast::channel(8);
216
217        let id = node.node_id();
218
219        let adr = node.announcement_addresses();
220
221        tracing::info!(
222            "Created node {} with address {:?} on network {}",
223            id,
224            adr,
225            self.network
226        );
227
228        Ok(CdkLdkNode {
229            inner: node.into(),
230            fee_reserve: self.fee_reserve,
231            wait_invoice_cancel_token: CancellationToken::new(),
232            wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
233            sender,
234            receiver: Arc::new(receiver),
235            events_cancel_token: CancellationToken::new(),
236            web_addr: None,
237        })
238    }
239}
240
241impl CdkLdkNode {
242    /// Set the web server address for the LDK node management interface
243    ///
244    /// # Arguments
245    /// * `addr` - Socket address for the web server. If None, no web server will be started.
246    pub fn set_web_addr(&mut self, addr: Option<SocketAddr>) {
247        self.web_addr = addr;
248    }
249
250    /// Get a default web server address using an unused port
251    ///
252    /// Returns a SocketAddr with localhost and port 0, which will cause
253    /// the system to automatically assign an available port
254    pub fn default_web_addr() -> SocketAddr {
255        SocketAddr::from(([127, 0, 0, 1], 8091))
256    }
257
258    fn make_payment_response_from_details(
259        unit: &CurrencyUnit,
260        payment_lookup_id: PaymentIdentifier,
261        payment_details: &PaymentDetails,
262    ) -> Result<MakePaymentResponse, payment::Error> {
263        let status = match payment_details.status {
264            PaymentStatus::Pending => MeltQuoteState::Pending,
265            PaymentStatus::Succeeded => MeltQuoteState::Paid,
266            PaymentStatus::Failed => MeltQuoteState::Failed,
267        };
268
269        let payment_proof = match &payment_details.kind {
270            PaymentKind::Bolt11 { preimage, .. } => preimage.map(|p| p.to_string()),
271            PaymentKind::Bolt12Offer { preimage, .. } => preimage.map(|p| p.to_string()),
272            _ => return Err(Error::UnexpectedPaymentKind.into()),
273        };
274
275        let total_spent = if status == MeltQuoteState::Paid {
276            let total_spent = payment_details
277                .amount_msat
278                .ok_or(Error::CouldNotGetAmountSpent)?
279                + payment_details.fee_paid_msat.unwrap_or_default();
280            Amount::new(total_spent, CurrencyUnit::Msat).convert_to(unit)?
281        } else {
282            Amount::new(0, unit.clone())
283        };
284
285        Ok(MakePaymentResponse {
286            payment_lookup_id,
287            payment_proof,
288            status,
289            total_spent,
290        })
291    }
292
293    /// Start the CDK LDK Node
294    ///
295    /// Starts the underlying LDK node and begins event processing.
296    /// Sets up event handlers to listen for Lightning events like payment received.
297    ///
298    /// # Returns
299    /// Returns `Ok(())` on successful start, error otherwise
300    ///
301    /// # Errors
302    /// Returns an error if the LDK node fails to start or event handling setup fails
303    pub fn start_ldk_node(&self) -> Result<(), Error> {
304        tracing::info!("Starting cdk-ldk node");
305        self.inner.start()?;
306        let node_config = self.inner.config();
307
308        tracing::info!("Starting node with network {}", node_config.network);
309
310        tracing::info!("Node status: {:?}", self.inner.status());
311
312        self.handle_events()?;
313
314        Ok(())
315    }
316
317    /// Start the web server for the LDK node management interface
318    ///
319    /// Starts a web server that provides a user interface for managing the LDK node.
320    /// The web interface allows users to view balances, manage channels, create invoices,
321    /// and send payments.
322    ///
323    /// # Arguments
324    /// * `web_addr` - The socket address to bind the web server to
325    ///
326    /// # Returns
327    /// Returns `Ok(())` on successful start, error otherwise
328    ///
329    /// # Errors
330    /// Returns an error if the web server fails to start
331    pub fn start_web_server(&self, web_addr: SocketAddr) -> Result<(), Error> {
332        let web_server = crate::web::WebServer::new(Arc::new(self.clone()));
333
334        tokio::spawn(async move {
335            if let Err(e) = web_server.serve(web_addr).await {
336                tracing::error!("Web server error: {}", e);
337            }
338        });
339
340        Ok(())
341    }
342
343    /// Stop the CDK LDK Node
344    ///
345    /// Gracefully stops the node by cancelling all active tasks and event handlers.
346    /// This includes:
347    /// - Cancelling the event handler task
348    /// - Cancelling any active wait_invoice streams
349    /// - Stopping the underlying LDK node
350    ///
351    /// # Returns
352    /// Returns `Ok(())` on successful shutdown, error otherwise
353    ///
354    /// # Errors
355    /// Returns an error if the underlying LDK node fails to stop
356    pub fn stop_ldk_node(&self) -> Result<(), Error> {
357        tracing::info!("Stopping CdkLdkNode");
358        // Cancel all tokio tasks
359        tracing::info!("Cancelling event handler");
360        self.events_cancel_token.cancel();
361
362        // Cancel any payment event streams
363        if self.is_payment_event_stream_active() {
364            tracing::info!("Cancelling payment event stream");
365            self.wait_invoice_cancel_token.cancel();
366        }
367
368        // Stop the LDK node
369        tracing::info!("Stopping LDK node");
370        self.inner.stop()?;
371        tracing::info!("CdkLdkNode stopped successfully");
372        Ok(())
373    }
374
375    /// Handle payment received event
376    async fn handle_payment_received(
377        node: &Arc<Node>,
378        sender: &tokio::sync::broadcast::Sender<WaitPaymentResponse>,
379        payment_id: Option<PaymentId>,
380        payment_hash: PaymentHash,
381        amount_msat: u64,
382    ) {
383        tracing::info!(
384            "Received payment for hash={} of amount={} msat",
385            payment_hash,
386            amount_msat
387        );
388
389        let payment_id = match payment_id {
390            Some(id) => id,
391            None => {
392                tracing::warn!("Received payment without payment_id");
393                return;
394            }
395        };
396
397        let payment_id_hex = hex::encode(payment_id.0);
398
399        if amount_msat == 0 {
400            tracing::warn!("Payment of no amount");
401            return;
402        }
403
404        tracing::info!(
405            "Processing payment notification: id={}, amount={} msats",
406            payment_id_hex,
407            amount_msat
408        );
409
410        let payment_details = match node.payment(&payment_id) {
411            Some(details) => details,
412            None => {
413                tracing::error!("Could not find payment details for id={}", payment_id_hex);
414                return;
415            }
416        };
417
418        let (payment_identifier, payment_id) = match payment_details.kind {
419            PaymentKind::Bolt11 { hash, .. } => {
420                (PaymentIdentifier::PaymentHash(hash.0), hash.to_string())
421            }
422            PaymentKind::Bolt12Offer { hash, offer_id, .. } => match hash {
423                Some(h) => (
424                    PaymentIdentifier::OfferId(offer_id.to_string()),
425                    h.to_string(),
426                ),
427                None => {
428                    tracing::error!("Bolt12 payment missing hash");
429                    return;
430                }
431            },
432            k => {
433                tracing::warn!("Received payment of kind {:?} which is not supported", k);
434                return;
435            }
436        };
437
438        let wait_payment_response = WaitPaymentResponse {
439            payment_identifier,
440            payment_amount: Amount::new(amount_msat, CurrencyUnit::Msat),
441            payment_id,
442        };
443
444        match sender.send(wait_payment_response) {
445            Ok(_) => tracing::info!("Successfully sent payment notification to stream"),
446            Err(err) => tracing::error!(
447                "Could not send payment received notification on channel: {}",
448                err
449            ),
450        }
451    }
452
453    /// Set up event handling for the node
454    pub fn handle_events(&self) -> Result<(), Error> {
455        let node = self.inner.clone();
456        let sender = self.sender.clone();
457        let cancel_token = self.events_cancel_token.clone();
458
459        tracing::info!("Starting event handler task");
460
461        tokio::spawn(async move {
462            tracing::info!("Event handler loop started");
463            loop {
464                tokio::select! {
465                    _ = cancel_token.cancelled() => {
466                        tracing::info!("Event handler cancelled");
467                        break;
468                    }
469                    event = node.next_event_async() => {
470                        match event {
471                            Event::PaymentReceived {
472                                payment_id,
473                                payment_hash,
474                                amount_msat,
475                                custom_records: _
476                            } => {
477                                Self::handle_payment_received(
478                                    &node,
479                                    &sender,
480                                    payment_id,
481                                    payment_hash,
482                                    amount_msat
483                                ).await;
484                            }
485                            Event::PaymentFailed {
486                                payment_id,
487                                payment_hash,
488                                reason,
489                            } => {
490                                tracing::error!(
491                                    payment_id = ?payment_id,
492                                    payment_hash = ?payment_hash,
493                                    reason = ?reason,
494                                    "LDK node payment failed"
495                                );
496                            }
497                            event => {
498                                tracing::debug!("Received other ldk node event: {:?}", event);
499                            }
500                        }
501
502                        if let Err(err) = node.event_handled() {
503                            tracing::error!("Error handling node event: {}", err);
504                        } else {
505                            tracing::debug!("Successfully handled node event");
506                        }
507                    }
508                }
509            }
510            tracing::info!("Event handler loop terminated");
511        });
512
513        tracing::info!("Event handler task spawned");
514        Ok(())
515    }
516
517    /// Get Node used
518    pub fn node(&self) -> Arc<Node> {
519        Arc::clone(&self.inner)
520    }
521}
522
523/// Mint payment trait
524#[async_trait]
525impl MintPayment for CdkLdkNode {
526    type Err = payment::Error;
527
528    /// Start the payment processor
529    /// Starts the LDK node and begins event processing
530    async fn start(&self) -> Result<(), Self::Err> {
531        self.start_ldk_node().map_err(|e| {
532            tracing::error!("Failed to start CdkLdkNode: {}", e);
533            e
534        })?;
535
536        tracing::info!("CdkLdkNode payment processor started successfully");
537
538        // Start web server if configured
539        if let Some(web_addr) = self.web_addr {
540            tracing::info!("Starting LDK Node web interface on {}", web_addr);
541            self.start_web_server(web_addr).map_err(|e| {
542                tracing::error!("Failed to start web server: {}", e);
543                e
544            })?;
545        } else {
546            tracing::info!("No web server address configured, skipping web interface");
547        }
548
549        Ok(())
550    }
551
552    /// Stop the payment processor
553    /// Gracefully stops the LDK node and cancels all background tasks
554    async fn stop(&self) -> Result<(), Self::Err> {
555        self.stop_ldk_node().map_err(|e| {
556            tracing::error!("Failed to stop CdkLdkNode: {}", e);
557            e.into()
558        })
559    }
560
561    /// Base Settings
562    async fn get_settings(&self) -> Result<SettingsResponse, Self::Err> {
563        let settings = SettingsResponse {
564            unit: CurrencyUnit::Msat.to_string(),
565            bolt11: Some(payment::Bolt11Settings {
566                mpp: false,
567                amountless: true,
568                invoice_description: true,
569            }),
570            bolt12: Some(payment::Bolt12Settings { amountless: true }),
571            onchain: None,
572            custom: std::collections::HashMap::new(),
573        };
574        Ok(settings)
575    }
576
577    /// Create a new invoice
578    #[instrument(skip(self))]
579    async fn create_incoming_payment_request(
580        &self,
581        options: IncomingPaymentOptions,
582    ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
583        match options {
584            IncomingPaymentOptions::Bolt11(bolt11_options) => {
585                let amount_msat: Amount = bolt11_options
586                    .amount
587                    .convert_to(&CurrencyUnit::Msat)?
588                    .into();
589                let description = bolt11_options.description.unwrap_or_default();
590                let time = match bolt11_options.unix_expiry {
591                    Some(t) => t
592                        .checked_sub(unix_time())
593                        .ok_or(payment::Error::InvalidExpiry)?,
594                    None => 36000,
595                };
596
597                let description = Bolt11InvoiceDescription::Direct(
598                    Description::new(description).map_err(|_| Error::InvalidDescription)?,
599                );
600
601                let payment = self
602                    .inner
603                    .bolt11_payment()
604                    .receive(amount_msat.into(), &description, time as u32)
605                    .map_err(Error::LdkNode)?;
606
607                let payment_hash = payment.payment_hash().to_string();
608                let payment_identifier = PaymentIdentifier::PaymentHash(
609                    hex::decode(&payment_hash)?
610                        .try_into()
611                        .map_err(|_| Error::InvalidPaymentHashLength)?,
612                );
613
614                Ok(CreateIncomingPaymentResponse {
615                    request_lookup_id: payment_identifier,
616                    request: payment.to_string(),
617                    expiry: Some(unix_time() + time),
618                    extra_json: None,
619                })
620            }
621            IncomingPaymentOptions::Bolt12(bolt12_options) => {
622                let Bolt12IncomingPaymentOptions {
623                    description,
624                    amount,
625                    unix_expiry,
626                } = *bolt12_options;
627
628                let time = unix_expiry
629                    .map(|t| {
630                        t.checked_sub(unix_time())
631                            .ok_or(payment::Error::InvalidExpiry)
632                            .map(|t| t as u32)
633                    })
634                    .transpose()?;
635
636                let offer = match amount {
637                    Some(amount) => {
638                        let amount_msat: Amount = amount.convert_to(&CurrencyUnit::Msat)?.into();
639
640                        self.inner
641                            .bolt12_payment()
642                            .receive(
643                                amount_msat.into(),
644                                &description.unwrap_or("".to_string()),
645                                time,
646                                None,
647                            )
648                            .map_err(Error::LdkNode)?
649                    }
650                    None => self
651                        .inner
652                        .bolt12_payment()
653                        .receive_variable_amount(&description.unwrap_or("".to_string()), time)
654                        .map_err(Error::LdkNode)?,
655                };
656                let payment_identifier = PaymentIdentifier::OfferId(offer.id().to_string());
657
658                Ok(CreateIncomingPaymentResponse {
659                    request_lookup_id: payment_identifier,
660                    request: offer.to_string(),
661                    expiry: unix_expiry,
662                    extra_json: None,
663                })
664            }
665            IncomingPaymentOptions::Custom(_) | IncomingPaymentOptions::Onchain(_) => {
666                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
667            }
668        }
669    }
670
671    /// Get payment quote
672    /// Used to get fee and amount required for a payment request
673    #[instrument(skip_all)]
674    async fn get_payment_quote(
675        &self,
676        unit: &CurrencyUnit,
677        options: OutgoingPaymentOptions,
678    ) -> Result<PaymentQuoteResponse, Self::Err> {
679        match options {
680            cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
681                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
682            }
683            OutgoingPaymentOptions::Bolt11(bolt11_options) => {
684                let bolt11 = bolt11_options.bolt11;
685
686                let amount_msat = match bolt11_options.melt_options {
687                    Some(MeltOptions::Amountless { amountless }) => {
688                        let amount_msat = amountless.amount_msat;
689
690                        if let Some(invoice_amount) = bolt11.amount_milli_satoshis() {
691                            if invoice_amount != u64::from(amount_msat) {
692                                return Err(payment::Error::AmountMismatch);
693                            }
694                        }
695
696                        amount_msat
697                    }
698                    Some(MeltOptions::Mpp { mpp }) => mpp.amount,
699                    None => bolt11
700                        .amount_milli_satoshis()
701                        .ok_or(Error::UnknownInvoiceAmount)?
702                        .into(),
703                };
704
705                let amount =
706                    Amount::new(amount_msat.into(), CurrencyUnit::Msat).convert_to(unit)?;
707
708                let relative_fee_reserve =
709                    (self.fee_reserve.percent_fee_reserve * amount.value() as f32) as u64;
710
711                let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
712
713                let fee = match relative_fee_reserve > absolute_fee_reserve {
714                    true => relative_fee_reserve,
715                    false => absolute_fee_reserve,
716                };
717
718                let payment_hash = bolt11.payment_hash().to_string();
719                let payment_hash_bytes = hex::decode(&payment_hash)?
720                    .try_into()
721                    .map_err(|_| Error::InvalidPaymentHashLength)?;
722
723                Ok(PaymentQuoteResponse {
724                    request_lookup_id: Some(PaymentIdentifier::PaymentHash(payment_hash_bytes)),
725                    amount,
726                    fee: Amount::new(fee, unit.clone()),
727                    state: MeltQuoteState::Unpaid,
728                    extra_json: None,
729                    estimated_blocks: None,
730                    fee_options: None,
731                })
732            }
733            OutgoingPaymentOptions::Bolt12(bolt12_options) => {
734                let offer = bolt12_options.offer;
735
736                let amount_msat = match bolt12_options.melt_options {
737                    Some(melt_options) => melt_options.amount_msat(),
738                    None => {
739                        let amount = offer.amount().ok_or(payment::Error::AmountMismatch)?;
740
741                        match amount {
742                            ldk_node::lightning::offers::offer::Amount::Bitcoin {
743                                amount_msats,
744                            } => amount_msats.into(),
745                            _ => return Err(payment::Error::AmountMismatch),
746                        }
747                    }
748                };
749                let amount =
750                    Amount::new(amount_msat.into(), CurrencyUnit::Msat).convert_to(unit)?;
751
752                let relative_fee_reserve =
753                    (self.fee_reserve.percent_fee_reserve * amount.value() as f32) as u64;
754
755                let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
756
757                let fee = match relative_fee_reserve > absolute_fee_reserve {
758                    true => relative_fee_reserve,
759                    false => absolute_fee_reserve,
760                };
761
762                Ok(PaymentQuoteResponse {
763                    request_lookup_id: None,
764                    amount,
765                    fee: Amount::new(fee, unit.clone()),
766                    state: MeltQuoteState::Unpaid,
767                    extra_json: None,
768                    estimated_blocks: None,
769                    fee_options: None,
770                })
771            }
772            OutgoingPaymentOptions::Onchain(_) => {
773                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
774            }
775        }
776    }
777
778    /// Pay request
779    #[instrument(skip(self, options))]
780    async fn make_payment(
781        &self,
782        unit: &CurrencyUnit,
783        options: OutgoingPaymentOptions,
784    ) -> Result<MakePaymentResponse, Self::Err> {
785        match options {
786            cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
787                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
788            }
789            OutgoingPaymentOptions::Bolt11(bolt11_options) => {
790                let bolt11 = bolt11_options.bolt11;
791
792                let send_params = match bolt11_options
793                    .max_fee_amount
794                    .map(|f| {
795                        f.convert_to(&CurrencyUnit::Msat)
796                            .map(|amount_msat| RouteParametersConfig {
797                                max_total_routing_fee_msat: Some(amount_msat.value()),
798                                ..Default::default()
799                            })
800                    })
801                    .transpose()
802                {
803                    Ok(params) => params,
804                    Err(err) => {
805                        tracing::error!("Failed to convert fee amount: {}", err);
806                        return Err(payment::Error::Custom(format!("Invalid fee amount: {err}")));
807                    }
808                };
809
810                let payment_id = match bolt11_options.melt_options {
811                    Some(MeltOptions::Amountless { amountless }) => {
812                        if let Some(invoice_amount) = bolt11.amount_milli_satoshis() {
813                            if invoice_amount != u64::from(amountless.amount_msat) {
814                                return Err(payment::Error::AmountMismatch);
815                            }
816                        }
817
818                        self.inner
819                            .bolt11_payment()
820                            .send_using_amount(&bolt11, amountless.amount_msat.into(), send_params)
821                            .map_err(|err| {
822                                tracing::error!("Could not send send amountless bolt11: {}", err);
823                                Error::CouldNotSendBolt11WithoutAmount
824                            })?
825                    }
826                    None => self
827                        .inner
828                        .bolt11_payment()
829                        .send(&bolt11, send_params)
830                        .map_err(|err| {
831                            tracing::error!("Could not send bolt11 {}", err);
832                            Error::CouldNotSendBolt11
833                        })?,
834                    _ => return Err(payment::Error::UnsupportedPaymentOption),
835                };
836
837                // Check payment status for up to 10 seconds
838                let start = std::time::Instant::now();
839                let timeout = std::time::Duration::from_secs(10);
840
841                let payment_details = loop {
842                    let details = self
843                        .inner
844                        .payment(&payment_id)
845                        .ok_or(Error::PaymentNotFound)?;
846
847                    match details.status {
848                        PaymentStatus::Succeeded => break details,
849                        PaymentStatus::Failed => {
850                            tracing::error!("Failed to pay bolt11 payment.");
851                            break details;
852                        }
853                        PaymentStatus::Pending => {
854                            if start.elapsed() > timeout {
855                                tracing::warn!(
856                                    "Paying bolt11 exceeded timeout 10 seconds no longer waitning."
857                                );
858                                break details;
859                            }
860                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
861                            continue;
862                        }
863                    }
864                };
865
866                Self::make_payment_response_from_details(
867                    unit,
868                    PaymentIdentifier::PaymentHash(bolt11.payment_hash().to_byte_array()),
869                    &payment_details,
870                )
871            }
872            OutgoingPaymentOptions::Bolt12(bolt12_options) => {
873                let offer = bolt12_options.offer;
874
875                let send_params = match bolt12_options
876                    .max_fee_amount
877                    .map(|f| {
878                        f.convert_to(&CurrencyUnit::Msat)
879                            .map(|amount_msat| RouteParametersConfig {
880                                max_total_routing_fee_msat: Some(amount_msat.value()),
881                                ..Default::default()
882                            })
883                    })
884                    .transpose()
885                {
886                    Ok(params) => params,
887                    Err(err) => {
888                        tracing::error!("Failed to convert fee amount: {}", err);
889                        return Err(payment::Error::Custom(format!("Invalid fee amount: {err}")));
890                    }
891                };
892
893                let payment_id = match bolt12_options.melt_options {
894                    Some(MeltOptions::Amountless { amountless }) => self
895                        .inner
896                        .bolt12_payment()
897                        .send_using_amount(
898                            &offer,
899                            amountless.amount_msat.into(),
900                            None,
901                            None,
902                            send_params,
903                        )
904                        .map_err(Error::LdkNode)?,
905                    None => self
906                        .inner
907                        .bolt12_payment()
908                        .send(&offer, None, None, send_params)
909                        .map_err(Error::LdkNode)?,
910                    _ => return Err(payment::Error::UnsupportedPaymentOption),
911                };
912
913                // Check payment status for up to 10 seconds
914                let start = std::time::Instant::now();
915                let timeout = std::time::Duration::from_secs(10);
916
917                let payment_details = loop {
918                    let details = self
919                        .inner
920                        .payment(&payment_id)
921                        .ok_or(Error::PaymentNotFound)?;
922
923                    match details.status {
924                        PaymentStatus::Succeeded => break details,
925                        PaymentStatus::Failed => {
926                            tracing::error!(
927                                payment_id = %payment_id,
928                                amount_msat = ?details.amount_msat,
929                                fee_paid_msat = ?details.fee_paid_msat,
930                                payment_kind = ?details.kind,
931                                "Bolt12 payment failed"
932                            );
933                            break details;
934                        }
935                        PaymentStatus::Pending => {
936                            if start.elapsed() > timeout {
937                                tracing::warn!(
938                                    "Payment has been being for 10 seconds. No longer waiting"
939                                );
940                                break details;
941                            }
942                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
943                            continue;
944                        }
945                    }
946                };
947
948                Self::make_payment_response_from_details(
949                    unit,
950                    PaymentIdentifier::PaymentId(payment_id.0),
951                    &payment_details,
952                )
953            }
954            OutgoingPaymentOptions::Onchain(_) => {
955                Err(cdk_common::payment::Error::UnsupportedPaymentOption)
956            }
957        }
958    }
959
960    /// Listen for invoices to be paid to the mint
961    /// Returns a stream of request_lookup_id once invoices are paid
962    #[instrument(skip(self))]
963    async fn wait_payment_event(
964        &self,
965    ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
966        tracing::info!("Starting stream for invoices - wait_any_incoming_payment called");
967
968        // Set active flag to indicate stream is active
969        self.wait_invoice_is_active.store(true, Ordering::SeqCst);
970        tracing::debug!("wait_invoice_is_active set to true");
971
972        let receiver = self.receiver.clone();
973
974        tracing::info!("Receiver obtained successfully, creating response stream");
975
976        // Transform the String stream into a WaitPaymentResponse stream
977        let response_stream = BroadcastStream::new(receiver.resubscribe());
978
979        // Map the stream to handle BroadcastStreamRecvError and wrap in Event
980        let response_stream = response_stream.filter_map(|result| async move {
981            match result {
982                Ok(payment) => Some(cdk_common::payment::Event::PaymentReceived(payment)),
983                Err(err) => {
984                    tracing::warn!("Error in broadcast stream: {}", err);
985                    None
986                }
987            }
988        });
989
990        // Create a combined stream that also handles cancellation
991        let cancel_token = self.wait_invoice_cancel_token.clone();
992        let is_active = self.wait_invoice_is_active.clone();
993
994        let stream = Box::pin(response_stream);
995
996        // Set up a task to clean up when the stream is dropped
997        tokio::spawn(async move {
998            cancel_token.cancelled().await;
999            tracing::info!("wait_invoice stream cancelled");
1000            is_active.store(false, Ordering::SeqCst);
1001        });
1002
1003        tracing::info!("wait_any_incoming_payment returning stream");
1004        Ok(stream)
1005    }
1006
1007    /// Is payment event stream active
1008    fn is_payment_event_stream_active(&self) -> bool {
1009        self.wait_invoice_is_active.load(Ordering::SeqCst)
1010    }
1011
1012    /// Cancel payment event stream
1013    fn cancel_payment_event_stream(&self) {
1014        self.wait_invoice_cancel_token.cancel()
1015    }
1016
1017    /// Check the status of an incoming payment
1018    async fn check_incoming_payment_status(
1019        &self,
1020        payment_identifier: &PaymentIdentifier,
1021    ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
1022        // Bolt12 offers are identified by offer id and can be paid more than
1023        // once, so collect every settled inbound payment for the offer.
1024        if let PaymentIdentifier::OfferId(offer_id) = payment_identifier {
1025            let payments = self.inner.list_payments_with_filter(|p| {
1026                p.direction == PaymentDirection::Inbound
1027                    && p.status == PaymentStatus::Succeeded
1028                    && matches!(
1029                        &p.kind,
1030                        PaymentKind::Bolt12Offer { offer_id: oid, .. } if oid.to_string() == *offer_id
1031                    )
1032            });
1033
1034            return Ok(payments
1035                .into_iter()
1036                .filter_map(|p| {
1037                    let payment_id = match &p.kind {
1038                        PaymentKind::Bolt12Offer {
1039                            hash: Some(hash), ..
1040                        } => hash.to_string(),
1041                        _ => {
1042                            tracing::warn!("Bolt12 payment for offer {} missing hash", offer_id);
1043                            return None;
1044                        }
1045                    };
1046
1047                    Some(WaitPaymentResponse {
1048                        payment_identifier: payment_identifier.clone(),
1049                        payment_amount: Amount::new(p.amount_msat?, CurrencyUnit::Msat),
1050                        payment_id,
1051                    })
1052                })
1053                .collect());
1054        }
1055
1056        let payment_id_str = match payment_identifier {
1057            PaymentIdentifier::PaymentHash(hash) => hex::encode(hash),
1058            PaymentIdentifier::CustomId(id) => id.clone(),
1059            _ => return Err(Error::UnsupportedPaymentIdentifierType.into()),
1060        };
1061
1062        let payment_id = PaymentId(
1063            hex::decode(&payment_id_str)?
1064                .try_into()
1065                .map_err(|_| Error::InvalidPaymentIdLength)?,
1066        );
1067
1068        let payment_details = self
1069            .inner
1070            .payment(&payment_id)
1071            .ok_or(Error::PaymentNotFound)?;
1072
1073        if payment_details.direction == PaymentDirection::Outbound {
1074            return Err(Error::InvalidPaymentDirection.into());
1075        }
1076
1077        let amount = if payment_details.status == PaymentStatus::Succeeded {
1078            payment_details
1079                .amount_msat
1080                .ok_or(Error::CouldNotGetPaymentAmount)?
1081        } else {
1082            return Ok(vec![]);
1083        };
1084
1085        let response = WaitPaymentResponse {
1086            payment_identifier: payment_identifier.clone(),
1087            payment_amount: Amount::new(amount, CurrencyUnit::Msat),
1088            payment_id: payment_id_str,
1089        };
1090
1091        Ok(vec![response])
1092    }
1093
1094    /// Check the status of an outgoing payment
1095    async fn check_outgoing_payment(
1096        &self,
1097        request_lookup_id: &PaymentIdentifier,
1098    ) -> Result<MakePaymentResponse, Self::Err> {
1099        let payment_details = match request_lookup_id {
1100            PaymentIdentifier::PaymentHash(id_hash) => self
1101                .inner
1102                .list_payments_with_filter(
1103                    |p| matches!(&p.kind, PaymentKind::Bolt11 { hash, .. } if &hash.0 == id_hash),
1104                )
1105                .first()
1106                .cloned(),
1107            PaymentIdentifier::PaymentId(id) => self.inner.payment(&PaymentId(*id)),
1108            _ => {
1109                return Ok(MakePaymentResponse {
1110                    payment_lookup_id: request_lookup_id.clone(),
1111                    payment_proof: None,
1112                    status: MeltQuoteState::Unknown,
1113                    total_spent: Amount::new(0, CurrencyUnit::Msat),
1114                });
1115            }
1116        }
1117        .ok_or(Error::PaymentNotFound)?;
1118
1119        if payment_details.direction != PaymentDirection::Outbound {
1120            return Err(Error::InvalidPaymentDirection.into());
1121        }
1122
1123        Self::make_payment_response_from_details(
1124            &CurrencyUnit::Msat,
1125            request_lookup_id.clone(),
1126            &payment_details,
1127        )
1128    }
1129}
1130
1131impl Drop for CdkLdkNode {
1132    fn drop(&mut self) {
1133        tracing::info!("Drop called on CdkLdkNode");
1134        self.wait_invoice_cancel_token.cancel();
1135        tracing::debug!("Cancelled wait_invoice token in drop");
1136    }
1137}
1138
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an outbound BOLT11 `PaymentDetails` fixture with the given
    /// status and amount; every other field holds a fixed placeholder value.
    fn test_payment_details(status: PaymentStatus, amount_msat: Option<u64>) -> PaymentDetails {
        let kind = PaymentKind::Bolt11 {
            hash: PaymentHash([1; 32]),
            preimage: None,
            secret: None,
        };

        PaymentDetails {
            id: PaymentId([2; 32]),
            kind,
            amount_msat,
            fee_paid_msat: None,
            direction: PaymentDirection::Outbound,
            status,
            latest_update_timestamp: 0,
        }
    }

    /// Converts amount-less fixture details with the given status into a
    /// payment response, using msat as the unit and a fixed payment id.
    fn response_without_amount(
        status: PaymentStatus,
    ) -> Result<MakePaymentResponse, payment::Error> {
        let details = test_payment_details(status, None);

        CdkLdkNode::make_payment_response_from_details(
            &CurrencyUnit::Msat,
            PaymentIdentifier::PaymentId([2; 32]),
            &details,
        )
    }

    #[test]
    fn failed_payment_response_does_not_require_amount() {
        let response = response_without_amount(PaymentStatus::Failed)
            .expect("failed payment details should map without amount");

        assert_eq!(response.status, MeltQuoteState::Failed);
        assert_eq!(response.total_spent, Amount::new(0, CurrencyUnit::Msat));
    }

    #[test]
    fn pending_payment_response_does_not_require_amount() {
        let response = response_without_amount(PaymentStatus::Pending)
            .expect("pending payment details should map without amount");

        assert_eq!(response.status, MeltQuoteState::Pending);
        assert_eq!(response.total_spent, Amount::new(0, CurrencyUnit::Msat));
    }

    #[test]
    fn paid_payment_response_requires_amount() {
        let err = response_without_amount(PaymentStatus::Succeeded)
            .expect_err("paid payment details without amount should fail");

        assert!(matches!(err, payment::Error::Lightning(_)));
    }
}