cdk_lnd/
lib.rs

1//! CDK lightning backend for LND
2
3// Copyright (c) 2023 Steffen (MIT)
4
5#![doc = include_str!("../README.md")]
6#![warn(missing_docs)]
7#![warn(rustdoc::bare_urls)]
8
9use std::cmp::max;
10use std::path::PathBuf;
11use std::pin::Pin;
12use std::str::FromStr;
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::Arc;
15
16use anyhow::anyhow;
17use async_trait::async_trait;
18use cdk_common::amount::{to_unit, Amount, MSAT_IN_SAT};
19use cdk_common::bitcoin::hashes::Hash;
20use cdk_common::common::FeeReserve;
21use cdk_common::database::mint::DynMintKVStore;
22use cdk_common::nuts::{CurrencyUnit, MeltOptions, MeltQuoteState};
23use cdk_common::payment::{
24    self, Bolt11Settings, CreateIncomingPaymentResponse, Event, IncomingPaymentOptions,
25    MakePaymentResponse, MintPayment, OutgoingPaymentOptions, PaymentIdentifier,
26    PaymentQuoteResponse, WaitPaymentResponse,
27};
28use cdk_common::util::hex;
29use cdk_common::Bolt11Invoice;
30use error::Error;
31use futures::{Stream, StreamExt};
32use lnrpc::fee_limit::Limit;
33use lnrpc::payment::PaymentStatus;
34use lnrpc::{FeeLimit, Hop, MppRecord};
35use tokio_util::sync::CancellationToken;
36use tracing::instrument;
37
38mod client;
39pub mod error;
40
41mod proto;
42pub(crate) use proto::{lnrpc, routerrpc};
43
44use crate::lnrpc::invoice::InvoiceState;
45
/// LND KV Store constants
///
/// Primary namespace under which this backend stores its entries in the mint KV store.
const LND_KV_PRIMARY_NAMESPACE: &str = "cdk_lnd_lightning_backend";
/// Secondary namespace grouping the persisted invoice subscription indices.
const LND_KV_SECONDARY_NAMESPACE: &str = "payment_indices";
/// Key holding the highest invoice `add_index` seen so far, stored as a decimal string.
const LAST_ADD_INDEX_KV_KEY: &str = "last_add_index";
/// Key holding the highest invoice `settle_index` seen so far, stored as a decimal string.
const LAST_SETTLE_INDEX_KV_KEY: &str = "last_settle_index";
51
/// Lnd mint backend
///
/// Bundles the LND client with the configuration and shared state needed to
/// implement [`MintPayment`]. The struct is `Clone`; clones share the cancellation
/// token and the "wait invoice active" flag.
#[derive(Clone)]
pub struct Lnd {
    /// Address passed to [`Lnd::new`]; retained but not read in this file.
    _address: String,
    /// TLS certificate path passed to [`Lnd::new`]; retained but not read in this file.
    _cert_file: PathBuf,
    /// Macaroon path passed to [`Lnd::new`]; retained but not read in this file.
    _macaroon_file: PathBuf,
    /// Connected LND client; each operation works on its own clone.
    lnd_client: client::Client,
    /// Fee reserve settings used when quoting outgoing payments.
    fee_reserve: FeeReserve,
    /// KV store used to persist the invoice subscription indices.
    kv_store: DynMintKVStore,
    /// Token that, once cancelled, ends the invoice event stream.
    wait_invoice_cancel_token: CancellationToken,
    /// Set to `true` while the invoice event stream is running, `false` once it ends.
    wait_invoice_is_active: Arc<AtomicBool>,
    /// Settings reported through `get_settings`.
    settings: Bolt11Settings,
}
65
66impl Lnd {
67    /// Maximum number of attempts at a partial payment
68    pub const MAX_ROUTE_RETRIES: usize = 50;
69
70    /// Create new [`Lnd`]
71    pub async fn new(
72        address: String,
73        cert_file: PathBuf,
74        macaroon_file: PathBuf,
75        fee_reserve: FeeReserve,
76        kv_store: DynMintKVStore,
77    ) -> Result<Self, Error> {
78        // Validate address is not empty
79        if address.is_empty() {
80            return Err(Error::InvalidConfig("LND address cannot be empty".into()));
81        }
82
83        // Validate cert_file exists and is not empty
84        if !cert_file.exists() || cert_file.metadata().map(|m| m.len() == 0).unwrap_or(true) {
85            return Err(Error::InvalidConfig(format!(
86                "LND certificate file not found or empty: {cert_file:?}"
87            )));
88        }
89
90        // Validate macaroon_file exists and is not empty
91        if !macaroon_file.exists()
92            || macaroon_file
93                .metadata()
94                .map(|m| m.len() == 0)
95                .unwrap_or(true)
96        {
97            return Err(Error::InvalidConfig(format!(
98                "LND macaroon file not found or empty: {macaroon_file:?}"
99            )));
100        }
101
102        let lnd_client = client::connect(&address, &cert_file, &macaroon_file)
103            .await
104            .map_err(|err| {
105                tracing::error!("Connection error: {}", err.to_string());
106                Error::Connection
107            })
108            .unwrap();
109
110        Ok(Self {
111            _address: address,
112            _cert_file: cert_file,
113            _macaroon_file: macaroon_file,
114            lnd_client,
115            fee_reserve,
116            kv_store,
117            wait_invoice_cancel_token: CancellationToken::new(),
118            wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
119            settings: Bolt11Settings {
120                mpp: true,
121                unit: CurrencyUnit::Msat,
122                invoice_description: true,
123                amountless: true,
124                bolt12: false,
125            },
126        })
127    }
128
129    /// Get last add and settle indices from KV store
130    #[instrument(skip_all)]
131    async fn get_last_indices(&self) -> Result<(Option<u64>, Option<u64>), Error> {
132        let add_index = if let Some(stored_index) = self
133            .kv_store
134            .kv_read(
135                LND_KV_PRIMARY_NAMESPACE,
136                LND_KV_SECONDARY_NAMESPACE,
137                LAST_ADD_INDEX_KV_KEY,
138            )
139            .await
140            .map_err(|e| Error::Database(e.to_string()))?
141        {
142            if let Ok(index_str) = std::str::from_utf8(stored_index.as_slice()) {
143                index_str.parse::<u64>().ok()
144            } else {
145                None
146            }
147        } else {
148            None
149        };
150
151        let settle_index = if let Some(stored_index) = self
152            .kv_store
153            .kv_read(
154                LND_KV_PRIMARY_NAMESPACE,
155                LND_KV_SECONDARY_NAMESPACE,
156                LAST_SETTLE_INDEX_KV_KEY,
157            )
158            .await
159            .map_err(|e| Error::Database(e.to_string()))?
160        {
161            if let Ok(index_str) = std::str::from_utf8(stored_index.as_slice()) {
162                index_str.parse::<u64>().ok()
163            } else {
164                None
165            }
166        } else {
167            None
168        };
169
170        tracing::debug!(
171            "LND: Retrieved last indices from KV store - add_index: {:?}, settle_index: {:?}",
172            add_index,
173            settle_index
174        );
175        Ok((add_index, settle_index))
176    }
177}
178
179#[async_trait]
180impl MintPayment for Lnd {
181    type Err = payment::Error;
182
183    #[instrument(skip_all)]
184    async fn get_settings(&self) -> Result<serde_json::Value, Self::Err> {
185        Ok(serde_json::to_value(&self.settings)?)
186    }
187
188    #[instrument(skip_all)]
189    fn is_wait_invoice_active(&self) -> bool {
190        self.wait_invoice_is_active.load(Ordering::SeqCst)
191    }
192
193    #[instrument(skip_all)]
194    fn cancel_wait_invoice(&self) {
195        self.wait_invoice_cancel_token.cancel()
196    }
197
    /// Subscribe to LND invoice updates and expose settled invoices as an event stream.
    ///
    /// The subscription starts from the add/settle indices last persisted in the KV
    /// store (or from 0 when none are stored or reading them fails). For every message
    /// received the running maxima of both indices are written back to the KV store;
    /// failures to persist are only logged. An [`Event::PaymentReceived`] is yielded
    /// for each message whose state is `Settled` and whose `r_hash` is exactly 32 bytes;
    /// all other messages are skipped.
    ///
    /// The stream ends (and the "active" flag is cleared) when the cancellation token
    /// fires, when LND closes the stream, or when the stream returns an error.
    ///
    /// # Errors
    ///
    /// Returns `Error::Connection` (converted into `Self::Err`) if the subscription
    /// request itself fails.
    #[instrument(skip_all)]
    async fn wait_payment_event(
        &self,
    ) -> Result<Pin<Box<dyn Stream<Item = Event> + Send>>, Self::Err> {
        let mut lnd_client = self.lnd_client.clone();

        // Get last indices from KV store. A failed read is deliberately non-fatal:
        // it falls back to `(None, None)`, which becomes index 0 below.
        let (last_add_index, last_settle_index) =
            self.get_last_indices().await.unwrap_or((None, None));

        // NOTE(review): presumably LND replays invoices newer than these indices —
        // confirm against the LND `SubscribeInvoices` documentation.
        let stream_req = lnrpc::InvoiceSubscription {
            add_index: last_add_index.unwrap_or(0),
            settle_index: last_settle_index.unwrap_or(0),
        };

        tracing::debug!(
            "LND: Starting invoice subscription with add_index: {}, settle_index: {}",
            stream_req.add_index,
            stream_req.settle_index
        );

        let stream = lnd_client
            .lightning()
            .subscribe_invoices(stream_req)
            .await
            .map_err(|_err| {
                tracing::error!("Could not subscribe to invoice");
                Error::Connection
            })?
            .into_inner();

        let cancel_token = self.wait_invoice_cancel_token.clone();
        let kv_store = self.kv_store.clone();

        // The unfold state carries everything the stream needs between items: the gRPC
        // stream, the cancellation token, the shared "active" flag, the KV store and
        // the highest add/settle indices seen so far.
        let event_stream = futures::stream::unfold(
            (
                stream,
                cancel_token,
                Arc::clone(&self.wait_invoice_is_active),
                kv_store,
                last_add_index.unwrap_or(0),
                last_settle_index.unwrap_or(0),
            ),
            |(
                mut stream,
                cancel_token,
                is_active,
                kv_store,
                mut current_add_index,
                mut current_settle_index,
            )| async move {
                // Runs each time the consumer asks for the next item.
                is_active.store(true, Ordering::SeqCst);

                // Loop until there is an event to yield or the stream has to end;
                // messages that do not produce an event `continue` here.
                loop {
                    tokio::select! {
                        _ = cancel_token.cancelled() => {
                            // Stream is cancelled
                            is_active.store(false, Ordering::SeqCst);
                            tracing::info!("Waiting for lnd invoice ending");
                            return None;
                        }
                        msg = stream.message() => {
                            match msg {
                                Ok(Some(msg)) => {
                                    // Update indices based on the message; `max` keeps them
                                    // from ever moving backwards.
                                    current_add_index = current_add_index.max(msg.add_index);
                                    current_settle_index = current_settle_index.max(msg.settle_index);

                                    // Store the updated indices in KV store regardless of settlement status
                                    let add_index_str = current_add_index.to_string();
                                    let settle_index_str = current_settle_index.to_string();

                                    // Persisting is best-effort: every failure is logged and the
                                    // message is still processed below.
                                    if let Ok(mut tx) = kv_store.begin_transaction().await {
                                        let mut has_error = false;

                                        if let Err(e) = tx.kv_write(LND_KV_PRIMARY_NAMESPACE, LND_KV_SECONDARY_NAMESPACE, LAST_ADD_INDEX_KV_KEY, add_index_str.as_bytes()).await {
                                            tracing::warn!("LND: Failed to write add_index {} to KV store: {}", current_add_index, e);
                                            has_error = true;
                                        }

                                        if let Err(e) = tx.kv_write(LND_KV_PRIMARY_NAMESPACE, LND_KV_SECONDARY_NAMESPACE, LAST_SETTLE_INDEX_KV_KEY, settle_index_str.as_bytes()).await {
                                            tracing::warn!("LND: Failed to write settle_index {} to KV store: {}", current_settle_index, e);
                                            has_error = true;
                                        }

                                        // Commit only when both writes succeeded; otherwise the
                                        // transaction is dropped without being committed.
                                        if !has_error {
                                            if let Err(e) = tx.commit().await {
                                                tracing::warn!("LND: Failed to commit indices to KV store: {}", e);
                                            } else {
                                                tracing::debug!("LND: Stored updated indices - add_index: {}, settle_index: {}", current_add_index, current_settle_index);
                                            }
                                        }
                                    } else {
                                        tracing::warn!("LND: Failed to begin KV transaction for storing indices");
                                    }

                                    // Only emit event for settled invoices
                                    if msg.state() == InvoiceState::Settled {
                                        // The payment hash must be exactly 32 bytes to be usable
                                        // as a `PaymentIdentifier::PaymentHash`.
                                        let hash_slice: Result<[u8;32], _> = msg.r_hash.try_into();

                                        if let Ok(hash_slice) = hash_slice {
                                            let hash = hex::encode(hash_slice);

                                            tracing::info!("LND: Payment for {} with amount {} msat", hash,  msg.amt_paid_msat);

                                            let wait_response = WaitPaymentResponse {
                                                payment_identifier: PaymentIdentifier::PaymentHash(hash_slice),
                                                payment_amount: Amount::from(msg.amt_paid_msat as u64),
                                                unit: CurrencyUnit::Msat,
                                                payment_id: hash,
                                            };
                                            let event = Event::PaymentReceived(wait_response);
                                            // Yield the event and hand the state back for the next item.
                                            return Some((event, (stream, cancel_token, is_active, kv_store, current_add_index, current_settle_index)));
                                        } else {
                                            // Invalid hash, skip this message but continue streaming
                                            tracing::error!("LND returned invalid payment hash");
                                            // Continue the loop without yielding
                                            continue;
                                        }
                                    } else {
                                        // Not a settled invoice, continue but don't emit event
                                        tracing::debug!("LND: Received non-settled invoice, continuing to wait for settled invoices");
                                        // Continue the loop without yielding
                                        continue;
                                    }
                                }
                                Ok(None) => {
                                    // LND closed the stream: end ours as well.
                                    is_active.store(false, Ordering::SeqCst);
                                    tracing::info!("LND invoice stream ended.");
                                    return None;
                                }
                                Err(err) => {
                                    // A stream error is terminal here; no reconnect is
                                    // attempted in this function.
                                    is_active.store(false, Ordering::SeqCst);
                                    tracing::warn!("Encountered error in LND invoice stream. Stream ending");
                                    tracing::error!("{:?}", err);
                                    return None;
                                }
                            }
                        }
                    }
                }
            },
        );

        Ok(Box::pin(event_stream))
    }
344
345    #[instrument(skip_all)]
346    async fn get_payment_quote(
347        &self,
348        unit: &CurrencyUnit,
349        options: OutgoingPaymentOptions,
350    ) -> Result<PaymentQuoteResponse, Self::Err> {
351        match options {
352            OutgoingPaymentOptions::Bolt11(bolt11_options) => {
353                let amount_msat = match bolt11_options.melt_options {
354                    Some(amount) => amount.amount_msat(),
355                    None => bolt11_options
356                        .bolt11
357                        .amount_milli_satoshis()
358                        .ok_or(Error::UnknownInvoiceAmount)?
359                        .into(),
360                };
361
362                let amount = to_unit(amount_msat, &CurrencyUnit::Msat, unit)?;
363
364                let relative_fee_reserve =
365                    (self.fee_reserve.percent_fee_reserve * u64::from(amount) as f32) as u64;
366
367                let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
368
369                let fee = max(relative_fee_reserve, absolute_fee_reserve);
370
371                Ok(PaymentQuoteResponse {
372                    request_lookup_id: Some(PaymentIdentifier::PaymentHash(
373                        *bolt11_options.bolt11.payment_hash().as_ref(),
374                    )),
375                    amount,
376                    fee: fee.into(),
377                    state: MeltQuoteState::Unpaid,
378                    unit: unit.clone(),
379                })
380            }
381            OutgoingPaymentOptions::Bolt12(_) => {
382                Err(Self::Err::Anyhow(anyhow!("BOLT12 not supported by LND")))
383            }
384        }
385    }
386
387    #[instrument(skip_all)]
388    async fn make_payment(
389        &self,
390        _unit: &CurrencyUnit,
391        options: OutgoingPaymentOptions,
392    ) -> Result<MakePaymentResponse, Self::Err> {
393        match options {
394            OutgoingPaymentOptions::Bolt11(bolt11_options) => {
395                let bolt11 = bolt11_options.bolt11;
396
397                let pay_state = self
398                    .check_outgoing_payment(&PaymentIdentifier::PaymentHash(
399                        *bolt11.payment_hash().as_ref(),
400                    ))
401                    .await?;
402
403                match pay_state.status {
404                    MeltQuoteState::Unpaid | MeltQuoteState::Unknown | MeltQuoteState::Failed => (),
405                    MeltQuoteState::Paid => {
406                        tracing::debug!("Melt attempted on invoice already paid");
407                        return Err(Self::Err::InvoiceAlreadyPaid);
408                    }
409                    MeltQuoteState::Pending => {
410                        tracing::debug!("Melt attempted on invoice already pending");
411                        return Err(Self::Err::InvoicePaymentPending);
412                    }
413                }
414
415                // Detect partial payments
416                match bolt11_options.melt_options {
417                    Some(MeltOptions::Mpp { mpp }) => {
418                        let amount_msat: u64 = bolt11
419                            .amount_milli_satoshis()
420                            .ok_or(Error::UnknownInvoiceAmount)?;
421                        {
422                            let partial_amount_msat = mpp.amount;
423                            let invoice = bolt11;
424                            let max_fee: Option<Amount> = bolt11_options.max_fee_amount;
425
426                            // Extract information from invoice
427                            let pub_key = invoice.get_payee_pub_key();
428                            let payer_addr = invoice.payment_secret().0.to_vec();
429                            let payment_hash = invoice.payment_hash();
430
431                            let mut lnd_client = self.lnd_client.clone();
432
433                            for attempt in 0..Self::MAX_ROUTE_RETRIES {
434                                // Create a request for the routes
435                                let route_req = lnrpc::QueryRoutesRequest {
436                                    pub_key: hex::encode(pub_key.serialize()),
437                                    amt_msat: u64::from(partial_amount_msat) as i64,
438                                    fee_limit: max_fee.map(|f| {
439                                        let limit = Limit::Fixed(u64::from(f) as i64);
440                                        FeeLimit { limit: Some(limit) }
441                                    }),
442                                    use_mission_control: true,
443                                    ..Default::default()
444                                };
445
446                                // Query the routes
447                                let mut routes_response = lnd_client
448                                    .lightning()
449                                    .query_routes(route_req)
450                                    .await
451                                    .map_err(Error::LndError)?
452                                    .into_inner();
453
454                                // update its MPP record,
455                                // attempt it and check the result
456                                let last_hop: &mut Hop = routes_response.routes[0]
457                                    .hops
458                                    .last_mut()
459                                    .ok_or(Error::MissingLastHop)?;
460                                let mpp_record = MppRecord {
461                                    payment_addr: payer_addr.clone(),
462                                    total_amt_msat: amount_msat as i64,
463                                };
464                                last_hop.mpp_record = Some(mpp_record);
465
466                                let payment_response = lnd_client
467                                    .router()
468                                    .send_to_route_v2(routerrpc::SendToRouteRequest {
469                                        payment_hash: payment_hash.to_byte_array().to_vec(),
470                                        route: Some(routes_response.routes[0].clone()),
471                                        ..Default::default()
472                                    })
473                                    .await
474                                    .map_err(Error::LndError)?
475                                    .into_inner();
476
477                                if let Some(failure) = payment_response.failure {
478                                    if failure.code == 15 {
479                                        tracing::debug!(
480                                            "Attempt number {}: route has failed. Re-querying...",
481                                            attempt + 1
482                                        );
483                                        continue;
484                                    }
485                                }
486
487                                // Get status and maybe the preimage
488                                let (status, payment_preimage) = match payment_response.status {
489                                    0 => (MeltQuoteState::Pending, None),
490                                    1 => (
491                                        MeltQuoteState::Paid,
492                                        Some(hex::encode(payment_response.preimage)),
493                                    ),
494                                    2 => (MeltQuoteState::Unpaid, None),
495                                    _ => (MeltQuoteState::Unknown, None),
496                                };
497
498                                // Get the actual amount paid in sats
499                                let mut total_amt: u64 = 0;
500                                if let Some(route) = payment_response.route {
501                                    total_amt = (route.total_amt_msat / 1000) as u64;
502                                }
503
504                                return Ok(MakePaymentResponse {
505                                    payment_lookup_id: PaymentIdentifier::PaymentHash(
506                                        payment_hash.to_byte_array(),
507                                    ),
508                                    payment_proof: payment_preimage,
509                                    status,
510                                    total_spent: total_amt.into(),
511                                    unit: CurrencyUnit::Sat,
512                                });
513                            }
514
515                            // "We have exhausted all tactical options" -- STEM, Upgrade (2018)
516                            // The payment was not possible within 50 retries.
517                            tracing::error!("Limit of retries reached, payment couldn't succeed.");
518                            Err(Error::PaymentFailed.into())
519                        }
520                    }
521                    _ => {
522                        let mut lnd_client = self.lnd_client.clone();
523
524                        let max_fee: Option<Amount> = bolt11_options.max_fee_amount;
525
526                        let amount_msat = u64::from(
527                            bolt11_options
528                                .melt_options
529                                .map(|a| a.amount_msat())
530                                .unwrap_or_default(),
531                        );
532
533                        let pay_req = lnrpc::SendRequest {
534                            payment_request: bolt11.to_string(),
535                            fee_limit: max_fee.map(|f| {
536                                let limit = Limit::Fixed(u64::from(f) as i64);
537                                FeeLimit { limit: Some(limit) }
538                            }),
539                            amt_msat: amount_msat as i64,
540                            ..Default::default()
541                        };
542
543                        let payment_response = lnd_client
544                            .lightning()
545                            .send_payment_sync(tonic::Request::new(pay_req))
546                            .await
547                            .map_err(|err| {
548                                tracing::warn!("Lightning payment failed: {}", err);
549                                Error::PaymentFailed
550                            })?
551                            .into_inner();
552
553                        let total_amount = payment_response
554                            .payment_route
555                            .map_or(0, |route| route.total_amt_msat / MSAT_IN_SAT as i64)
556                            as u64;
557
558                        let (status, payment_preimage) = match total_amount == 0 {
559                            true => (MeltQuoteState::Unpaid, None),
560                            false => (
561                                MeltQuoteState::Paid,
562                                Some(hex::encode(payment_response.payment_preimage)),
563                            ),
564                        };
565
566                        let payment_identifier =
567                            PaymentIdentifier::PaymentHash(*bolt11.payment_hash().as_ref());
568
569                        Ok(MakePaymentResponse {
570                            payment_lookup_id: payment_identifier,
571                            payment_proof: payment_preimage,
572                            status,
573                            total_spent: total_amount.into(),
574                            unit: CurrencyUnit::Sat,
575                        })
576                    }
577                }
578            }
579            OutgoingPaymentOptions::Bolt12(_) => {
580                Err(Self::Err::Anyhow(anyhow!("BOLT12 not supported by LND")))
581            }
582        }
583    }
584
585    #[instrument(skip(self, options))]
586    async fn create_incoming_payment_request(
587        &self,
588        unit: &CurrencyUnit,
589        options: IncomingPaymentOptions,
590    ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
591        match options {
592            IncomingPaymentOptions::Bolt11(bolt11_options) => {
593                let description = bolt11_options.description.unwrap_or_default();
594                let amount = bolt11_options.amount;
595                let unix_expiry = bolt11_options.unix_expiry;
596
597                let amount_msat = to_unit(amount, unit, &CurrencyUnit::Msat)?;
598
599                let invoice_request = lnrpc::Invoice {
600                    value_msat: u64::from(amount_msat) as i64,
601                    memo: description,
602                    ..Default::default()
603                };
604
605                let mut lnd_client = self.lnd_client.clone();
606
607                let invoice = lnd_client
608                    .lightning()
609                    .add_invoice(tonic::Request::new(invoice_request))
610                    .await
611                    .map_err(|e| payment::Error::Anyhow(anyhow!(e)))?
612                    .into_inner();
613
614                let bolt11 = Bolt11Invoice::from_str(&invoice.payment_request)?;
615
616                let payment_identifier =
617                    PaymentIdentifier::PaymentHash(*bolt11.payment_hash().as_ref());
618
619                Ok(CreateIncomingPaymentResponse {
620                    request_lookup_id: payment_identifier,
621                    request: bolt11.to_string(),
622                    expiry: unix_expiry,
623                })
624            }
625            IncomingPaymentOptions::Bolt12(_) => {
626                Err(Self::Err::Anyhow(anyhow!("BOLT12 not supported by LND")))
627            }
628        }
629    }
630
631    #[instrument(skip(self))]
632    async fn check_incoming_payment_status(
633        &self,
634        payment_identifier: &PaymentIdentifier,
635    ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
636        let mut lnd_client = self.lnd_client.clone();
637
638        let invoice_request = lnrpc::PaymentHash {
639            r_hash: hex::decode(payment_identifier.to_string()).unwrap(),
640            ..Default::default()
641        };
642
643        let invoice = lnd_client
644            .lightning()
645            .lookup_invoice(tonic::Request::new(invoice_request))
646            .await
647            .map_err(|e| payment::Error::Anyhow(anyhow!(e)))?
648            .into_inner();
649
650        if invoice.state() == InvoiceState::Settled {
651            Ok(vec![WaitPaymentResponse {
652                payment_identifier: payment_identifier.clone(),
653                payment_amount: Amount::from(invoice.amt_paid_msat as u64),
654                unit: CurrencyUnit::Msat,
655                payment_id: hex::encode(invoice.r_hash),
656            }])
657        } else {
658            Ok(vec![])
659        }
660    }
661
662    #[instrument(skip(self))]
663    async fn check_outgoing_payment(
664        &self,
665        payment_identifier: &PaymentIdentifier,
666    ) -> Result<MakePaymentResponse, Self::Err> {
667        let mut lnd_client = self.lnd_client.clone();
668
669        let payment_hash = &payment_identifier.to_string();
670
671        let track_request = routerrpc::TrackPaymentRequest {
672            payment_hash: hex::decode(payment_hash).map_err(|_| Error::InvalidHash)?,
673            no_inflight_updates: true,
674        };
675
676        let payment_response = lnd_client.router().track_payment_v2(track_request).await;
677
678        let mut payment_stream = match payment_response {
679            Ok(stream) => stream.into_inner(),
680            Err(err) => {
681                let err_code = err.code();
682                if err_code == tonic::Code::NotFound {
683                    return Ok(MakePaymentResponse {
684                        payment_lookup_id: payment_identifier.clone(),
685                        payment_proof: None,
686                        status: MeltQuoteState::Unknown,
687                        total_spent: Amount::ZERO,
688                        unit: self.settings.unit.clone(),
689                    });
690                } else {
691                    return Err(payment::Error::UnknownPaymentState);
692                }
693            }
694        };
695
696        while let Some(update_result) = payment_stream.next().await {
697            match update_result {
698                Ok(update) => {
699                    let status = update.status();
700
701                    let response = match status {
702                        PaymentStatus::Unknown => MakePaymentResponse {
703                            payment_lookup_id: payment_identifier.clone(),
704                            payment_proof: Some(update.payment_preimage),
705                            status: MeltQuoteState::Unknown,
706                            total_spent: Amount::ZERO,
707                            unit: self.settings.unit.clone(),
708                        },
709                        PaymentStatus::InFlight | PaymentStatus::Initiated => {
710                            // Continue waiting for the next update
711                            continue;
712                        }
713                        PaymentStatus::Succeeded => MakePaymentResponse {
714                            payment_lookup_id: payment_identifier.clone(),
715                            payment_proof: Some(update.payment_preimage),
716                            status: MeltQuoteState::Paid,
717                            total_spent: Amount::from(
718                                (update
719                                    .value_sat
720                                    .checked_add(update.fee_sat)
721                                    .ok_or(Error::AmountOverflow)?)
722                                    as u64,
723                            ),
724                            unit: CurrencyUnit::Sat,
725                        },
726                        PaymentStatus::Failed => MakePaymentResponse {
727                            payment_lookup_id: payment_identifier.clone(),
728                            payment_proof: Some(update.payment_preimage),
729                            status: MeltQuoteState::Failed,
730                            total_spent: Amount::ZERO,
731                            unit: self.settings.unit.clone(),
732                        },
733                    };
734
735                    return Ok(response);
736                }
737                Err(_) => {
738                    // Handle the case where the update itself is an error (e.g., stream failure)
739                    return Err(Error::UnknownPaymentStatus.into());
740                }
741            }
742        }
743
744        // If the stream is exhausted without a final status
745        Err(Error::UnknownPaymentStatus.into())
746    }
747}