1#![doc = include_str!("../README.md")]
4
5use std::net::SocketAddr;
6use std::pin::Pin;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use bip39::Mnemonic;
12use cdk_common::common::FeeReserve;
13use cdk_common::payment::{self, *};
14use cdk_common::util::{hex, unix_time};
15use cdk_common::{Amount, CurrencyUnit, MeltOptions, MeltQuoteState};
16use futures::{Stream, StreamExt};
17use ldk_node::bitcoin::hashes::Hash;
18use ldk_node::bitcoin::Network;
19use ldk_node::lightning::ln::channelmanager::PaymentId;
20use ldk_node::lightning::ln::msgs::SocketAddress;
21use ldk_node::lightning::routing::router::RouteParametersConfig;
22use ldk_node::lightning_invoice::{Bolt11InvoiceDescription, Description};
23use ldk_node::lightning_types::payment::PaymentHash;
24use ldk_node::logger::{LogLevel, LogWriter};
25use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus};
26use ldk_node::{Builder, Event, Node};
27use tokio_stream::wrappers::BroadcastStream;
28use tokio_util::sync::CancellationToken;
29use tracing::instrument;
30
31use crate::error::Error;
32use crate::log::StdoutLogWriter;
33
34mod error;
35mod log;
36mod web;
37
38#[derive(Clone)]
43pub struct CdkLdkNode {
44 inner: Arc<Node>,
45 fee_reserve: FeeReserve,
46 wait_invoice_cancel_token: CancellationToken,
47 wait_invoice_is_active: Arc<AtomicBool>,
48 sender: tokio::sync::broadcast::Sender<WaitPaymentResponse>,
49 receiver: Arc<tokio::sync::broadcast::Receiver<WaitPaymentResponse>>,
50 events_cancel_token: CancellationToken,
51 web_addr: Option<SocketAddr>,
52}
53
54impl std::fmt::Debug for CdkLdkNode {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct("CdkLdkNode")
57 .field("fee_reserve", &self.fee_reserve)
58 .field("web_addr", &self.web_addr)
59 .finish_non_exhaustive()
60 }
61}
62
63#[derive(Debug, Clone)]
67pub struct BitcoinRpcConfig {
68 pub host: String,
70 pub port: u16,
72 pub user: String,
74 pub password: String,
76}
77
78#[derive(Debug, Clone)]
83pub enum ChainSource {
84 Esplora(String),
88 BitcoinRpc(BitcoinRpcConfig),
92}
93
94#[derive(Debug, Clone)]
99pub enum GossipSource {
100 P2P,
104 RapidGossipSync(String),
108}
109#[derive(Debug)]
111pub struct CdkLdkNodeBuilder {
112 network: Network,
113 chain_source: ChainSource,
114 gossip_source: GossipSource,
115 log_dir_path: Option<String>,
116 storage_dir_path: String,
117 fee_reserve: FeeReserve,
118 listening_addresses: Vec<SocketAddress>,
119 seed: Option<Mnemonic>,
120 announcement_addresses: Option<Vec<SocketAddress>>,
121}
122
123impl CdkLdkNodeBuilder {
124 pub fn new(
126 network: Network,
127 chain_source: ChainSource,
128 gossip_source: GossipSource,
129 storage_dir_path: String,
130 fee_reserve: FeeReserve,
131 listening_addresses: Vec<SocketAddress>,
132 ) -> Self {
133 Self {
134 network,
135 chain_source,
136 gossip_source,
137 storage_dir_path,
138 fee_reserve,
139 listening_addresses,
140 seed: None,
141 announcement_addresses: None,
142 log_dir_path: None,
143 }
144 }
145
146 pub fn with_seed(mut self, seed: Mnemonic) -> Self {
148 self.seed = Some(seed);
149 self
150 }
151 pub fn with_announcement_address(mut self, announcement_addresses: Vec<SocketAddress>) -> Self {
153 self.announcement_addresses = Some(announcement_addresses);
154 self
155 }
156 pub fn with_log_dir_path(mut self, log_dir_path: String) -> Self {
158 self.log_dir_path = Some(log_dir_path);
159 self
160 }
161
162 pub fn build(self) -> Result<CdkLdkNode, Error> {
167 let mut ldk = Builder::new();
168 ldk.set_network(self.network);
169 tracing::info!("Storage dir of node is {}", self.storage_dir_path);
170 ldk.set_storage_dir_path(self.storage_dir_path);
171
172 match self.chain_source {
173 ChainSource::Esplora(esplora_url) => {
174 ldk.set_chain_source_esplora(esplora_url, None);
175 }
176 ChainSource::BitcoinRpc(BitcoinRpcConfig {
177 host,
178 port,
179 user,
180 password,
181 }) => {
182 ldk.set_chain_source_bitcoind_rpc(host, port, user, password);
183 }
184 }
185
186 match self.gossip_source {
187 GossipSource::P2P => {
188 ldk.set_gossip_source_p2p();
189 }
190 GossipSource::RapidGossipSync(rgs_url) => {
191 ldk.set_gossip_source_rgs(rgs_url);
192 }
193 }
194
195 ldk.set_listening_addresses(self.listening_addresses)?;
196 if self.log_dir_path.is_some() {
197 ldk.set_filesystem_logger(self.log_dir_path, Some(LogLevel::Info));
198 } else {
199 ldk.set_custom_logger(Arc::new(StdoutLogWriter));
200 }
201
202 ldk.set_node_alias("cdk-ldk-node".to_string())?;
203 if let Some(seed) = self.seed {
205 ldk.set_entropy_bip39_mnemonic(seed, None);
206 }
207 if let Some(announcement_addresses) = self.announcement_addresses {
209 ldk.set_announcement_addresses(announcement_addresses)?;
210 }
211
212 let node = ldk.build()?;
213
214 tracing::info!("Creating tokio channel for payment notifications");
215 let (sender, receiver) = tokio::sync::broadcast::channel(8);
216
217 let id = node.node_id();
218
219 let adr = node.announcement_addresses();
220
221 tracing::info!(
222 "Created node {} with address {:?} on network {}",
223 id,
224 adr,
225 self.network
226 );
227
228 Ok(CdkLdkNode {
229 inner: node.into(),
230 fee_reserve: self.fee_reserve,
231 wait_invoice_cancel_token: CancellationToken::new(),
232 wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
233 sender,
234 receiver: Arc::new(receiver),
235 events_cancel_token: CancellationToken::new(),
236 web_addr: None,
237 })
238 }
239}
240
241impl CdkLdkNode {
242 pub fn set_web_addr(&mut self, addr: Option<SocketAddr>) {
247 self.web_addr = addr;
248 }
249
250 pub fn default_web_addr() -> SocketAddr {
255 SocketAddr::from(([127, 0, 0, 1], 8091))
256 }
257
258 pub fn start_ldk_node(&self) -> Result<(), Error> {
269 tracing::info!("Starting cdk-ldk node");
270 self.inner.start()?;
271 let node_config = self.inner.config();
272
273 tracing::info!("Starting node with network {}", node_config.network);
274
275 tracing::info!("Node status: {:?}", self.inner.status());
276
277 self.handle_events()?;
278
279 Ok(())
280 }
281
282 pub fn start_web_server(&self, web_addr: SocketAddr) -> Result<(), Error> {
297 let web_server = crate::web::WebServer::new(Arc::new(self.clone()));
298
299 tokio::spawn(async move {
300 if let Err(e) = web_server.serve(web_addr).await {
301 tracing::error!("Web server error: {}", e);
302 }
303 });
304
305 Ok(())
306 }
307
308 pub fn stop_ldk_node(&self) -> Result<(), Error> {
322 tracing::info!("Stopping CdkLdkNode");
323 tracing::info!("Cancelling event handler");
325 self.events_cancel_token.cancel();
326
327 if self.is_payment_event_stream_active() {
329 tracing::info!("Cancelling payment event stream");
330 self.wait_invoice_cancel_token.cancel();
331 }
332
333 tracing::info!("Stopping LDK node");
335 self.inner.stop()?;
336 tracing::info!("CdkLdkNode stopped successfully");
337 Ok(())
338 }
339
340 async fn handle_payment_received(
342 node: &Arc<Node>,
343 sender: &tokio::sync::broadcast::Sender<WaitPaymentResponse>,
344 payment_id: Option<PaymentId>,
345 payment_hash: PaymentHash,
346 amount_msat: u64,
347 ) {
348 tracing::info!(
349 "Received payment for hash={} of amount={} msat",
350 payment_hash,
351 amount_msat
352 );
353
354 let payment_id = match payment_id {
355 Some(id) => id,
356 None => {
357 tracing::warn!("Received payment without payment_id");
358 return;
359 }
360 };
361
362 let payment_id_hex = hex::encode(payment_id.0);
363
364 if amount_msat == 0 {
365 tracing::warn!("Payment of no amount");
366 return;
367 }
368
369 tracing::info!(
370 "Processing payment notification: id={}, amount={} msats",
371 payment_id_hex,
372 amount_msat
373 );
374
375 let payment_details = match node.payment(&payment_id) {
376 Some(details) => details,
377 None => {
378 tracing::error!("Could not find payment details for id={}", payment_id_hex);
379 return;
380 }
381 };
382
383 let (payment_identifier, payment_id) = match payment_details.kind {
384 PaymentKind::Bolt11 { hash, .. } => {
385 (PaymentIdentifier::PaymentHash(hash.0), hash.to_string())
386 }
387 PaymentKind::Bolt12Offer { hash, offer_id, .. } => match hash {
388 Some(h) => (
389 PaymentIdentifier::OfferId(offer_id.to_string()),
390 h.to_string(),
391 ),
392 None => {
393 tracing::error!("Bolt12 payment missing hash");
394 return;
395 }
396 },
397 k => {
398 tracing::warn!("Received payment of kind {:?} which is not supported", k);
399 return;
400 }
401 };
402
403 let wait_payment_response = WaitPaymentResponse {
404 payment_identifier,
405 payment_amount: Amount::new(amount_msat, CurrencyUnit::Msat),
406 payment_id,
407 };
408
409 match sender.send(wait_payment_response) {
410 Ok(_) => tracing::info!("Successfully sent payment notification to stream"),
411 Err(err) => tracing::error!(
412 "Could not send payment received notification on channel: {}",
413 err
414 ),
415 }
416 }
417
418 pub fn handle_events(&self) -> Result<(), Error> {
420 let node = self.inner.clone();
421 let sender = self.sender.clone();
422 let cancel_token = self.events_cancel_token.clone();
423
424 tracing::info!("Starting event handler task");
425
426 tokio::spawn(async move {
427 tracing::info!("Event handler loop started");
428 loop {
429 tokio::select! {
430 _ = cancel_token.cancelled() => {
431 tracing::info!("Event handler cancelled");
432 break;
433 }
434 event = node.next_event_async() => {
435 match event {
436 Event::PaymentReceived {
437 payment_id,
438 payment_hash,
439 amount_msat,
440 custom_records: _
441 } => {
442 Self::handle_payment_received(
443 &node,
444 &sender,
445 payment_id,
446 payment_hash,
447 amount_msat
448 ).await;
449 }
450 Event::PaymentFailed {
451 payment_id,
452 payment_hash,
453 reason,
454 } => {
455 tracing::error!(
456 payment_id = ?payment_id,
457 payment_hash = ?payment_hash,
458 reason = ?reason,
459 "LDK node payment failed"
460 );
461 }
462 event => {
463 tracing::debug!("Received other ldk node event: {:?}", event);
464 }
465 }
466
467 if let Err(err) = node.event_handled() {
468 tracing::error!("Error handling node event: {}", err);
469 } else {
470 tracing::debug!("Successfully handled node event");
471 }
472 }
473 }
474 }
475 tracing::info!("Event handler loop terminated");
476 });
477
478 tracing::info!("Event handler task spawned");
479 Ok(())
480 }
481
482 pub fn node(&self) -> Arc<Node> {
484 Arc::clone(&self.inner)
485 }
486}
487
488#[async_trait]
490impl MintPayment for CdkLdkNode {
491 type Err = payment::Error;
492
493 async fn start(&self) -> Result<(), Self::Err> {
496 self.start_ldk_node().map_err(|e| {
497 tracing::error!("Failed to start CdkLdkNode: {}", e);
498 e
499 })?;
500
501 tracing::info!("CdkLdkNode payment processor started successfully");
502
503 if let Some(web_addr) = self.web_addr {
505 tracing::info!("Starting LDK Node web interface on {}", web_addr);
506 self.start_web_server(web_addr).map_err(|e| {
507 tracing::error!("Failed to start web server: {}", e);
508 e
509 })?;
510 } else {
511 tracing::info!("No web server address configured, skipping web interface");
512 }
513
514 Ok(())
515 }
516
517 async fn stop(&self) -> Result<(), Self::Err> {
520 self.stop_ldk_node().map_err(|e| {
521 tracing::error!("Failed to stop CdkLdkNode: {}", e);
522 e.into()
523 })
524 }
525
526 async fn get_settings(&self) -> Result<SettingsResponse, Self::Err> {
528 let settings = SettingsResponse {
529 unit: CurrencyUnit::Msat.to_string(),
530 bolt11: Some(payment::Bolt11Settings {
531 mpp: false,
532 amountless: true,
533 invoice_description: true,
534 }),
535 bolt12: Some(payment::Bolt12Settings { amountless: true }),
536 onchain: None,
537 custom: std::collections::HashMap::new(),
538 };
539 Ok(settings)
540 }
541
542 #[instrument(skip(self))]
544 async fn create_incoming_payment_request(
545 &self,
546 options: IncomingPaymentOptions,
547 ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
548 match options {
549 IncomingPaymentOptions::Bolt11(bolt11_options) => {
550 let amount_msat: Amount = bolt11_options
551 .amount
552 .convert_to(&CurrencyUnit::Msat)?
553 .into();
554 let description = bolt11_options.description.unwrap_or_default();
555 let time = match bolt11_options.unix_expiry {
556 Some(t) => t
557 .checked_sub(unix_time())
558 .ok_or(payment::Error::InvalidExpiry)?,
559 None => 36000,
560 };
561
562 let description = Bolt11InvoiceDescription::Direct(
563 Description::new(description).map_err(|_| Error::InvalidDescription)?,
564 );
565
566 let payment = self
567 .inner
568 .bolt11_payment()
569 .receive(amount_msat.into(), &description, time as u32)
570 .map_err(Error::LdkNode)?;
571
572 let payment_hash = payment.payment_hash().to_string();
573 let payment_identifier = PaymentIdentifier::PaymentHash(
574 hex::decode(&payment_hash)?
575 .try_into()
576 .map_err(|_| Error::InvalidPaymentHashLength)?,
577 );
578
579 Ok(CreateIncomingPaymentResponse {
580 request_lookup_id: payment_identifier,
581 request: payment.to_string(),
582 expiry: Some(unix_time() + time),
583 extra_json: None,
584 })
585 }
586 IncomingPaymentOptions::Bolt12(bolt12_options) => {
587 let Bolt12IncomingPaymentOptions {
588 description,
589 amount,
590 unix_expiry,
591 } = *bolt12_options;
592
593 let time = unix_expiry
594 .map(|t| {
595 t.checked_sub(unix_time())
596 .ok_or(payment::Error::InvalidExpiry)
597 .map(|t| t as u32)
598 })
599 .transpose()?;
600
601 let offer = match amount {
602 Some(amount) => {
603 let amount_msat: Amount = amount.convert_to(&CurrencyUnit::Msat)?.into();
604
605 self.inner
606 .bolt12_payment()
607 .receive(
608 amount_msat.into(),
609 &description.unwrap_or("".to_string()),
610 time,
611 None,
612 )
613 .map_err(Error::LdkNode)?
614 }
615 None => self
616 .inner
617 .bolt12_payment()
618 .receive_variable_amount(&description.unwrap_or("".to_string()), time)
619 .map_err(Error::LdkNode)?,
620 };
621 let payment_identifier = PaymentIdentifier::OfferId(offer.id().to_string());
622
623 Ok(CreateIncomingPaymentResponse {
624 request_lookup_id: payment_identifier,
625 request: offer.to_string(),
626 expiry: unix_expiry,
627 extra_json: None,
628 })
629 }
630 IncomingPaymentOptions::Custom(_) | IncomingPaymentOptions::Onchain(_) => {
631 Err(cdk_common::payment::Error::UnsupportedPaymentOption)
632 }
633 }
634 }
635
636 #[instrument(skip_all)]
639 async fn get_payment_quote(
640 &self,
641 unit: &CurrencyUnit,
642 options: OutgoingPaymentOptions,
643 ) -> Result<PaymentQuoteResponse, Self::Err> {
644 match options {
645 cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
646 Err(cdk_common::payment::Error::UnsupportedPaymentOption)
647 }
648 OutgoingPaymentOptions::Bolt11(bolt11_options) => {
649 let bolt11 = bolt11_options.bolt11;
650
651 let amount_msat = match bolt11_options.melt_options {
652 Some(MeltOptions::Amountless { amountless }) => {
653 let amount_msat = amountless.amount_msat;
654
655 if let Some(invoice_amount) = bolt11.amount_milli_satoshis() {
656 if invoice_amount != u64::from(amount_msat) {
657 return Err(payment::Error::AmountMismatch);
658 }
659 }
660
661 amount_msat
662 }
663 Some(MeltOptions::Mpp { mpp }) => mpp.amount,
664 None => bolt11
665 .amount_milli_satoshis()
666 .ok_or(Error::UnknownInvoiceAmount)?
667 .into(),
668 };
669
670 let amount =
671 Amount::new(amount_msat.into(), CurrencyUnit::Msat).convert_to(unit)?;
672
673 let relative_fee_reserve =
674 (self.fee_reserve.percent_fee_reserve * amount.value() as f32) as u64;
675
676 let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
677
678 let fee = match relative_fee_reserve > absolute_fee_reserve {
679 true => relative_fee_reserve,
680 false => absolute_fee_reserve,
681 };
682
683 let payment_hash = bolt11.payment_hash().to_string();
684 let payment_hash_bytes = hex::decode(&payment_hash)?
685 .try_into()
686 .map_err(|_| Error::InvalidPaymentHashLength)?;
687
688 Ok(PaymentQuoteResponse {
689 request_lookup_id: Some(PaymentIdentifier::PaymentHash(payment_hash_bytes)),
690 amount,
691 fee: Amount::new(fee, unit.clone()),
692 state: MeltQuoteState::Unpaid,
693 extra_json: None,
694 estimated_blocks: None,
695 fee_options: None,
696 })
697 }
698 OutgoingPaymentOptions::Bolt12(bolt12_options) => {
699 let offer = bolt12_options.offer;
700
701 let amount_msat = match bolt12_options.melt_options {
702 Some(melt_options) => melt_options.amount_msat(),
703 None => {
704 let amount = offer.amount().ok_or(payment::Error::AmountMismatch)?;
705
706 match amount {
707 ldk_node::lightning::offers::offer::Amount::Bitcoin {
708 amount_msats,
709 } => amount_msats.into(),
710 _ => return Err(payment::Error::AmountMismatch),
711 }
712 }
713 };
714 let amount =
715 Amount::new(amount_msat.into(), CurrencyUnit::Msat).convert_to(unit)?;
716
717 let relative_fee_reserve =
718 (self.fee_reserve.percent_fee_reserve * amount.value() as f32) as u64;
719
720 let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
721
722 let fee = match relative_fee_reserve > absolute_fee_reserve {
723 true => relative_fee_reserve,
724 false => absolute_fee_reserve,
725 };
726
727 Ok(PaymentQuoteResponse {
728 request_lookup_id: None,
729 amount,
730 fee: Amount::new(fee, unit.clone()),
731 state: MeltQuoteState::Unpaid,
732 extra_json: None,
733 estimated_blocks: None,
734 fee_options: None,
735 })
736 }
737 OutgoingPaymentOptions::Onchain(_) => {
738 Err(cdk_common::payment::Error::UnsupportedPaymentOption)
739 }
740 }
741 }
742
743 #[instrument(skip(self, options))]
745 async fn make_payment(
746 &self,
747 unit: &CurrencyUnit,
748 options: OutgoingPaymentOptions,
749 ) -> Result<MakePaymentResponse, Self::Err> {
750 match options {
751 cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
752 Err(cdk_common::payment::Error::UnsupportedPaymentOption)
753 }
754 OutgoingPaymentOptions::Bolt11(bolt11_options) => {
755 let bolt11 = bolt11_options.bolt11;
756
757 let send_params = match bolt11_options
758 .max_fee_amount
759 .map(|f| {
760 f.convert_to(&CurrencyUnit::Msat)
761 .map(|amount_msat| RouteParametersConfig {
762 max_total_routing_fee_msat: Some(amount_msat.value()),
763 ..Default::default()
764 })
765 })
766 .transpose()
767 {
768 Ok(params) => params,
769 Err(err) => {
770 tracing::error!("Failed to convert fee amount: {}", err);
771 return Err(payment::Error::Custom(format!("Invalid fee amount: {err}")));
772 }
773 };
774
775 let payment_id = match bolt11_options.melt_options {
776 Some(MeltOptions::Amountless { amountless }) => {
777 if let Some(invoice_amount) = bolt11.amount_milli_satoshis() {
778 if invoice_amount != u64::from(amountless.amount_msat) {
779 return Err(payment::Error::AmountMismatch);
780 }
781 }
782
783 self.inner
784 .bolt11_payment()
785 .send_using_amount(&bolt11, amountless.amount_msat.into(), send_params)
786 .map_err(|err| {
787 tracing::error!("Could not send send amountless bolt11: {}", err);
788 Error::CouldNotSendBolt11WithoutAmount
789 })?
790 }
791 None => self
792 .inner
793 .bolt11_payment()
794 .send(&bolt11, send_params)
795 .map_err(|err| {
796 tracing::error!("Could not send bolt11 {}", err);
797 Error::CouldNotSendBolt11
798 })?,
799 _ => return Err(payment::Error::UnsupportedPaymentOption),
800 };
801
802 let start = std::time::Instant::now();
804 let timeout = std::time::Duration::from_secs(10);
805
806 let (status, payment_details) = loop {
807 let details = self
808 .inner
809 .payment(&payment_id)
810 .ok_or(Error::PaymentNotFound)?;
811
812 match details.status {
813 PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
814 PaymentStatus::Failed => {
815 tracing::error!("Failed to pay bolt11 payment.");
816 break (MeltQuoteState::Failed, details);
817 }
818 PaymentStatus::Pending => {
819 if start.elapsed() > timeout {
820 tracing::warn!(
821 "Paying bolt11 exceeded timeout 10 seconds no longer waitning."
822 );
823 break (MeltQuoteState::Pending, details);
824 }
825 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
826 continue;
827 }
828 }
829 };
830
831 let payment_proof = match payment_details.kind {
832 PaymentKind::Bolt11 {
833 hash: _,
834 preimage,
835 secret: _,
836 } => preimage.map(|p| p.to_string()),
837 _ => return Err(Error::UnexpectedPaymentKind.into()),
838 };
839
840 let total_spent = payment_details
841 .amount_msat
842 .ok_or(Error::CouldNotGetAmountSpent)?
843 + payment_details.fee_paid_msat.unwrap_or_default();
844
845 let total_spent = Amount::new(total_spent, CurrencyUnit::Msat).convert_to(unit)?;
846
847 Ok(MakePaymentResponse {
848 payment_lookup_id: PaymentIdentifier::PaymentHash(
849 bolt11.payment_hash().to_byte_array(),
850 ),
851 payment_proof,
852 status,
853 total_spent,
854 })
855 }
856 OutgoingPaymentOptions::Bolt12(bolt12_options) => {
857 let offer = bolt12_options.offer;
858
859 let send_params = match bolt12_options
860 .max_fee_amount
861 .map(|f| {
862 f.convert_to(&CurrencyUnit::Msat)
863 .map(|amount_msat| RouteParametersConfig {
864 max_total_routing_fee_msat: Some(amount_msat.value()),
865 ..Default::default()
866 })
867 })
868 .transpose()
869 {
870 Ok(params) => params,
871 Err(err) => {
872 tracing::error!("Failed to convert fee amount: {}", err);
873 return Err(payment::Error::Custom(format!("Invalid fee amount: {err}")));
874 }
875 };
876
877 let payment_id = match bolt12_options.melt_options {
878 Some(MeltOptions::Amountless { amountless }) => self
879 .inner
880 .bolt12_payment()
881 .send_using_amount(
882 &offer,
883 amountless.amount_msat.into(),
884 None,
885 None,
886 send_params,
887 )
888 .map_err(Error::LdkNode)?,
889 None => self
890 .inner
891 .bolt12_payment()
892 .send(&offer, None, None, send_params)
893 .map_err(Error::LdkNode)?,
894 _ => return Err(payment::Error::UnsupportedPaymentOption),
895 };
896
897 let start = std::time::Instant::now();
899 let timeout = std::time::Duration::from_secs(10);
900
901 let (status, payment_details) = loop {
902 let details = self
903 .inner
904 .payment(&payment_id)
905 .ok_or(Error::PaymentNotFound)?;
906
907 match details.status {
908 PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
909 PaymentStatus::Failed => {
910 tracing::error!(
911 payment_id = %payment_id,
912 amount_msat = ?details.amount_msat,
913 fee_paid_msat = ?details.fee_paid_msat,
914 payment_kind = ?details.kind,
915 "Bolt12 payment failed"
916 );
917 break (MeltQuoteState::Failed, details);
918 }
919 PaymentStatus::Pending => {
920 if start.elapsed() > timeout {
921 tracing::warn!(
922 "Payment has been being for 10 seconds. No longer waiting"
923 );
924 break (MeltQuoteState::Pending, details);
925 }
926 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
927 continue;
928 }
929 }
930 };
931
932 let payment_proof = match payment_details.kind {
933 PaymentKind::Bolt12Offer {
934 hash: _,
935 preimage,
936 secret: _,
937 offer_id: _,
938 payer_note: _,
939 quantity: _,
940 } => preimage.map(|p| p.to_string()),
941 _ => return Err(Error::UnexpectedPaymentKind.into()),
942 };
943
944 let total_spent = payment_details
945 .amount_msat
946 .ok_or(Error::CouldNotGetAmountSpent)?
947 + payment_details.fee_paid_msat.unwrap_or_default();
948
949 let total_spent = Amount::new(total_spent, CurrencyUnit::Msat).convert_to(unit)?;
950
951 Ok(MakePaymentResponse {
952 payment_lookup_id: PaymentIdentifier::PaymentId(payment_id.0),
953 payment_proof,
954 status,
955 total_spent,
956 })
957 }
958 OutgoingPaymentOptions::Onchain(_) => {
959 Err(cdk_common::payment::Error::UnsupportedPaymentOption)
960 }
961 }
962 }
963
964 #[instrument(skip(self))]
967 async fn wait_payment_event(
968 &self,
969 ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
970 tracing::info!("Starting stream for invoices - wait_any_incoming_payment called");
971
972 self.wait_invoice_is_active.store(true, Ordering::SeqCst);
974 tracing::debug!("wait_invoice_is_active set to true");
975
976 let receiver = self.receiver.clone();
977
978 tracing::info!("Receiver obtained successfully, creating response stream");
979
980 let response_stream = BroadcastStream::new(receiver.resubscribe());
982
983 let response_stream = response_stream.filter_map(|result| async move {
985 match result {
986 Ok(payment) => Some(cdk_common::payment::Event::PaymentReceived(payment)),
987 Err(err) => {
988 tracing::warn!("Error in broadcast stream: {}", err);
989 None
990 }
991 }
992 });
993
994 let cancel_token = self.wait_invoice_cancel_token.clone();
996 let is_active = self.wait_invoice_is_active.clone();
997
998 let stream = Box::pin(response_stream);
999
1000 tokio::spawn(async move {
1002 cancel_token.cancelled().await;
1003 tracing::info!("wait_invoice stream cancelled");
1004 is_active.store(false, Ordering::SeqCst);
1005 });
1006
1007 tracing::info!("wait_any_incoming_payment returning stream");
1008 Ok(stream)
1009 }
1010
1011 fn is_payment_event_stream_active(&self) -> bool {
1013 self.wait_invoice_is_active.load(Ordering::SeqCst)
1014 }
1015
1016 fn cancel_payment_event_stream(&self) {
1018 self.wait_invoice_cancel_token.cancel()
1019 }
1020
1021 async fn check_incoming_payment_status(
1023 &self,
1024 payment_identifier: &PaymentIdentifier,
1025 ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
1026 if let PaymentIdentifier::OfferId(offer_id) = payment_identifier {
1029 let payments = self.inner.list_payments_with_filter(|p| {
1030 p.direction == PaymentDirection::Inbound
1031 && p.status == PaymentStatus::Succeeded
1032 && matches!(
1033 &p.kind,
1034 PaymentKind::Bolt12Offer { offer_id: oid, .. } if oid.to_string() == *offer_id
1035 )
1036 });
1037
1038 return Ok(payments
1039 .into_iter()
1040 .filter_map(|p| {
1041 let payment_id = match &p.kind {
1042 PaymentKind::Bolt12Offer {
1043 hash: Some(hash), ..
1044 } => hash.to_string(),
1045 _ => {
1046 tracing::warn!("Bolt12 payment for offer {} missing hash", offer_id);
1047 return None;
1048 }
1049 };
1050
1051 Some(WaitPaymentResponse {
1052 payment_identifier: payment_identifier.clone(),
1053 payment_amount: Amount::new(p.amount_msat?, CurrencyUnit::Msat),
1054 payment_id,
1055 })
1056 })
1057 .collect());
1058 }
1059
1060 let payment_id_str = match payment_identifier {
1061 PaymentIdentifier::PaymentHash(hash) => hex::encode(hash),
1062 PaymentIdentifier::CustomId(id) => id.clone(),
1063 _ => return Err(Error::UnsupportedPaymentIdentifierType.into()),
1064 };
1065
1066 let payment_id = PaymentId(
1067 hex::decode(&payment_id_str)?
1068 .try_into()
1069 .map_err(|_| Error::InvalidPaymentIdLength)?,
1070 );
1071
1072 let payment_details = self
1073 .inner
1074 .payment(&payment_id)
1075 .ok_or(Error::PaymentNotFound)?;
1076
1077 if payment_details.direction == PaymentDirection::Outbound {
1078 return Err(Error::InvalidPaymentDirection.into());
1079 }
1080
1081 let amount = if payment_details.status == PaymentStatus::Succeeded {
1082 payment_details
1083 .amount_msat
1084 .ok_or(Error::CouldNotGetPaymentAmount)?
1085 } else {
1086 return Ok(vec![]);
1087 };
1088
1089 let response = WaitPaymentResponse {
1090 payment_identifier: payment_identifier.clone(),
1091 payment_amount: Amount::new(amount, CurrencyUnit::Msat),
1092 payment_id: payment_id_str,
1093 };
1094
1095 Ok(vec![response])
1096 }
1097
1098 async fn check_outgoing_payment(
1100 &self,
1101 request_lookup_id: &PaymentIdentifier,
1102 ) -> Result<MakePaymentResponse, Self::Err> {
1103 let payment_details = match request_lookup_id {
1104 PaymentIdentifier::PaymentHash(id_hash) => self
1105 .inner
1106 .list_payments_with_filter(
1107 |p| matches!(&p.kind, PaymentKind::Bolt11 { hash, .. } if &hash.0 == id_hash),
1108 )
1109 .first()
1110 .cloned(),
1111 PaymentIdentifier::PaymentId(id) => self.inner.payment(&PaymentId(*id)),
1112 _ => {
1113 return Ok(MakePaymentResponse {
1114 payment_lookup_id: request_lookup_id.clone(),
1115 payment_proof: None,
1116 status: MeltQuoteState::Unknown,
1117 total_spent: Amount::new(0, CurrencyUnit::Msat),
1118 });
1119 }
1120 }
1121 .ok_or(Error::PaymentNotFound)?;
1122
1123 if payment_details.direction != PaymentDirection::Outbound {
1124 return Err(Error::InvalidPaymentDirection.into());
1125 }
1126
1127 let status = match payment_details.status {
1128 PaymentStatus::Pending => MeltQuoteState::Pending,
1129 PaymentStatus::Succeeded => MeltQuoteState::Paid,
1130 PaymentStatus::Failed => MeltQuoteState::Failed,
1131 };
1132
1133 let payment_proof = match payment_details.kind {
1134 PaymentKind::Bolt11 { preimage, .. } => preimage.map(|p| p.to_string()),
1135 PaymentKind::Bolt12Offer { preimage, .. } => preimage.map(|p| p.to_string()),
1136 _ => return Err(Error::UnexpectedPaymentKind.into()),
1137 };
1138
1139 let total_spent = payment_details
1140 .amount_msat
1141 .ok_or(Error::CouldNotGetAmountSpent)?
1142 + payment_details.fee_paid_msat.unwrap_or_default();
1143
1144 Ok(MakePaymentResponse {
1145 payment_lookup_id: request_lookup_id.clone(),
1146 payment_proof,
1147 status,
1148 total_spent: Amount::new(total_spent, CurrencyUnit::Msat),
1149 })
1150 }
1151}
1152
1153impl Drop for CdkLdkNode {
1154 fn drop(&mut self) {
1155 tracing::info!("Drop called on CdkLdkNode");
1156 self.wait_invoice_cancel_token.cancel();
1157 tracing::debug!("Cancelled wait_invoice token in drop");
1158 }
1159}