cdk_cln/
lib.rs

1//! CDK lightning backend for CLN
2
3#![doc = include_str!("../README.md")]
4#![warn(missing_docs)]
5#![warn(rustdoc::bare_urls)]
6
7use std::cmp::max;
8use std::path::PathBuf;
9use std::pin::Pin;
10use std::str::FromStr;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15use async_trait::async_trait;
16use bitcoin::hashes::sha256::Hash;
17use cdk_common::amount::{to_unit, Amount};
18use cdk_common::common::FeeReserve;
19use cdk_common::database::mint::DynMintKVStore;
20use cdk_common::nuts::{CurrencyUnit, MeltOptions, MeltQuoteState};
21use cdk_common::payment::{
22    self, Bolt11IncomingPaymentOptions, Bolt11Settings, Bolt12IncomingPaymentOptions,
23    CreateIncomingPaymentResponse, Event, IncomingPaymentOptions, MakePaymentResponse, MintPayment,
24    OutgoingPaymentOptions, PaymentIdentifier, PaymentQuoteResponse, WaitPaymentResponse,
25};
26use cdk_common::util::{hex, unix_time};
27use cdk_common::Bolt11Invoice;
28use cln_rpc::model::requests::{
29    DecodeRequest, FetchinvoiceRequest, InvoiceRequest, ListinvoicesRequest, ListpaysRequest,
30    OfferRequest, PayRequest, WaitanyinvoiceRequest,
31};
32use cln_rpc::model::responses::{
33    DecodeResponse, ListinvoicesInvoices, ListinvoicesInvoicesStatus, ListpaysPaysStatus,
34    PayStatus, WaitanyinvoiceResponse, WaitanyinvoiceStatus,
35};
36use cln_rpc::primitives::{Amount as CLN_Amount, AmountOrAny, Sha256};
37use cln_rpc::ClnRpc;
38use error::Error;
39use futures::{Stream, StreamExt};
40use serde_json::Value;
41use tokio_util::sync::CancellationToken;
42use tracing::instrument;
43use uuid::Uuid;
44
45pub mod error;
46
// Locations in the mint KV store where the CLN backend keeps its state.

