fedimint_client_rpc/
lib.rs

1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use async_stream::try_stream;
8use fedimint_bip39::{Bip39RootSecretStrategy, Mnemonic};
9use fedimint_client::module::ClientModule;
10use fedimint_client::secret::RootSecretStrategy;
11use fedimint_client::{ClientHandleArc, RootSecret};
12use fedimint_core::config::{FederationId, FederationIdPrefix};
13use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
14use fedimint_core::encoding::{Decodable, Encodable};
15use fedimint_core::invite_code::InviteCode;
16use fedimint_core::task::{MaybeSend, MaybeSync};
17use fedimint_core::util::{BoxFuture, BoxStream};
18use fedimint_core::{Amount, TieredCounts, impl_db_record};
19use fedimint_derive_secret::{ChildId, DerivableSecret};
20use fedimint_ln_client::{LightningClientInit, LightningClientModule};
21use fedimint_meta_client::MetaClientInit;
22use fedimint_mint_client::{MintClientInit, MintClientModule, OOBNotes};
23use fedimint_wallet_client::{WalletClientInit, WalletClientModule};
24use futures::StreamExt;
25use futures::future::{AbortHandle, Abortable};
26use lightning_invoice::Bolt11InvoiceDescriptionRef;
27use rand::thread_rng;
28use serde::{Deserialize, Serialize};
29use serde_json::json;
30use tokio::sync::Mutex;
31use tracing::info;
32
33// Key prefixes for the unified database
34#[repr(u8)]
35#[derive(Clone, Copy, Debug)]
36pub enum DbKeyPrefix {
37    ClientDatabase = 0x00,
38    Mnemonic = 0x01,
39}
40
41#[derive(Debug, Clone, Encodable, Decodable, Eq, PartialEq, Hash)]
42pub struct MnemonicKey;
43
44impl_db_record!(
45    key = MnemonicKey,
46    value = Vec<u8>,
47    db_prefix = DbKeyPrefix::Mnemonic,
48);
49
50/// Parsed details from an OOB note.
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct ParsedNoteDetails {
53    /// Total amount of all notes in the OOB notes
54    pub total_amount: Amount,
55    /// Federation ID prefix (always present)
56    pub federation_id_prefix: FederationIdPrefix,
57    /// Full federation ID (if invite is present)
58    pub federation_id: Option<FederationId>,
59    /// Invite code to join the federation (if present)
60    pub invite_code: Option<InviteCode>,
61    /// Number of notes per denomination
62    pub note_counts: TieredCounts,
63}
64
65#[derive(Serialize, Deserialize)]
66#[serde(rename_all = "snake_case")]
67pub struct RpcRequest {
68    pub request_id: u64,
69    #[serde(flatten)]
70    pub kind: RpcRequestKind,
71}
72
73#[derive(Serialize, Deserialize)]
74#[serde(tag = "type", rename_all = "snake_case")]
75pub enum RpcRequestKind {
76    SetMnemonic {
77        words: Vec<String>,
78    },
79    GenerateMnemonic,
80    GetMnemonic,
81    /// Join federation (requires mnemonic to be set first)
82    JoinFederation {
83        invite_code: String,
84        force_recover: bool,
85        client_name: String,
86    },
87    OpenClient {
88        client_name: String,
89    },
90    CloseClient {
91        client_name: String,
92    },
93    ClientRpc {
94        client_name: String,
95        module: String,
96        method: String,
97        payload: serde_json::Value,
98    },
99    CancelRpc {
100        cancel_request_id: u64,
101    },
102    ParseInviteCode {
103        invite_code: String,
104    },
105    ParseBolt11Invoice {
106        invoice: String,
107    },
108    PreviewFederation {
109        invite_code: String,
110    },
111    ParseOobNotes {
112        oob_notes: String,
113    },
114}
115
116#[derive(Serialize, Deserialize, Clone, Debug)]
117pub struct RpcResponse {
118    pub request_id: u64,
119    #[serde(flatten)]
120    pub kind: RpcResponseKind,
121}
122
123#[derive(Serialize, Deserialize, Clone, Debug)]
124#[serde(tag = "type", rename_all = "snake_case")]
125pub enum RpcResponseKind {
126    Data { data: serde_json::Value },
127    Error { error: String },
128    Aborted {},
129    End {},
130}
131
132pub trait RpcResponseHandler: MaybeSend + MaybeSync {
133    fn handle_response(&self, response: RpcResponse);
134}
135
136pub struct RpcGlobalState {
137    clients: Mutex<HashMap<String, ClientHandleArc>>,
138    rpc_handles: std::sync::Mutex<HashMap<u64, AbortHandle>>,
139    unified_database: Database,
140}
141
142pub struct HandledRpc<'a> {
143    pub task: Option<BoxFuture<'a, ()>>,
144}
145
146impl RpcGlobalState {
147    pub fn new(unified_database: Database) -> Self {
148        Self {
149            clients: Mutex::new(HashMap::new()),
150            rpc_handles: std::sync::Mutex::new(HashMap::new()),
151            unified_database,
152        }
153    }
154
155    async fn add_client(&self, client_name: String, client: ClientHandleArc) {
156        let mut clients = self.clients.lock().await;
157        clients.insert(client_name, client);
158    }
159
160    async fn get_client(&self, client_name: &str) -> Option<ClientHandleArc> {
161        let clients = self.clients.lock().await;
162        clients.get(client_name).cloned()
163    }
164
165    fn add_rpc_handle(&self, request_id: u64, handle: AbortHandle) {
166        let mut handles = self.rpc_handles.lock().unwrap();
167        if handles.insert(request_id, handle).is_some() {
168            tracing::error!("RPC CLIENT ERROR: request id reuse detected");
169        }
170    }
171
172    fn remove_rpc_handle(&self, request_id: u64) -> Option<AbortHandle> {
173        let mut handles = self.rpc_handles.lock().unwrap();
174        handles.remove(&request_id)
175    }
176
177    async fn client_builder() -> Result<fedimint_client::ClientBuilder, anyhow::Error> {
178        let mut builder = fedimint_client::Client::builder().await?;
179        builder.with_module(MintClientInit);
180        builder.with_module(LightningClientInit::default());
181        builder.with_module(WalletClientInit(None));
182        builder.with_module(MetaClientInit);
183        builder.with_primary_module_kind(fedimint_mint_client::KIND);
184        Ok(builder)
185    }
186
187    /// Get client-specific database with proper prefix
188    async fn client_db(&self, client_name: String) -> anyhow::Result<Database> {
189        assert_eq!(client_name.len(), 36);
190
191        let unified_db = &self.unified_database;
192        let mut client_prefix = vec![DbKeyPrefix::ClientDatabase as u8];
193        client_prefix.extend_from_slice(client_name.as_bytes());
194        Ok(unified_db.with_prefix(client_prefix))
195    }
196
197    /// Handle joining federation using unified database
198    async fn handle_join_federation(
199        &self,
200        invite_code: String,
201        client_name: String,
202        force_recover: bool,
203    ) -> anyhow::Result<()> {
204        // Check if wallet mnemonic is set
205        let mnemonic = self
206            .get_mnemonic_from_db()
207            .await?
208            .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
209
210        let client_db = self.client_db(client_name.clone()).await?;
211
212        let invite_code = InviteCode::from_str(&invite_code)?;
213        let federation_id = invite_code.federation_id();
214
215        // Derive federation-specific secret from wallet mnemonic
216        let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
217
218        let builder = Self::client_builder().await?;
219        let preview = builder.preview(&invite_code).await?;
220
221        // Check if backup exists
222        let backup = preview
223            .download_backup_from_federation(RootSecret::StandardDoubleDerive(
224                federation_secret.clone(),
225            ))
226            .await?;
227
228        let client = if force_recover || backup.is_some() {
229            Arc::new(
230                preview
231                    .recover(
232                        client_db,
233                        RootSecret::StandardDoubleDerive(federation_secret),
234                        backup,
235                    )
236                    .await?,
237            )
238        } else {
239            Arc::new(
240                preview
241                    .join(
242                        client_db,
243                        RootSecret::StandardDoubleDerive(federation_secret),
244                    )
245                    .await?,
246            )
247        };
248
249        self.add_client(client_name, client).await;
250        Ok(())
251    }
252
253    async fn handle_open_client(&self, client_name: String) -> anyhow::Result<()> {
254        // Check if wallet mnemonic is set
255        let mnemonic = self
256            .get_mnemonic_from_db()
257            .await?
258            .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
259
260        let client_db = self.client_db(client_name.clone()).await?;
261
262        if !fedimint_client::Client::is_initialized(&client_db).await {
263            anyhow::bail!("client is not initialized for this database");
264        }
265
266        // Get the client config to retrieve the federation ID
267        let client_config = fedimint_client::Client::get_config_from_db(&client_db)
268            .await
269            .context("Client config not found in database")?;
270
271        let federation_id = client_config.calculate_federation_id();
272
273        // Derive federation-specific secret from wallet mnemonic
274        let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
275
276        let builder = Self::client_builder().await?;
277        let client = Arc::new(
278            builder
279                .open(
280                    client_db,
281                    RootSecret::StandardDoubleDerive(federation_secret),
282                )
283                .await?,
284        );
285
286        self.add_client(client_name, client).await;
287        Ok(())
288    }
289
290    async fn handle_close_client(&self, client_name: String) -> anyhow::Result<()> {
291        let mut clients = self.clients.lock().await;
292        let mut client = clients.remove(&client_name).context("client not found")?;
293
294        // RPC calls might have cloned the client Arc before we remove the client.
295        for attempt in 0.. {
296            info!(attempt, "waiting for RPCs to drop the federation object");
297            match Arc::try_unwrap(client) {
298                Ok(client) => {
299                    client.shutdown().await;
300                    break;
301                }
302                Err(client_val) => client = client_val,
303            }
304            fedimint_core::task::sleep(Duration::from_millis(100)).await;
305        }
306        Ok(())
307    }
308
309    fn handle_client_rpc(
310        self: Arc<Self>,
311        client_name: String,
312        module: String,
313        method: String,
314        payload: serde_json::Value,
315    ) -> BoxStream<'static, anyhow::Result<serde_json::Value>> {
316        Box::pin(try_stream! {
317            let client = self
318                .get_client(&client_name)
319                .await
320                .with_context(|| format!("Client not found: {client_name}"))?;
321            match module.as_str() {
322                "" => {
323                    let mut stream = client.handle_global_rpc(method, payload);
324                    while let Some(item) = stream.next().await {
325                        yield item?;
326                    }
327                }
328                "ln" => {
329                    let ln = client.get_first_module::<LightningClientModule>()?.inner();
330                    let mut stream = ln.handle_rpc(method, payload).await;
331                    while let Some(item) = stream.next().await {
332                        yield item?;
333                    }
334                }
335                "mint" => {
336                    let mint = client.get_first_module::<MintClientModule>()?.inner();
337                    let mut stream = mint.handle_rpc(method, payload).await;
338                    while let Some(item) = stream.next().await {
339                        yield item?;
340                    }
341                }
342                "wallet" => {
343                    let wallet = client
344                        .get_first_module::<WalletClientModule>()?
345                        .inner();
346                    let mut stream = wallet.handle_rpc(method, payload).await;
347                    while let Some(item) = stream.next().await {
348                        yield item?;
349                    }
350                }
351                _ => {
352                    Err(anyhow::format_err!("module not found: {module}"))?;
353                },
354            };
355        })
356    }
357
358    fn parse_invite_code(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
359        let invite_code = InviteCode::from_str(&invite_code)?;
360
361        Ok(json!({
362            "url": invite_code.url(),
363            "federation_id": invite_code.federation_id(),
364        }))
365    }
366
367    fn parse_bolt11_invoice(&self, invoice_str: String) -> anyhow::Result<serde_json::Value> {
368        let invoice = lightning_invoice::Bolt11Invoice::from_str(&invoice_str)
369            .map_err(|e| anyhow::anyhow!("Failed to parse Lightning invoice: {}", e))?;
370
371        let amount_msat = invoice.amount_milli_satoshis().unwrap_or(0);
372        let amount_sat = amount_msat as f64 / 1000.0;
373
374        let expiry_seconds = invoice.expiry_time().as_secs();
375
376        // memo
377        let description = match invoice.description() {
378            Bolt11InvoiceDescriptionRef::Direct(desc) => desc.to_string(),
379            Bolt11InvoiceDescriptionRef::Hash(_) => "Description hash only".to_string(),
380        };
381
382        Ok(json!({
383            "amount": amount_sat,
384            "expiry": expiry_seconds,
385            "memo": description,
386        }))
387    }
388
389    async fn preview_federation(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
390        let invite = InviteCode::from_str(&invite_code)?;
391        let (client_config, _) = fedimint_api_client::api::net::Connector::default()
392            .download_from_invite_code(
393                &invite, /* TODO: how should rpc clients control this? */ false, false,
394            )
395            .await?;
396        let json_config = client_config.to_json();
397        let federation_id = client_config.calculate_federation_id();
398
399        Ok(json!({
400            "config": json_config,
401            "federation_id": federation_id.to_string(),
402        }))
403    }
404
405    fn handle_rpc_inner(
406        self: Arc<Self>,
407        request: RpcRequest,
408    ) -> Option<BoxStream<'static, anyhow::Result<serde_json::Value>>> {
409        match request.kind {
410            RpcRequestKind::SetMnemonic { words } => Some(Box::pin(try_stream! {
411                self.set_mnemonic(words).await?;
412                yield serde_json::json!({ "success": true });
413            })),
414            RpcRequestKind::GenerateMnemonic => Some(Box::pin(try_stream! {
415                let words = self.generate_mnemonic().await?;
416                yield serde_json::json!({ "mnemonic": words });
417            })),
418            RpcRequestKind::GetMnemonic => Some(Box::pin(try_stream! {
419                let words = self.get_mnemonic_words().await?;
420                yield serde_json::json!({ "mnemonic": words });
421            })),
422            RpcRequestKind::JoinFederation {
423                invite_code,
424                client_name,
425                force_recover,
426            } => Some(Box::pin(try_stream! {
427                self.handle_join_federation(invite_code, client_name, force_recover)
428                    .await?;
429                yield serde_json::json!(null);
430            })),
431            RpcRequestKind::OpenClient { client_name } => Some(Box::pin(try_stream! {
432                self.handle_open_client(client_name).await?;
433                yield serde_json::json!(null);
434            })),
435            RpcRequestKind::CloseClient { client_name } => Some(Box::pin(try_stream! {
436                self.handle_close_client(client_name).await?;
437                yield serde_json::json!(null);
438            })),
439            RpcRequestKind::ClientRpc {
440                client_name,
441                module,
442                method,
443                payload,
444            } => Some(self.handle_client_rpc(client_name, module, method, payload)),
445            RpcRequestKind::ParseInviteCode { invite_code } => Some(Box::pin(try_stream! {
446                let result = self.parse_invite_code(invite_code)?;
447                yield result;
448            })),
449            RpcRequestKind::ParseBolt11Invoice { invoice } => Some(Box::pin(try_stream! {
450                let result = self.parse_bolt11_invoice(invoice)?;
451                yield result;
452            })),
453            RpcRequestKind::PreviewFederation { invite_code } => Some(Box::pin(try_stream! {
454                let result = self.preview_federation(invite_code).await?;
455                yield result;
456            })),
457            RpcRequestKind::ParseOobNotes { oob_notes } => Some(Box::pin(try_stream! {
458                let parsed = parse_oob_notes(&oob_notes)?;
459                yield serde_json::to_value(parsed)?;
460            })),
461            RpcRequestKind::CancelRpc { cancel_request_id } => {
462                if let Some(handle) = self.remove_rpc_handle(cancel_request_id) {
463                    handle.abort();
464                }
465                None
466            }
467        }
468    }
469
470    pub fn handle_rpc(
471        self: Arc<Self>,
472        request: RpcRequest,
473        handler: impl RpcResponseHandler + 'static,
474    ) -> HandledRpc<'static> {
475        let request_id = request.request_id;
476
477        let Some(stream) = self.clone().handle_rpc_inner(request) else {
478            return HandledRpc { task: None };
479        };
480
481        let (abort_handle, abort_registration) = AbortHandle::new_pair();
482        self.add_rpc_handle(request_id, abort_handle);
483
484        let task = Box::pin(async move {
485            let mut stream = Abortable::new(stream, abort_registration);
486
487            while let Some(result) = stream.next().await {
488                let response = match result {
489                    Ok(value) => RpcResponse {
490                        request_id,
491                        kind: RpcResponseKind::Data { data: value },
492                    },
493                    Err(e) => RpcResponse {
494                        request_id,
495                        kind: RpcResponseKind::Error {
496                            error: e.to_string(),
497                        },
498                    },
499                };
500                handler.handle_response(response);
501            }
502
503            // Clean up abort handle and send end message
504            let _ = self.remove_rpc_handle(request_id);
505            handler.handle_response(RpcResponse {
506                request_id,
507                kind: if stream.is_aborted() {
508                    RpcResponseKind::Aborted {}
509                } else {
510                    RpcResponseKind::End {}
511                },
512            });
513        });
514
515        HandledRpc { task: Some(task) }
516    }
517
518    /// Retrieve the wallet-level mnemonic words.
519    /// Returns the mnemonic as a vector of words, or None if no mnemonic is
520    /// set.
521    async fn get_mnemonic_words(&self) -> anyhow::Result<Option<Vec<String>>> {
522        let mnemonic = self.get_mnemonic_from_db().await?;
523
524        if let Some(mnemonic) = mnemonic {
525            let words = mnemonic.words().map(|w| w.to_string()).collect();
526            Ok(Some(words))
527        } else {
528            Ok(None)
529        }
530    }
531    /// Set a mnemonic from user-provided words
532    /// Returns an error if a mnemonic is already set
533    async fn set_mnemonic(&self, words: Vec<String>) -> anyhow::Result<()> {
534        let all_words = words.join(" ");
535        let mnemonic =
536            Mnemonic::parse_in_normalized(fedimint_bip39::Language::English, &all_words)?;
537
538        let mut dbtx = self.unified_database.begin_transaction().await;
539
540        if dbtx.get_value(&MnemonicKey).await.is_some() {
541            anyhow::bail!(
542                "Wallet mnemonic already exists. Please clear existing data before setting a new mnemonic."
543            );
544        }
545
546        dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
547            .await;
548
549        dbtx.commit_tx().await;
550
551        Ok(())
552    }
553
554    /// Generate a new random mnemonic and set it
555    /// Returns an error if a mnemonic is already set
556    async fn generate_mnemonic(&self) -> anyhow::Result<Vec<String>> {
557        let mnemonic = Bip39RootSecretStrategy::<12>::random(&mut thread_rng());
558        let words: Vec<String> = mnemonic.words().map(|w| w.to_string()).collect();
559
560        let mut dbtx = self.unified_database.begin_transaction().await;
561
562        if dbtx.get_value(&MnemonicKey).await.is_some() {
563            anyhow::bail!(
564                "Wallet mnemonic already exists. Please clear existing data before generating a new mnemonic."
565            );
566        }
567
568        dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
569            .await;
570
571        dbtx.commit_tx().await;
572
573        Ok(words)
574    }
575
576    /// Derive federation-specific secret from wallet mnemonic
577    fn derive_federation_secret(
578        &self,
579        mnemonic: &Mnemonic,
580        federation_id: &FederationId,
581    ) -> DerivableSecret {
582        let global_root_secret = Bip39RootSecretStrategy::<12>::to_root_secret(mnemonic);
583        let multi_federation_root_secret = global_root_secret.child_key(ChildId(0));
584        let federation_root_secret = multi_federation_root_secret.federation_key(federation_id);
585        let federation_wallet_root_secret = federation_root_secret.child_key(ChildId(0));
586        federation_wallet_root_secret.child_key(ChildId(0))
587    }
588
589    /// Fetch mnemonic from database
590    async fn get_mnemonic_from_db(&self) -> anyhow::Result<Option<Mnemonic>> {
591        let mut dbtx = self.unified_database.begin_transaction_nc().await;
592
593        if let Some(mnemonic_entropy) = dbtx.get_value(&MnemonicKey).await {
594            let mnemonic = Mnemonic::from_entropy(&mnemonic_entropy)?;
595            Ok(Some(mnemonic))
596        } else {
597            Ok(None)
598        }
599    }
600}
601
602pub fn parse_oob_notes(oob_notes_str: &str) -> anyhow::Result<ParsedNoteDetails> {
603    let oob_notes =
604        OOBNotes::from_str(oob_notes_str).context("Failed to parse OOB notes string")?;
605
606    let total_amount = oob_notes.total_amount();
607    let federation_id_prefix = oob_notes.federation_id_prefix();
608    let invite_code = oob_notes.federation_invite();
609    let federation_id = invite_code.as_ref().map(|inv| inv.federation_id());
610
611    // Get note counts by denomination
612    let notes = oob_notes.notes();
613    let mut note_counts = TieredCounts::default();
614    for (amount, _note) in notes.iter_items() {
615        note_counts.inc(amount, 1);
616    }
617
618    Ok(ParsedNoteDetails {
619        total_amount,
620        federation_id_prefix,
621        federation_id,
622        invite_code,
623        note_counts,
624    })
625}