fedimint_recurringd/
lib.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::anyhow;
6use fedimint_api_client::api::net::Connector;
7use fedimint_client::{Client, ClientHandleArc, ClientModule, ClientModuleInstance};
8use fedimint_core::config::FederationId;
9use fedimint_core::core::{ModuleKind, OperationId};
10use fedimint_core::db::{
11    AutocommitResultExt, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
12    IRawDatabase,
13};
14use fedimint_core::encoding::{Decodable, Encodable};
15use fedimint_core::invite_code::InviteCode;
16use fedimint_core::secp256k1::SECP256K1;
17use fedimint_core::secp256k1::hashes::sha256;
18use fedimint_core::task::timeout;
19use fedimint_core::util::SafeUrl;
20use fedimint_core::{Amount, BitcoinHash};
21use fedimint_derive_secret::DerivableSecret;
22use fedimint_ln_client::recurring::{
23    PaymentCodeId, PaymentCodeRootKey, RecurringPaymentError, RecurringPaymentProtocol,
24};
25use fedimint_ln_client::{
26    LightningClientInit, LightningClientModule, LightningOperationMeta,
27    LightningOperationMetaVariant, LnReceiveState, tweak_user_key,
28};
29use fedimint_mint_client::MintClientInit;
30use futures::StreamExt;
31use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Sha256};
32use lnurl::Tag;
33use lnurl::lnurl::LnUrl;
34use lnurl::pay::PayResponse;
35use serde::{Deserialize, Serialize};
36use tokio::sync::{Notify, RwLock};
37use tracing::{info, warn};
38
39use crate::db::{
40    FederationDbPrefix, PaymentCodeEntry, PaymentCodeInvoiceEntry, PaymentCodeInvoiceKey,
41    PaymentCodeKey, PaymentCodeNextInvoiceIndexKey, PaymentCodeVariant, SchemaVersionKey,
42    load_federation_client_databases, open_client_db, try_add_federation_database,
43};
44
45mod db;
46
47#[derive(Clone)]
48pub struct RecurringInvoiceServer {
49    db: Database,
50    clients: Arc<RwLock<HashMap<FederationId, ClientHandleArc>>>,
51    invoice_generated: Arc<Notify>,
52    base_url: SafeUrl,
53}
54
55impl RecurringInvoiceServer {
56    pub async fn new(db: impl IRawDatabase + 'static, base_url: SafeUrl) -> anyhow::Result<Self> {
57        let db = Database::new(db, Default::default());
58
59        let mut clients = HashMap::<_, ClientHandleArc>::new();
60
61        for (federation_id, db) in load_federation_client_databases(&db).await {
62            let mut client_builder = Client::builder().await?;
63            client_builder.with_module(LightningClientInit::default());
64            client_builder.with_module(MintClientInit);
65            client_builder.with_primary_module_kind(ModuleKind::from_static_str("mint"));
66            let client = client_builder
67                .open(
68                    db,
69                    fedimint_client::RootSecret::StandardDoubleDerive(Self::default_secret()),
70                )
71                .await?;
72            clients.insert(federation_id, Arc::new(client));
73        }
74
75        let slf = Self {
76            db: db.clone(),
77            clients: Arc::new(RwLock::new(clients)),
78            invoice_generated: Arc::new(Default::default()),
79            base_url,
80        };
81
82        slf.run_db_migrations().await;
83
84        Ok(slf)
85    }
86
87    /// We don't want to hold any money or sign anything ourselves, we only use
88    /// the client with externally supplied key material and to track
89    /// ongoing progress of other users' receives.
90    fn default_secret() -> DerivableSecret {
91        DerivableSecret::new_root(&[], &[])
92    }
93
94    pub async fn register_federation(
95        &self,
96        invite_code: &InviteCode,
97    ) -> Result<FederationId, RecurringPaymentError> {
98        let federation_id = invite_code.federation_id();
99        info!("Registering federation {}", federation_id);
100
101        // We lock to prevent parallel join attempts
102        // TODO: lock per federation
103        let mut clients = self.clients.write().await;
104        if clients.contains_key(&federation_id) {
105            return Err(RecurringPaymentError::FederationAlreadyRegistered(
106                federation_id,
107            ));
108        }
109
110        // We don't know if joining will succeed or be interrupted. We use a random DB
111        // prefix to initialize the client and only write the prefix to the DB if that
112        // succeeds. If it fails we end up with some orphaned data in the DB, if it ever
113        // becomes a problem we can clean it up later.
114        let client_db_prefix = FederationDbPrefix::random();
115        let client_db = open_client_db(&self.db, client_db_prefix);
116
117        match Self::join_federation_static(client_db, invite_code).await {
118            Ok(client) => {
119                try_add_federation_database(&self.db, federation_id, client_db_prefix)
120                    .await
121                    .expect("We hold a global lock, no parallel joining can happen");
122                clients.insert(federation_id, client);
123                Ok(federation_id)
124            }
125            Err(e) => {
126                // TODO: clean up DB?
127                Err(e)
128            }
129        }
130    }
131
132    async fn join_federation_static(
133        client_db: Database,
134        invite_code: &InviteCode,
135    ) -> Result<ClientHandleArc, RecurringPaymentError> {
136        let mut client_builder = Client::builder()
137            .await
138            .map_err(RecurringPaymentError::JoiningFederationFailed)?;
139
140        client_builder.with_connector(Connector::default());
141        client_builder.with_module(LightningClientInit::default());
142        client_builder.with_module(MintClientInit);
143        client_builder.with_primary_module_kind(ModuleKind::from_static_str("mint"));
144
145        let client = client_builder
146            .preview(invite_code)
147            .await?
148            .join(
149                client_db,
150                fedimint_client::RootSecret::StandardDoubleDerive(Self::default_secret()),
151            )
152            .await
153            .map_err(RecurringPaymentError::JoiningFederationFailed)?;
154        Ok(Arc::new(client))
155    }
156
157    pub async fn register_recurring_payment_code(
158        &self,
159        federation_id: FederationId,
160        payment_code_root_key: PaymentCodeRootKey,
161        protocol: RecurringPaymentProtocol,
162        meta: &str,
163    ) -> Result<String, RecurringPaymentError> {
164        // TODO: support BOLT12
165        if protocol != RecurringPaymentProtocol::LNURL {
166            return Err(RecurringPaymentError::UnsupportedProtocol(protocol));
167        }
168
169        // Ensure the federation is supported
170        self.get_federation_client(federation_id).await?;
171
172        let payment_code = self.create_lnurl(payment_code_root_key.to_payment_code_id());
173        let payment_code_entry = PaymentCodeEntry {
174            root_key: payment_code_root_key,
175            federation_id,
176            protocol,
177            payment_code: payment_code.clone(),
178            variant: PaymentCodeVariant::Lnurl {
179                meta: meta.to_owned(),
180            },
181        };
182
183        let mut dbtx = self.db.begin_transaction().await;
184        if let Some(existing_code) = dbtx
185            .insert_entry(
186                &PaymentCodeKey {
187                    payment_code_id: payment_code_root_key.to_payment_code_id(),
188                },
189                &payment_code_entry,
190            )
191            .await
192        {
193            if existing_code != payment_code_entry {
194                return Err(RecurringPaymentError::PaymentCodeAlreadyExists(
195                    payment_code_root_key,
196                ));
197            }
198
199            dbtx.ignore_uncommitted();
200            return Ok(payment_code);
201        }
202
203        dbtx.insert_new_entry(
204            &PaymentCodeNextInvoiceIndexKey {
205                payment_code_id: payment_code_root_key.to_payment_code_id(),
206            },
207            &0,
208        )
209        .await;
210        dbtx.commit_tx_result().await?;
211
212        Ok(payment_code)
213    }
214
215    fn create_lnurl(&self, payment_code_id: PaymentCodeId) -> String {
216        let lnurl = LnUrl::from_url(format!(
217            "{}lnv1/paycodes/{}",
218            self.base_url, payment_code_id
219        ));
220        lnurl.encode()
221    }
222
223    pub async fn lnurl_pay(
224        &self,
225        payment_code_id: PaymentCodeId,
226    ) -> Result<PayResponse, RecurringPaymentError> {
227        let payment_code = self.get_payment_code(payment_code_id).await?;
228        let PaymentCodeVariant::Lnurl { meta } = payment_code.variant;
229
230        Ok(PayResponse {
231            callback: format!("{}lnv1/paycodes/{}/invoice", self.base_url, payment_code_id),
232            max_sendable: 100000000000,
233            min_sendable: 1,
234            tag: Tag::PayRequest,
235            metadata: meta,
236            comment_allowed: None,
237            allows_nostr: None,
238            nostr_pubkey: None,
239        })
240    }
241
242    pub async fn lnurl_invoice(
243        &self,
244        payment_code_id: PaymentCodeId,
245        amount: Amount,
246    ) -> Result<LNURLPayInvoice, RecurringPaymentError> {
247        let (operation_id, federation_id, invoice) =
248            self.create_bolt11_invoice(payment_code_id, amount).await?;
249        Ok(LNURLPayInvoice {
250            pr: invoice.to_string(),
251            verify: format!(
252                "{}lnv1/verify/{}/{}",
253                self.base_url,
254                federation_id,
255                operation_id.fmt_full()
256            ),
257        })
258    }
259
260    async fn create_bolt11_invoice(
261        &self,
262        payment_code_id: PaymentCodeId,
263        amount: Amount,
264    ) -> Result<(OperationId, FederationId, Bolt11Invoice), RecurringPaymentError> {
265        // Invoices are valid for one day by default, might become dynamic with BOLT12
266        // support
267        const DEFAULT_EXPIRY_TIME: u64 = 60 * 60 * 24;
268
269        let payment_code = self.get_payment_code(payment_code_id).await?;
270
271        let federation_client = self
272            .get_federation_client(payment_code.federation_id)
273            .await?;
274
275        let (operation_id, invoice) = self
276            .db
277            .autocommit(
278                |dbtx, _| {
279                    let federation_client = federation_client.clone();
280                    let payment_code = payment_code.clone();
281                    Box::pin(async move {
282                        let invoice_index = self
283                            .get_next_invoice_index(&mut dbtx.to_ref_nc(), payment_code_id)
284                            .await;
285
286                        // Check if the invoice index was already used in an aborted call to this
287                        // fn. If so:
288                        //   1. Save the previously generated invoice. We don't want to reuse it
289                        //      since it may be expired and in the future may contain call-specific
290                        //      data, but also want to allow the client to sync past it.
291                        //   2. Increment the invoice index to generate a new invoice since re-using
292                        //      the same index wouldn't work (operation id reuse is forbidden).
293                        let initial_operation_id =
294                            operation_id_from_user_key(payment_code.root_key, invoice_index);
295                        let invoice_index = if let Some(invoice) =
296                            Self::check_if_invoice_exists(&federation_client, initial_operation_id)
297                                .await
298                        {
299                            self.save_bolt11_invoice(
300                                dbtx,
301                                initial_operation_id,
302                                payment_code_id,
303                                invoice_index,
304                                invoice,
305                            )
306                            .await;
307                            self.get_next_invoice_index(&mut dbtx.to_ref_nc(), payment_code_id)
308                                .await
309                        } else {
310                            invoice_index
311                        };
312
313                        // This is where the main part starts: generate the invoice and save it to
314                        // the DB
315                        let federation_client_ln_module = federation_client.get_ln_module()?;
316                        let gateway = federation_client_ln_module
317                            .get_gateway(None, false)
318                            .await?
319                            .ok_or(RecurringPaymentError::NoGatewayFound)?;
320
321                        let lnurl_meta = match payment_code.variant {
322                            PaymentCodeVariant::Lnurl { meta } => meta,
323                        };
324                        let meta_hash = Sha256(sha256::Hash::hash(lnurl_meta.as_bytes()));
325                        let description = Bolt11InvoiceDescription::Hash(meta_hash);
326
327                        // TODO: ideally creating the invoice would take a dbtx as argument so we
328                        // don't have to do the "check if invoice already exists" dance
329                        let (operation_id, invoice, _preimage) = federation_client_ln_module
330                            .create_bolt11_invoice_for_user_tweaked(
331                                amount,
332                                description,
333                                Some(DEFAULT_EXPIRY_TIME),
334                                payment_code.root_key.0,
335                                invoice_index,
336                                serde_json::Value::Null,
337                                Some(gateway),
338                            )
339                            .await?;
340
341                        self.save_bolt11_invoice(
342                            dbtx,
343                            operation_id,
344                            payment_code_id,
345                            invoice_index,
346                            invoice.clone(),
347                        )
348                        .await;
349
350                        Result::<_, anyhow::Error>::Ok((operation_id, invoice))
351                    })
352                },
353                None,
354            )
355            .await
356            .unwrap_autocommit()?;
357
358        await_invoice_confirmed(&federation_client.get_ln_module()?, operation_id).await?;
359
360        Ok((operation_id, federation_client.federation_id(), invoice))
361    }
362
363    async fn save_bolt11_invoice(
364        &self,
365        dbtx: &mut DatabaseTransaction<'_>,
366        operation_id: OperationId,
367        payment_code_id: PaymentCodeId,
368        invoice_index: u64,
369        invoice: Bolt11Invoice,
370    ) {
371        dbtx.insert_new_entry(
372            &PaymentCodeInvoiceKey {
373                payment_code_id,
374                index: invoice_index,
375            },
376            &PaymentCodeInvoiceEntry {
377                operation_id,
378                invoice: PaymentCodeInvoice::Bolt11(invoice.clone()),
379            },
380        )
381        .await;
382
383        let invoice_generated_notifier = self.invoice_generated.clone();
384        dbtx.on_commit(move || {
385            invoice_generated_notifier.notify_waiters();
386        });
387    }
388
389    async fn check_if_invoice_exists(
390        federation_client: &ClientHandleArc,
391        operation_id: OperationId,
392    ) -> Option<Bolt11Invoice> {
393        let operation = federation_client
394            .operation_log()
395            .get_operation(operation_id)
396            .await?;
397
398        assert_eq!(
399            operation.operation_module_kind(),
400            LightningClientModule::kind().as_str()
401        );
402
403        let LightningOperationMetaVariant::Receive { invoice, .. } =
404            operation.meta::<LightningOperationMeta>().variant
405        else {
406            panic!(
407                "Unexpected operation meta variant: {:?}",
408                operation.meta::<LightningOperationMeta>().variant
409            );
410        };
411
412        Some(invoice)
413    }
414
415    async fn get_federation_client(
416        &self,
417        federation_id: FederationId,
418    ) -> Result<ClientHandleArc, RecurringPaymentError> {
419        self.clients
420            .read()
421            .await
422            .get(&federation_id)
423            .cloned()
424            .ok_or(RecurringPaymentError::UnknownFederationId(federation_id))
425    }
426
427    pub async fn await_invoice_index_generated(
428        &self,
429        payment_code_id: PaymentCodeId,
430        invoice_index: u64,
431    ) -> Result<PaymentCodeInvoiceEntry, RecurringPaymentError> {
432        self.get_payment_code(payment_code_id).await?;
433
434        let mut notified = self.invoice_generated.notified();
435        loop {
436            let mut dbtx = self.db.begin_transaction_nc().await;
437            if let Some(invoice_entry) = dbtx
438                .get_value(&PaymentCodeInvoiceKey {
439                    payment_code_id,
440                    index: invoice_index,
441                })
442                .await
443            {
444                break Ok(invoice_entry);
445            };
446
447            notified.await;
448            notified = self.invoice_generated.notified();
449        }
450    }
451
452    async fn get_next_invoice_index(
453        &self,
454        dbtx: &mut DatabaseTransaction<'_>,
455        payment_code_id: PaymentCodeId,
456    ) -> u64 {
457        let next_index = dbtx
458            .get_value(&PaymentCodeNextInvoiceIndexKey { payment_code_id })
459            .await
460            .map(|index| index + 1)
461            .unwrap_or(0);
462        dbtx.insert_entry(
463            &PaymentCodeNextInvoiceIndexKey { payment_code_id },
464            &next_index,
465        )
466        .await;
467
468        next_index
469    }
470
471    pub async fn list_federations(&self) -> Vec<FederationId> {
472        self.clients.read().await.keys().cloned().collect()
473    }
474
475    async fn get_payment_code(
476        &self,
477        payment_code_id: PaymentCodeId,
478    ) -> Result<PaymentCodeEntry, RecurringPaymentError> {
479        self.db
480            .begin_transaction_nc()
481            .await
482            .get_value(&PaymentCodeKey { payment_code_id })
483            .await
484            .ok_or(RecurringPaymentError::UnknownPaymentCode(payment_code_id))
485    }
486
487    /// Returns if an invoice has been paid yet. To avoid DB indirection and
488    /// since the URLs would be similarly long either way we identify
489    /// invoices by federation id and operation id instead of the payment
490    /// code. This function is the basis of `recurringd`'s [LUD-21]
491    /// implementation that allows clients to verify if a given invoice they
492    /// generated using the LNURL has been paid yet.
493    ///
494    /// [LUD-21]: https://github.com/lnurl/luds/blob/luds/21.md
495    pub async fn verify_invoice_paid(
496        &self,
497        federation_id: FederationId,
498        operation_id: OperationId,
499    ) -> Result<InvoiceStatus, RecurringPaymentError> {
500        let federation_client = self.get_federation_client(federation_id).await?;
501
502        // Unfortunately LUD-21 wants us to return the invoice again, so we have to
503        // fetch it from the operation meta.
504        let invoice = {
505            let operation = federation_client
506                .operation_log()
507                .get_operation(operation_id)
508                .await
509                .ok_or(RecurringPaymentError::UnknownInvoice(operation_id))?;
510
511            if operation.operation_module_kind() != LightningClientModule::kind().as_str() {
512                return Err(RecurringPaymentError::UnknownInvoice(operation_id));
513            }
514
515            let LightningOperationMetaVariant::Receive { invoice, .. } =
516                operation.meta::<LightningOperationMeta>().variant
517            else {
518                return Err(RecurringPaymentError::UnknownInvoice(operation_id));
519            };
520
521            invoice
522        };
523
524        let ln_module = federation_client
525            .get_first_module::<LightningClientModule>()
526            .map_err(|e| {
527                warn!("No compatible lightning module found {e}");
528                RecurringPaymentError::NoLightningModuleFound
529            })?;
530
531        let mut stream = ln_module
532            .subscribe_ln_receive(operation_id)
533            .await
534            .map_err(|_| RecurringPaymentError::UnknownInvoice(operation_id))?
535            .into_stream();
536        let status = loop {
537            // Unfortunately the fedimint client doesn't track payment status internally
538            // yet, but relies on integrators to consume the update streams belonging to
539            // operations to figure out their state. Since the verify endpoint is meant to
540            // be non-blocking, we need to find a way to consume the stream until we think
541            // no immediate progress will be made anymore. That's why we limit each update
542            // step to 100ms, far more than a DB read should ever take, and abort if we'd
543            // block to wait for further progress to be made.
544            let update = timeout(Duration::from_millis(100), stream.next()).await;
545            match update {
546                // For some reason recurringd jumps right to claimed without going over funded … but
547                // either is fine to conclude the user will receive their money once they come
548                // online.
549                Ok(Some(LnReceiveState::Funded | LnReceiveState::Claimed)) => {
550                    break PaymentStatus::Paid;
551                }
552                // Keep looking for a state update indicating the invoice having been paid
553                Ok(Some(_)) => {
554                    continue;
555                }
556                // If we reach the end of the update stream without observing a state indicating the
557                // invoice having been paid there was likely some error or the invoice timed out.
558                // Either way we just show the invoice as unpaid.
559                Ok(None) | Err(_) => {
560                    break PaymentStatus::Pending;
561                }
562            }
563        };
564
565        Ok(InvoiceStatus { invoice, status })
566    }
567
568    async fn run_db_migrations(&self) {
569        let migrations = Self::migrations();
570        let schema_version: u64 = self
571            .db
572            .begin_transaction_nc()
573            .await
574            .get_value(&SchemaVersionKey)
575            .await
576            .unwrap_or_default();
577
578        for (target_schema, migration_fn) in migrations
579            .into_iter()
580            .skip_while(|(target_schema, _)| *target_schema <= schema_version)
581        {
582            let mut dbtx = self.db.begin_transaction().await;
583            dbtx.insert_entry(&SchemaVersionKey, &target_schema).await;
584
585            migration_fn(self, dbtx.to_ref_nc()).await;
586
587            dbtx.commit_tx().await;
588        }
589    }
590}
591
592async fn await_invoice_confirmed(
593    ln_module: &ClientModuleInstance<'_, LightningClientModule>,
594    operation_id: OperationId,
595) -> Result<(), RecurringPaymentError> {
596    let mut operation_updated = ln_module
597        .subscribe_ln_receive(operation_id)
598        .await?
599        .into_stream();
600
601    while let Some(update) = operation_updated.next().await {
602        if matches!(update, LnReceiveState::WaitingForPayment { .. }) {
603            return Ok(());
604        }
605    }
606
607    Err(RecurringPaymentError::Other(anyhow!(
608        "BOLT11 invoice not confirmed"
609    )))
610}
611
612#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
613pub enum PaymentCodeInvoice {
614    Bolt11(Bolt11Invoice),
615}
616
617/// Helper struct indicating if an invoice was paid. In the future it may also
618/// contain the preimage to be fully LUD-21 compliant.
619pub struct InvoiceStatus {
620    pub invoice: Bolt11Invoice,
621    pub status: PaymentStatus,
622}
623
624pub enum PaymentStatus {
625    Paid,
626    Pending,
627}
628
629impl PaymentStatus {
630    pub fn is_paid(&self) -> bool {
631        matches!(self, PaymentStatus::Paid)
632    }
633}
634
635/// The lnurl-rs crate doesn't have the `verify` field in this type and we don't
636/// use any of the other fields right now. Once we upstream the verify field
637/// this struct can be removed.
638#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
639pub struct LNURLPayInvoice {
640    pub pr: String,
641    pub verify: String,
642}
643
644fn operation_id_from_user_key(user_key: PaymentCodeRootKey, index: u64) -> OperationId {
645    let invoice_key = tweak_user_key(SECP256K1, user_key.0, index);
646    let preimage = sha256::Hash::hash(&invoice_key.serialize()[..]);
647    let payment_hash = sha256::Hash::hash(&preimage[..]);
648
649    OperationId(payment_hash.to_byte_array())
650}
651
652trait LnClientContextExt {
653    fn get_ln_module(
654        &'_ self,
655    ) -> Result<ClientModuleInstance<'_, LightningClientModule>, RecurringPaymentError>;
656}
657
658impl LnClientContextExt for ClientHandleArc {
659    fn get_ln_module(
660        &'_ self,
661    ) -> Result<ClientModuleInstance<'_, LightningClientModule>, RecurringPaymentError> {
662        self.get_first_module::<LightningClientModule>()
663            .map_err(|e| {
664                warn!("No compatible lightning module found {e}");
665                RecurringPaymentError::NoLightningModuleFound
666            })
667    }
668}