/// Primary KV namespace used by the CLN backend.
const CLN_KV_PRIMARY_NAMESPACE: &str = "cdk_cln_lightning_backend";
/// Secondary KV namespace under which payment indices are kept.
const CLN_KV_SECONDARY_NAMESPACE: &str = "payment_indices";
/// Key holding the most recent pay index written by the incoming-payment stream.
const LAST_PAY_INDEX_KV_KEY: &str = "last_pay_index";
51
52/// CLN mint backend
53#[derive(Clone)]
54pub struct Cln {
55    rpc_socket: PathBuf,
56    fee_reserve: FeeReserve,
57    wait_invoice_cancel_token: CancellationToken,
58    wait_invoice_is_active: Arc<AtomicBool>,
59    kv_store: DynMintKVStore,
60}
61
62impl Cln {
63    /// Create new [`Cln`]
64    pub async fn new(
65        rpc_socket: PathBuf,
66        fee_reserve: FeeReserve,
67        kv_store: DynMintKVStore,
68    ) -> Result<Self, Error> {
69        Ok(Self {
70            rpc_socket,
71            fee_reserve,
72            wait_invoice_cancel_token: CancellationToken::new(),
73            wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
74            kv_store,
75        })
76    }
77}
78
79#[async_trait]
80impl MintPayment for Cln {
81    type Err = payment::Error;
82
83    async fn get_settings(&self) -> Result<Value, Self::Err> {
84        Ok(serde_json::to_value(Bolt11Settings {
85            mpp: true,
86            unit: CurrencyUnit::Msat,
87            invoice_description: true,
88            amountless: true,
89            bolt12: true,
90        })?)
91    }
92
93    /// Is wait invoice active
94    fn is_wait_invoice_active(&self) -> bool {
95        self.wait_invoice_is_active.load(Ordering::SeqCst)
96    }
97
98    /// Cancel wait invoice
99    fn cancel_wait_invoice(&self) {
100        self.wait_invoice_cancel_token.cancel()
101    }
102
103    #[instrument(skip_all)]
104    async fn wait_payment_event(
105        &self,
106    ) -> Result<Pin<Box<dyn Stream<Item = Event> + Send>>, Self::Err> {
107        tracing::info!(
108            "CLN: Starting wait_any_incoming_payment with socket: {:?}",
109            self.rpc_socket
110        );
111
112        let last_pay_index = self.get_last_pay_index().await?.inspect(|&idx| {
113            tracing::info!("CLN: Found last payment index: {}", idx);
114        });
115
116        tracing::debug!("CLN: Connecting to CLN node...");
117        let cln_client = match cln_rpc::ClnRpc::new(&self.rpc_socket).await {
118            Ok(client) => {
119                tracing::debug!("CLN: Successfully connected to CLN node");
120                client
121            }
122            Err(err) => {
123                tracing::error!("CLN: Failed to connect to CLN node: {}", err);
124                return Err(Error::from(err).into());
125            }
126        };
127
128        tracing::debug!("CLN: Creating stream processing pipeline");
129        let kv_store = self.kv_store.clone();
130        let stream = futures::stream::unfold(
131            (
132                cln_client,
133                last_pay_index,
134                self.wait_invoice_cancel_token.clone(),
135                Arc::clone(&self.wait_invoice_is_active),
136                kv_store,
137            ),
138            |(mut cln_client, mut last_pay_idx, cancel_token, is_active, kv_store)| async move {
139                // Set the stream as active
140                is_active.store(true, Ordering::SeqCst);
141                tracing::debug!("CLN: Stream is now active, waiting for invoice events with lastpay_index: {:?}", last_pay_idx);
142
143                loop {
144                    tokio::select! {
145                        _ = cancel_token.cancelled() => {
146                            // Set the stream as inactive
147                            is_active.store(false, Ordering::SeqCst);
148                            tracing::info!("CLN: Invoice stream cancelled");
149                            // End the stream
150                            return None;
151                        }
152                        result = cln_client.call(cln_rpc::Request::WaitAnyInvoice(WaitanyinvoiceRequest {
153                            timeout: None,
154                            lastpay_index: last_pay_idx,
155                        })) => {
156                            tracing::debug!("CLN: Received response from WaitAnyInvoice call");
157                            match result {
158                                Ok(invoice) => {
159                                    tracing::debug!("CLN: Successfully received invoice data");
160                                        // Try to convert the invoice to WaitanyinvoiceResponse
161                            let wait_any_response_result: Result<WaitanyinvoiceResponse, _> =
162                                invoice.try_into();
163
164                            let wait_any_response = match wait_any_response_result {
165                                Ok(response) => {
166                                    tracing::debug!("CLN: Parsed WaitAnyInvoice response successfully");
167                                    response
168                                }
169                                Err(e) => {
170                                    tracing::warn!(
171                                        "CLN: Failed to parse WaitAnyInvoice response: {:?}",
172                                        e
173                                    );
174                                    // Continue to the next iteration without panicking
175                                    continue;
176                                }
177                            };
178
179                            // Check the status of the invoice
180                            // We only want to yield invoices that have been paid
181                            match wait_any_response.status {
182                                WaitanyinvoiceStatus::PAID => {
183                                    tracing::info!("CLN: Invoice with payment index {} is PAID", 
184                                                 wait_any_response.pay_index.unwrap_or_default());
185                                }
186                                WaitanyinvoiceStatus::EXPIRED => {
187                                    tracing::debug!("CLN: Invoice with payment index {} is EXPIRED, skipping", 
188                                                  wait_any_response.pay_index.unwrap_or_default());
189                                    continue;
190                                }
191                            }
192
193                            last_pay_idx = wait_any_response.pay_index;
194                            tracing::debug!("CLN: Updated last_pay_idx to {:?}", last_pay_idx);
195
196
197                            // Store the updated pay index in KV store for persistence
198                            if let Some(pay_index) = last_pay_idx {
199                                let index_str = pay_index.to_string();
200                                if let Ok(mut tx) = kv_store.begin_transaction().await {
201                                    if let Err(e) = tx.kv_write(CLN_KV_PRIMARY_NAMESPACE, CLN_KV_SECONDARY_NAMESPACE, LAST_PAY_INDEX_KV_KEY, index_str.as_bytes()).await {
202                                        tracing::warn!("CLN: Failed to write last pay index {} to KV store: {}", pay_index, e);
203                                    } else if let Err(e) = tx.commit().await {
204                                        tracing::warn!("CLN: Failed to commit last pay index {} to KV store: {}", pay_index, e);
205                                    } else {
206                                        tracing::debug!("CLN: Stored last pay index {} in KV store", pay_index);
207                                    }
208                                } else {
209                                    tracing::warn!("CLN: Failed to begin KV transaction for storing pay index {}", pay_index);
210                                }
211                            }
212
213                            let payment_hash = wait_any_response.payment_hash;
214                            tracing::debug!("CLN: Payment hash: {}", payment_hash);
215
216                            let amount_msats = match wait_any_response.amount_received_msat {
217                                Some(amt) => {
218                                    tracing::info!("CLN: Received payment of {} msats for {}", 
219                                                 amt.msat(), payment_hash);
220                                    amt
221                                }
222                                None => {
223                                    tracing::error!("CLN: No amount in paid invoice, this should not happen");
224                                    continue;
225                                }
226                            };
227
228                            let payment_hash = Hash::from_bytes_ref(payment_hash.as_ref());
229
230                            let request_lookup_id = match wait_any_response.bolt12 {
231                                // If it is a bolt12 payment we need to get the offer_id as this is what we use as the request look up.
232                                // Since this is not returned in the wait any response,
233                                // we need to do a second query for it.
234                                Some(bolt12) => {
235                                    tracing::info!("CLN: Processing BOLT12 payment, bolt12 value: {}", bolt12);
236                                    match fetch_invoice_by_payment_hash(
237                                        &mut cln_client,
238                                        payment_hash,
239                                    )
240                                    .await
241                                    {
242                                        Ok(Some(invoice)) => {
243                                            if let Some(local_offer_id) = invoice.local_offer_id {
244                                                tracing::info!("CLN: Received bolt12 payment of {} msats for offer {}", 
245                                                             amount_msats.msat(), local_offer_id);
246                                                PaymentIdentifier::OfferId(local_offer_id.to_string())
247                                            } else {
248                                                tracing::warn!("CLN: BOLT12 invoice has no local_offer_id, skipping");
249                                                continue;
250                                            }
251                                        }
252                                        Ok(None) => {
253                                            tracing::warn!("CLN: Failed to find invoice by payment hash, skipping");
254                                            continue;
255                                        }
256                                        Err(e) => {
257                                            tracing::warn!(
258                                                "CLN: Error fetching invoice by payment hash: {e}"
259                                            );
260                                            continue;
261                                        }
262                                    }
263                                }
264                                None => {
265                                 tracing::info!("CLN: Processing BOLT11 payment with hash {}", payment_hash);
266                                 PaymentIdentifier::PaymentHash(*payment_hash.as_ref())
267                                },
268                            };
269
270                            let response = WaitPaymentResponse {
271                                payment_identifier: request_lookup_id,
272                                payment_amount: amount_msats.msat().into(),
273                                unit: CurrencyUnit::Msat,
274                                payment_id: payment_hash.to_string()
275                            };
276                            tracing::info!("CLN: Created WaitPaymentResponse with amount {} msats", amount_msats.msat());
277                            let event = Event::PaymentReceived(response);
278
279                            break Some((event, (cln_client, last_pay_idx, cancel_token, is_active, kv_store)));
280                                }
281                                Err(e) => {
282                                    tracing::warn!("CLN: Error fetching invoice: {e}");
283                                    tokio::time::sleep(Duration::from_secs(1)).await;
284                                    continue;
285                                }
286                            }
287                        }
288                    }
289                }
290            },
291        )
292        .boxed();
293
294        tracing::info!("CLN: Successfully initialized invoice stream");
295        Ok(stream)
296    }
297
298    #[instrument(skip_all)]
299    async fn get_payment_quote(
300        &self,
301        unit: &CurrencyUnit,
302        options: OutgoingPaymentOptions,
303    ) -> Result<PaymentQuoteResponse, Self::Err> {
304        match options {
305            OutgoingPaymentOptions::Bolt11(bolt11_options) => {
306                // If we have specific amount options, use those
307                let amount_msat: Amount = if let Some(melt_options) = bolt11_options.melt_options {
308                    match melt_options {
309                        MeltOptions::Amountless { amountless } => {
310                            let amount_msat = amountless.amount_msat;
311
312                            if let Some(invoice_amount) =
313                                bolt11_options.bolt11.amount_milli_satoshis()
314                            {
315                                if !invoice_amount == u64::from(amount_msat) {
316                                    return Err(payment::Error::AmountMismatch);
317                                }
318                            }
319                            amount_msat
320                        }
321                        MeltOptions::Mpp { mpp } => mpp.amount,
322                    }
323                } else {
324                    // Fall back to invoice amount
325                    bolt11_options
326                        .bolt11
327                        .amount_milli_satoshis()
328                        .ok_or(Error::UnknownInvoiceAmount)?
329                        .into()
330                };
331                // Convert to target unit
332                let amount = to_unit(amount_msat, &CurrencyUnit::Msat, unit)?;
333
334                // Calculate fee
335                let relative_fee_reserve =
336                    (self.fee_reserve.percent_fee_reserve * u64::from(amount) as f32) as u64;
337                let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
338                let fee = max(relative_fee_reserve, absolute_fee_reserve);
339
340                Ok(PaymentQuoteResponse {
341                    request_lookup_id: Some(PaymentIdentifier::PaymentHash(
342                        *bolt11_options.bolt11.payment_hash().as_ref(),
343                    )),
344                    amount,
345                    fee: fee.into(),
346                    state: MeltQuoteState::Unpaid,
347                    unit: unit.clone(),
348                })
349            }
350            OutgoingPaymentOptions::Bolt12(bolt12_options) => {
351                let offer = bolt12_options.offer;
352
353                let amount_msat: u64 = if let Some(amount) = bolt12_options.melt_options {
354                    amount.amount_msat().into()
355                } else {
356                    // Fall back to offer amount
357                    let decode_response = self.decode_string(offer.to_string()).await?;
358
359                    decode_response
360                        .offer_amount_msat
361                        .ok_or(Error::UnknownInvoiceAmount)?
362                        .msat()
363                };
364
365                // Convert to target unit
366                let amount = to_unit(amount_msat, &CurrencyUnit::Msat, unit)?;
367
368                // Calculate fee
369                let relative_fee_reserve =
370                    (self.fee_reserve.percent_fee_reserve * u64::from(amount) as f32) as u64;
371                let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
372                let fee = max(relative_fee_reserve, absolute_fee_reserve);
373
374                Ok(PaymentQuoteResponse {
375                    request_lookup_id: None,
376                    amount,
377                    fee: fee.into(),
378                    state: MeltQuoteState::Unpaid,
379                    unit: unit.clone(),
380                })
381            }
382        }
383    }
384
385    #[instrument(skip_all)]
386    async fn make_payment(
387        &self,
388        unit: &CurrencyUnit,
389        options: OutgoingPaymentOptions,
390    ) -> Result<MakePaymentResponse, Self::Err> {
391        let max_fee_msat: Option<u64>;
392        let mut partial_amount: Option<u64> = None;
393        let mut amount_msat: Option<u64> = None;
394
395        let mut cln_client = self.cln_client().await?;
396
397        let invoice = match &options {
398            OutgoingPaymentOptions::Bolt11(bolt11_options) => {
399                let payment_identifier =
400                    PaymentIdentifier::PaymentHash(*bolt11_options.bolt11.payment_hash().as_ref());
401
402                self.check_outgoing_unpaided(&payment_identifier).await?;
403
404                if let Some(melt_options) = bolt11_options.melt_options {
405                    match melt_options {
406                        MeltOptions::Mpp { mpp } => partial_amount = Some(mpp.amount.into()),
407                        MeltOptions::Amountless { amountless } => {
408                            amount_msat = Some(amountless.amount_msat.into());
409                        }
410                    }
411                }
412
413                max_fee_msat = bolt11_options.max_fee_amount.map(|a| a.into());
414
415                bolt11_options.bolt11.to_string()
416            }
417            OutgoingPaymentOptions::Bolt12(bolt12_options) => {
418                let offer = &bolt12_options.offer;
419
420                let amount_msat: u64 = if let Some(amount) = bolt12_options.melt_options {
421                    amount.amount_msat().into()
422                } else {
423                    // Fall back to offer amount
424                    let decode_response = self.decode_string(offer.to_string()).await?;
425
426                    decode_response
427                        .offer_amount_msat
428                        .ok_or(Error::UnknownInvoiceAmount)?
429                        .msat()
430                };
431
432                // Fetch invoice from offer
433
434                let cln_response = cln_client
435                    .call_typed(&FetchinvoiceRequest {
436                        amount_msat: Some(CLN_Amount::from_msat(amount_msat)),
437                        payer_metadata: None,
438                        payer_note: None,
439                        quantity: None,
440                        recurrence_counter: None,
441                        recurrence_label: None,
442                        recurrence_start: None,
443                        timeout: None,
444                        offer: offer.to_string(),
445                        bip353: None,
446                    })
447                    .await
448                    .map_err(|err| {
449                        tracing::error!("Could not fetch invoice for offer: {:?}", err);
450                        Error::ClnRpc(err)
451                    })?;
452
453                let decode_response = self.decode_string(cln_response.invoice.clone()).await?;
454
455                let payment_identifier = PaymentIdentifier::Bolt12PaymentHash(
456                    hex::decode(
457                        decode_response
458                            .invoice_payment_hash
459                            .ok_or(Error::UnknownInvoice)?,
460                    )
461                    .map_err(|e| Error::Bolt12(e.to_string()))?
462                    .try_into()
463                    .map_err(|_| Error::InvalidHash)?,
464                );
465
466                self.check_outgoing_unpaided(&payment_identifier).await?;
467
468                max_fee_msat = bolt12_options.max_fee_amount.map(|a| a.into());
469
470                cln_response.invoice
471            }
472        };
473
474        let cln_response = cln_client
475            .call_typed(&PayRequest {
476                bolt11: invoice,
477                amount_msat: amount_msat.map(CLN_Amount::from_msat),
478                label: None,
479                riskfactor: None,
480                maxfeepercent: None,
481                retry_for: None,
482                maxdelay: None,
483                exemptfee: None,
484                localinvreqid: None,
485                exclude: None,
486                maxfee: max_fee_msat.map(CLN_Amount::from_msat),
487                description: None,
488                partial_msat: partial_amount.map(CLN_Amount::from_msat),
489            })
490            .await;
491
492        let response = match cln_response {
493            Ok(pay_response) => {
494                let status = match pay_response.status {
495                    PayStatus::COMPLETE => MeltQuoteState::Paid,
496                    PayStatus::PENDING => MeltQuoteState::Pending,
497                    PayStatus::FAILED => MeltQuoteState::Failed,
498                };
499
500                let payment_identifier = match options {
501                    OutgoingPaymentOptions::Bolt11(_) => {
502                        PaymentIdentifier::PaymentHash(*pay_response.payment_hash.as_ref())
503                    }
504                    OutgoingPaymentOptions::Bolt12(_) => {
505                        PaymentIdentifier::Bolt12PaymentHash(*pay_response.payment_hash.as_ref())
506                    }
507                };
508
509                MakePaymentResponse {
510                    payment_proof: Some(hex::encode(pay_response.payment_preimage.to_vec())),
511                    payment_lookup_id: payment_identifier,
512                    status,
513                    total_spent: to_unit(
514                        pay_response.amount_sent_msat.msat(),
515                        &CurrencyUnit::Msat,
516                        unit,
517                    )?,
518                    unit: unit.clone(),
519                }
520            }
521            Err(err) => {
522                tracing::error!("Could not pay invoice: {}", err);
523                return Err(Error::ClnRpc(err).into());
524            }
525        };
526
527        Ok(response)
528    }
529
530    #[instrument(skip_all)]
531    async fn create_incoming_payment_request(
532        &self,
533        unit: &CurrencyUnit,
534        options: IncomingPaymentOptions,
535    ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
536        match options {
537            IncomingPaymentOptions::Bolt11(Bolt11IncomingPaymentOptions {
538                description,
539                amount,
540                unix_expiry,
541            }) => {
542                let time_now = unix_time();
543
544                let mut cln_client = self.cln_client().await?;
545
546                let label = Uuid::new_v4().to_string();
547
548                let amount = to_unit(amount, unit, &CurrencyUnit::Msat)?;
549                let amount_msat = AmountOrAny::Amount(CLN_Amount::from_msat(amount.into()));
550
551                let invoice_response = cln_client
552                    .call_typed(&InvoiceRequest {
553                        amount_msat,
554                        description: description.unwrap_or_default(),
555                        label: label.clone(),
556                        expiry: unix_expiry.map(|t| t - time_now),
557                        fallbacks: None,
558                        preimage: None,
559                        cltv: None,
560                        deschashonly: None,
561                        exposeprivatechannels: None,
562                    })
563                    .await
564                    .map_err(Error::from)?;
565
566                let request = Bolt11Invoice::from_str(&invoice_response.bolt11)?;
567                let expiry = request.expires_at().map(|t| t.as_secs());
568                let payment_hash = request.payment_hash();
569
570                Ok(CreateIncomingPaymentResponse {
571                    request_lookup_id: PaymentIdentifier::PaymentHash(*payment_hash.as_ref()),
572                    request: request.to_string(),
573                    expiry,
574                })
575            }
576            IncomingPaymentOptions::Bolt12(bolt12_options) => {
577                let Bolt12IncomingPaymentOptions {
578                    description,
579                    amount,
580                    unix_expiry,
581                } = *bolt12_options;
582                let mut cln_client = self.cln_client().await?;
583
584                let label = Uuid::new_v4().to_string();
585
586                // Match like this until we change to option
587                let amount = match amount {
588                    Some(amount) => {
589                        let amount = to_unit(amount, unit, &CurrencyUnit::Msat)?;
590
591                        amount.to_string()
592                    }
593                    None => "any".to_string(),
594                };
595
596                // It seems that the only way to force cln to create a unique offer
597                // is to encode some random data in the offer
598                let issuer = Uuid::new_v4().to_string();
599
600                let offer_response = cln_client
601                    .call_typed(&OfferRequest {
602                        amount,
603                        absolute_expiry: unix_expiry,
604                        description: Some(description.unwrap_or_default()),
605                        issuer: Some(issuer.to_string()),
606                        label: Some(label.to_string()),
607                        single_use: None,
608                        quantity_max: None,
609                        recurrence: None,
610                        recurrence_base: None,
611                        recurrence_limit: None,
612                        recurrence_paywindow: None,
613                        recurrence_start_any_period: None,
614                    })
615                    .await
616                    .map_err(Error::from)?;
617
618                Ok(CreateIncomingPaymentResponse {
619                    request_lookup_id: PaymentIdentifier::OfferId(
620                        offer_response.offer_id.to_string(),
621                    ),
622                    request: offer_response.bolt12,
623                    expiry: unix_expiry,
624                })
625            }
626        }
627    }
628
629    #[instrument(skip(self))]
630    async fn check_incoming_payment_status(
631        &self,
632        payment_identifier: &PaymentIdentifier,
633    ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
634        let mut cln_client = self.cln_client().await?;
635
636        let listinvoices_response = match payment_identifier {
637            PaymentIdentifier::Label(label) => {
638                // Query by label
639                cln_client
640                    .call_typed(&ListinvoicesRequest {
641                        payment_hash: None,
642                        label: Some(label.to_string()),
643                        invstring: None,
644                        offer_id: None,
645                        index: None,
646                        limit: None,
647                        start: None,
648                    })
649                    .await
650                    .map_err(Error::from)?
651            }
652            PaymentIdentifier::OfferId(offer_id) => {
653                // Query by offer_id
654                cln_client
655                    .call_typed(&ListinvoicesRequest {
656                        payment_hash: None,
657                        label: None,
658                        invstring: None,
659                        offer_id: Some(offer_id.to_string()),
660                        index: None,
661                        limit: None,
662                        start: None,
663                    })
664                    .await
665                    .map_err(Error::from)?
666            }
667            PaymentIdentifier::PaymentHash(payment_hash) => {
668                // Query by payment_hash
669                cln_client
670                    .call_typed(&ListinvoicesRequest {
671                        payment_hash: Some(hex::encode(payment_hash)),
672                        label: None,
673                        invstring: None,
674                        offer_id: None,
675                        index: None,
676                        limit: None,
677                        start: None,
678                    })
679                    .await
680                    .map_err(Error::from)?
681            }
682            _ => {
683                tracing::error!("Unsupported payment id for CLN");
684                return Err(payment::Error::UnknownPaymentState);
685            }
686        };
687
688        Ok(listinvoices_response
689            .invoices
690            .iter()
691            .filter(|p| p.status == ListinvoicesInvoicesStatus::PAID)
692            .filter(|p| p.amount_msat.is_some()) // Filter out invoices without an amount
693            .map(|p| WaitPaymentResponse {
694                payment_identifier: payment_identifier.clone(),
695                payment_amount: p
696                    .amount_msat
697                    // Safe to expect since we filtered for Some
698                    .expect("We have filter out those without amounts")
699                    .msat()
700                    .into(),
701                unit: CurrencyUnit::Msat,
702                payment_id: p.payment_hash.to_string(),
703            })
704            .collect())
705    }
706
707    #[instrument(skip(self))]
708    async fn check_outgoing_payment(
709        &self,
710        payment_identifier: &PaymentIdentifier,
711    ) -> Result<MakePaymentResponse, Self::Err> {
712        let mut cln_client = self.cln_client().await?;
713
714        let payment_hash = match payment_identifier {
715            PaymentIdentifier::PaymentHash(hash) => hash,
716            PaymentIdentifier::Bolt12PaymentHash(hash) => hash,
717            _ => {
718                tracing::error!("Unsupported identifier to check outgoing payment for cln.");
719                return Err(payment::Error::UnknownPaymentState);
720            }
721        };
722
723        let listpays_response = cln_client
724            .call_typed(&ListpaysRequest {
725                payment_hash: Some(*Sha256::from_bytes_ref(payment_hash)),
726                bolt11: None,
727                status: None,
728                start: None,
729                index: None,
730                limit: None,
731            })
732            .await
733            .map_err(Error::from)?;
734
735        match listpays_response.pays.first() {
736            Some(pays_response) => {
737                let status = cln_pays_status_to_mint_state(pays_response.status);
738
739                Ok(MakePaymentResponse {
740                    payment_lookup_id: payment_identifier.clone(),
741                    payment_proof: pays_response.preimage.map(|p| hex::encode(p.to_vec())),
742                    status,
743                    total_spent: pays_response
744                        .amount_sent_msat
745                        .map_or(Amount::ZERO, |a| a.msat().into()),
746                    unit: CurrencyUnit::Msat,
747                })
748            }
749            None => Ok(MakePaymentResponse {
750                payment_lookup_id: payment_identifier.clone(),
751                payment_proof: None,
752                status: MeltQuoteState::Unknown,
753                total_spent: Amount::ZERO,
754                unit: CurrencyUnit::Msat,
755            }),
756        }
757    }
758}
759
760impl Cln {
761    async fn cln_client(&self) -> Result<ClnRpc, Error> {
762        Ok(cln_rpc::ClnRpc::new(&self.rpc_socket).await?)
763    }
764
765    /// Get last pay index for cln
766    async fn get_last_pay_index(&self) -> Result<Option<u64>, Error> {
767        // First try to read from KV store
768        if let Some(stored_index) = self
769            .kv_store
770            .kv_read(
771                CLN_KV_PRIMARY_NAMESPACE,
772                CLN_KV_SECONDARY_NAMESPACE,
773                LAST_PAY_INDEX_KV_KEY,
774            )
775            .await
776            .map_err(|e| Error::Database(e.to_string()))?
777        {
778            if let Ok(index_str) = std::str::from_utf8(&stored_index) {
779                if let Ok(index) = index_str.parse::<u64>() {
780                    tracing::debug!("CLN: Retrieved last pay index {} from KV store", index);
781                    return Ok(Some(index));
782                }
783            }
784        }
785
786        // Fall back to querying CLN directly
787        tracing::debug!("CLN: No stored last pay index found in KV store, querying CLN directly");
788        let mut cln_client = self.cln_client().await?;
789        let listinvoices_response = cln_client
790            .call_typed(&ListinvoicesRequest {
791                index: None,
792                invstring: None,
793                label: None,
794                limit: None,
795                offer_id: None,
796                payment_hash: None,
797                start: None,
798            })
799            .await
800            .map_err(Error::from)?;
801
802        match listinvoices_response.invoices.last() {
803            Some(last_invoice) => Ok(last_invoice.pay_index),
804            None => Ok(None),
805        }
806    }
807
808    /// Decode string
809    #[instrument(skip(self))]
810    async fn decode_string(&self, string: String) -> Result<DecodeResponse, Error> {
811        let mut cln_client = self.cln_client().await?;
812
813        cln_client
814            .call_typed(&DecodeRequest { string })
815            .await
816            .map_err(|err| {
817                tracing::error!("Could not fetch invoice for offer: {:?}", err);
818                Error::ClnRpc(err)
819            })
820    }
821
822    /// Checks that outgoing payment is not already paid
823    #[instrument(skip(self))]
824    async fn check_outgoing_unpaided(
825        &self,
826        payment_identifier: &PaymentIdentifier,
827    ) -> Result<(), payment::Error> {
828        let pay_state = self.check_outgoing_payment(payment_identifier).await?;
829
830        match pay_state.status {
831            MeltQuoteState::Unpaid | MeltQuoteState::Unknown | MeltQuoteState::Failed => Ok(()),
832            MeltQuoteState::Paid => {
833                tracing::debug!("Melt attempted on invoice already paid");
834                Err(payment::Error::InvoiceAlreadyPaid)
835            }
836            MeltQuoteState::Pending => {
837                tracing::debug!("Melt attempted on invoice already pending");
838                Err(payment::Error::InvoicePaymentPending)
839            }
840        }
841    }
842}
843
844fn cln_pays_status_to_mint_state(status: ListpaysPaysStatus) -> MeltQuoteState {
845    match status {
846        ListpaysPaysStatus::PENDING => MeltQuoteState::Pending,
847        ListpaysPaysStatus::COMPLETE => MeltQuoteState::Paid,
848        ListpaysPaysStatus::FAILED => MeltQuoteState::Failed,
849    }
850}
851
852async fn fetch_invoice_by_payment_hash(
853    cln_client: &mut cln_rpc::ClnRpc,
854    payment_hash: &Hash,
855) -> Result<Option<ListinvoicesInvoices>, Error> {
856    tracing::debug!("Fetching invoice by payment hash: {}", payment_hash);
857
858    let payment_hash_str = payment_hash.to_string();
859    tracing::debug!("Payment hash string: {}", payment_hash_str);
860
861    let request = ListinvoicesRequest {
862        payment_hash: Some(payment_hash_str),
863        index: None,
864        invstring: None,
865        label: None,
866        limit: None,
867        offer_id: None,
868        start: None,
869    };
870    tracing::debug!("Created ListinvoicesRequest");
871
872    match cln_client.call_typed(&request).await {
873        Ok(invoice_response) => {
874            let invoice_count = invoice_response.invoices.len();
875            tracing::debug!(
876                "Received {} invoices for payment hash {}",
877                invoice_count,
878                payment_hash
879            );
880
881            if invoice_count > 0 {
882                let first_invoice = invoice_response.invoices.first().cloned();
883                if let Some(invoice) = &first_invoice {
884                    tracing::debug!("Found invoice with payment hash {}", payment_hash);
885                    tracing::debug!(
886                        "Invoice details - local_offer_id: {:?}, status: {:?}",
887                        invoice.local_offer_id,
888                        invoice.status
889                    );
890                } else {
891                    tracing::warn!("No invoice found with payment hash {}", payment_hash);
892                }
893                Ok(first_invoice)
894            } else {
895                tracing::warn!("No invoices returned for payment hash {}", payment_hash);
896                Ok(None)
897            }
898        }
899        Err(e) => {
900            tracing::error!(
901                "Error fetching invoice by payment hash {}: {}",
902                payment_hash,
903                e
904            );
905            Err(Error::from(e))
906        }
907    }
908}