1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use async_stream::try_stream;
8use fedimint_bip39::{Bip39RootSecretStrategy, Mnemonic};
9use fedimint_client::module::ClientModule;
10use fedimint_client::secret::RootSecretStrategy;
11use fedimint_client::{ClientHandleArc, RootSecret};
12use fedimint_core::config::{FederationId, FederationIdPrefix};
13use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
14use fedimint_core::encoding::{Decodable, Encodable};
15use fedimint_core::invite_code::InviteCode;
16use fedimint_core::task::{MaybeSend, MaybeSync};
17use fedimint_core::util::{BoxFuture, BoxStream};
18use fedimint_core::{Amount, TieredCounts, impl_db_record};
19use fedimint_derive_secret::{ChildId, DerivableSecret};
20use fedimint_ln_client::{LightningClientInit, LightningClientModule};
21use fedimint_meta_client::MetaClientInit;
22use fedimint_mint_client::{MintClientInit, MintClientModule, OOBNotes};
23use fedimint_wallet_client::{WalletClientInit, WalletClientModule};
24use futures::StreamExt;
25use futures::future::{AbortHandle, Abortable};
26use lightning_invoice::Bolt11InvoiceDescriptionRef;
27use rand::thread_rng;
28use serde::{Deserialize, Serialize};
29use serde_json::json;
30use tokio::sync::Mutex;
31use tracing::info;
32
33#[repr(u8)]
35#[derive(Clone, Copy, Debug)]
36pub enum DbKeyPrefix {
37 ClientDatabase = 0x00,
38 Mnemonic = 0x01,
39}
40
41#[derive(Debug, Clone, Encodable, Decodable, Eq, PartialEq, Hash)]
42pub struct MnemonicKey;
43
44impl_db_record!(
45 key = MnemonicKey,
46 value = Vec<u8>,
47 db_prefix = DbKeyPrefix::Mnemonic,
48);
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct ParsedNoteDetails {
53 pub total_amount: Amount,
55 pub federation_id_prefix: FederationIdPrefix,
57 pub federation_id: Option<FederationId>,
59 pub invite_code: Option<InviteCode>,
61 pub note_counts: TieredCounts,
63}
64
65#[derive(Serialize, Deserialize)]
66#[serde(rename_all = "snake_case")]
67pub struct RpcRequest {
68 pub request_id: u64,
69 #[serde(flatten)]
70 pub kind: RpcRequestKind,
71}
72
73#[derive(Serialize, Deserialize)]
74#[serde(tag = "type", rename_all = "snake_case")]
75pub enum RpcRequestKind {
76 SetMnemonic {
77 words: Vec<String>,
78 },
79 GenerateMnemonic,
80 GetMnemonic,
81 JoinFederation {
83 invite_code: String,
84 force_recover: bool,
85 client_name: String,
86 },
87 OpenClient {
88 client_name: String,
89 },
90 CloseClient {
91 client_name: String,
92 },
93 ClientRpc {
94 client_name: String,
95 module: String,
96 method: String,
97 payload: serde_json::Value,
98 },
99 CancelRpc {
100 cancel_request_id: u64,
101 },
102 ParseInviteCode {
103 invite_code: String,
104 },
105 ParseBolt11Invoice {
106 invoice: String,
107 },
108 PreviewFederation {
109 invite_code: String,
110 },
111 ParseOobNotes {
112 oob_notes: String,
113 },
114}
115
116#[derive(Serialize, Deserialize, Clone, Debug)]
117pub struct RpcResponse {
118 pub request_id: u64,
119 #[serde(flatten)]
120 pub kind: RpcResponseKind,
121}
122
123#[derive(Serialize, Deserialize, Clone, Debug)]
124#[serde(tag = "type", rename_all = "snake_case")]
125pub enum RpcResponseKind {
126 Data { data: serde_json::Value },
127 Error { error: String },
128 Aborted {},
129 End {},
130}
131
132pub trait RpcResponseHandler: MaybeSend + MaybeSync {
133 fn handle_response(&self, response: RpcResponse);
134}
135
136pub struct RpcGlobalState {
137 clients: Mutex<HashMap<String, ClientHandleArc>>,
138 rpc_handles: std::sync::Mutex<HashMap<u64, AbortHandle>>,
139 unified_database: Database,
140}
141
142pub struct HandledRpc<'a> {
143 pub task: Option<BoxFuture<'a, ()>>,
144}
145
146impl RpcGlobalState {
147 pub fn new(unified_database: Database) -> Self {
148 Self {
149 clients: Mutex::new(HashMap::new()),
150 rpc_handles: std::sync::Mutex::new(HashMap::new()),
151 unified_database,
152 }
153 }
154
155 async fn add_client(&self, client_name: String, client: ClientHandleArc) {
156 let mut clients = self.clients.lock().await;
157 clients.insert(client_name, client);
158 }
159
160 async fn get_client(&self, client_name: &str) -> Option<ClientHandleArc> {
161 let clients = self.clients.lock().await;
162 clients.get(client_name).cloned()
163 }
164
165 fn add_rpc_handle(&self, request_id: u64, handle: AbortHandle) {
166 let mut handles = self.rpc_handles.lock().unwrap();
167 if handles.insert(request_id, handle).is_some() {
168 tracing::error!("RPC CLIENT ERROR: request id reuse detected");
169 }
170 }
171
172 fn remove_rpc_handle(&self, request_id: u64) -> Option<AbortHandle> {
173 let mut handles = self.rpc_handles.lock().unwrap();
174 handles.remove(&request_id)
175 }
176
177 async fn client_builder() -> Result<fedimint_client::ClientBuilder, anyhow::Error> {
178 let mut builder = fedimint_client::Client::builder().await?;
179 builder.with_module(MintClientInit);
180 builder.with_module(LightningClientInit::default());
181 builder.with_module(WalletClientInit(None));
182 builder.with_module(MetaClientInit);
183 builder.with_primary_module_kind(fedimint_mint_client::KIND);
184 Ok(builder)
185 }
186
187 async fn client_db(&self, client_name: String) -> anyhow::Result<Database> {
189 assert_eq!(client_name.len(), 36);
190
191 let unified_db = &self.unified_database;
192 let mut client_prefix = vec![DbKeyPrefix::ClientDatabase as u8];
193 client_prefix.extend_from_slice(client_name.as_bytes());
194 Ok(unified_db.with_prefix(client_prefix))
195 }
196
197 async fn handle_join_federation(
199 &self,
200 invite_code: String,
201 client_name: String,
202 force_recover: bool,
203 ) -> anyhow::Result<()> {
204 let mnemonic = self
206 .get_mnemonic_from_db()
207 .await?
208 .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
209
210 let client_db = self.client_db(client_name.clone()).await?;
211
212 let invite_code = InviteCode::from_str(&invite_code)?;
213 let federation_id = invite_code.federation_id();
214
215 let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
217
218 let builder = Self::client_builder().await?;
219 let preview = builder.preview(&invite_code).await?;
220
221 let backup = preview
223 .download_backup_from_federation(RootSecret::StandardDoubleDerive(
224 federation_secret.clone(),
225 ))
226 .await?;
227
228 let client = if force_recover || backup.is_some() {
229 Arc::new(
230 preview
231 .recover(
232 client_db,
233 RootSecret::StandardDoubleDerive(federation_secret),
234 backup,
235 )
236 .await?,
237 )
238 } else {
239 Arc::new(
240 preview
241 .join(
242 client_db,
243 RootSecret::StandardDoubleDerive(federation_secret),
244 )
245 .await?,
246 )
247 };
248
249 self.add_client(client_name, client).await;
250 Ok(())
251 }
252
253 async fn handle_open_client(&self, client_name: String) -> anyhow::Result<()> {
254 let mnemonic = self
256 .get_mnemonic_from_db()
257 .await?
258 .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
259
260 let client_db = self.client_db(client_name.clone()).await?;
261
262 if !fedimint_client::Client::is_initialized(&client_db).await {
263 anyhow::bail!("client is not initialized for this database");
264 }
265
266 let client_config = fedimint_client::Client::get_config_from_db(&client_db)
268 .await
269 .context("Client config not found in database")?;
270
271 let federation_id = client_config.calculate_federation_id();
272
273 let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
275
276 let builder = Self::client_builder().await?;
277 let client = Arc::new(
278 builder
279 .open(
280 client_db,
281 RootSecret::StandardDoubleDerive(federation_secret),
282 )
283 .await?,
284 );
285
286 self.add_client(client_name, client).await;
287 Ok(())
288 }
289
290 async fn handle_close_client(&self, client_name: String) -> anyhow::Result<()> {
291 let mut clients = self.clients.lock().await;
292 let mut client = clients.remove(&client_name).context("client not found")?;
293
294 for attempt in 0.. {
296 info!(attempt, "waiting for RPCs to drop the federation object");
297 match Arc::try_unwrap(client) {
298 Ok(client) => {
299 client.shutdown().await;
300 break;
301 }
302 Err(client_val) => client = client_val,
303 }
304 fedimint_core::task::sleep(Duration::from_millis(100)).await;
305 }
306 Ok(())
307 }
308
309 fn handle_client_rpc(
310 self: Arc<Self>,
311 client_name: String,
312 module: String,
313 method: String,
314 payload: serde_json::Value,
315 ) -> BoxStream<'static, anyhow::Result<serde_json::Value>> {
316 Box::pin(try_stream! {
317 let client = self
318 .get_client(&client_name)
319 .await
320 .with_context(|| format!("Client not found: {client_name}"))?;
321 match module.as_str() {
322 "" => {
323 let mut stream = client.handle_global_rpc(method, payload);
324 while let Some(item) = stream.next().await {
325 yield item?;
326 }
327 }
328 "ln" => {
329 let ln = client.get_first_module::<LightningClientModule>()?.inner();
330 let mut stream = ln.handle_rpc(method, payload).await;
331 while let Some(item) = stream.next().await {
332 yield item?;
333 }
334 }
335 "mint" => {
336 let mint = client.get_first_module::<MintClientModule>()?.inner();
337 let mut stream = mint.handle_rpc(method, payload).await;
338 while let Some(item) = stream.next().await {
339 yield item?;
340 }
341 }
342 "wallet" => {
343 let wallet = client
344 .get_first_module::<WalletClientModule>()?
345 .inner();
346 let mut stream = wallet.handle_rpc(method, payload).await;
347 while let Some(item) = stream.next().await {
348 yield item?;
349 }
350 }
351 _ => {
352 Err(anyhow::format_err!("module not found: {module}"))?;
353 },
354 };
355 })
356 }
357
358 fn parse_invite_code(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
359 let invite_code = InviteCode::from_str(&invite_code)?;
360
361 Ok(json!({
362 "url": invite_code.url(),
363 "federation_id": invite_code.federation_id(),
364 }))
365 }
366
367 fn parse_bolt11_invoice(&self, invoice_str: String) -> anyhow::Result<serde_json::Value> {
368 let invoice = lightning_invoice::Bolt11Invoice::from_str(&invoice_str)
369 .map_err(|e| anyhow::anyhow!("Failed to parse Lightning invoice: {}", e))?;
370
371 let amount_msat = invoice.amount_milli_satoshis().unwrap_or(0);
372 let amount_sat = amount_msat as f64 / 1000.0;
373
374 let expiry_seconds = invoice.expiry_time().as_secs();
375
376 let description = match invoice.description() {
378 Bolt11InvoiceDescriptionRef::Direct(desc) => desc.to_string(),
379 Bolt11InvoiceDescriptionRef::Hash(_) => "Description hash only".to_string(),
380 };
381
382 Ok(json!({
383 "amount": amount_sat,
384 "expiry": expiry_seconds,
385 "memo": description,
386 }))
387 }
388
389 async fn preview_federation(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
390 let invite = InviteCode::from_str(&invite_code)?;
391 let (client_config, _) = fedimint_api_client::api::net::Connector::default()
392 .download_from_invite_code(
393 &invite, false, false,
394 )
395 .await?;
396 let json_config = client_config.to_json();
397 let federation_id = client_config.calculate_federation_id();
398
399 Ok(json!({
400 "config": json_config,
401 "federation_id": federation_id.to_string(),
402 }))
403 }
404
405 fn handle_rpc_inner(
406 self: Arc<Self>,
407 request: RpcRequest,
408 ) -> Option<BoxStream<'static, anyhow::Result<serde_json::Value>>> {
409 match request.kind {
410 RpcRequestKind::SetMnemonic { words } => Some(Box::pin(try_stream! {
411 self.set_mnemonic(words).await?;
412 yield serde_json::json!({ "success": true });
413 })),
414 RpcRequestKind::GenerateMnemonic => Some(Box::pin(try_stream! {
415 let words = self.generate_mnemonic().await?;
416 yield serde_json::json!({ "mnemonic": words });
417 })),
418 RpcRequestKind::GetMnemonic => Some(Box::pin(try_stream! {
419 let words = self.get_mnemonic_words().await?;
420 yield serde_json::json!({ "mnemonic": words });
421 })),
422 RpcRequestKind::JoinFederation {
423 invite_code,
424 client_name,
425 force_recover,
426 } => Some(Box::pin(try_stream! {
427 self.handle_join_federation(invite_code, client_name, force_recover)
428 .await?;
429 yield serde_json::json!(null);
430 })),
431 RpcRequestKind::OpenClient { client_name } => Some(Box::pin(try_stream! {
432 self.handle_open_client(client_name).await?;
433 yield serde_json::json!(null);
434 })),
435 RpcRequestKind::CloseClient { client_name } => Some(Box::pin(try_stream! {
436 self.handle_close_client(client_name).await?;
437 yield serde_json::json!(null);
438 })),
439 RpcRequestKind::ClientRpc {
440 client_name,
441 module,
442 method,
443 payload,
444 } => Some(self.handle_client_rpc(client_name, module, method, payload)),
445 RpcRequestKind::ParseInviteCode { invite_code } => Some(Box::pin(try_stream! {
446 let result = self.parse_invite_code(invite_code)?;
447 yield result;
448 })),
449 RpcRequestKind::ParseBolt11Invoice { invoice } => Some(Box::pin(try_stream! {
450 let result = self.parse_bolt11_invoice(invoice)?;
451 yield result;
452 })),
453 RpcRequestKind::PreviewFederation { invite_code } => Some(Box::pin(try_stream! {
454 let result = self.preview_federation(invite_code).await?;
455 yield result;
456 })),
457 RpcRequestKind::ParseOobNotes { oob_notes } => Some(Box::pin(try_stream! {
458 let parsed = parse_oob_notes(&oob_notes)?;
459 yield serde_json::to_value(parsed)?;
460 })),
461 RpcRequestKind::CancelRpc { cancel_request_id } => {
462 if let Some(handle) = self.remove_rpc_handle(cancel_request_id) {
463 handle.abort();
464 }
465 None
466 }
467 }
468 }
469
470 pub fn handle_rpc(
471 self: Arc<Self>,
472 request: RpcRequest,
473 handler: impl RpcResponseHandler + 'static,
474 ) -> HandledRpc<'static> {
475 let request_id = request.request_id;
476
477 let Some(stream) = self.clone().handle_rpc_inner(request) else {
478 return HandledRpc { task: None };
479 };
480
481 let (abort_handle, abort_registration) = AbortHandle::new_pair();
482 self.add_rpc_handle(request_id, abort_handle);
483
484 let task = Box::pin(async move {
485 let mut stream = Abortable::new(stream, abort_registration);
486
487 while let Some(result) = stream.next().await {
488 let response = match result {
489 Ok(value) => RpcResponse {
490 request_id,
491 kind: RpcResponseKind::Data { data: value },
492 },
493 Err(e) => RpcResponse {
494 request_id,
495 kind: RpcResponseKind::Error {
496 error: e.to_string(),
497 },
498 },
499 };
500 handler.handle_response(response);
501 }
502
503 let _ = self.remove_rpc_handle(request_id);
505 handler.handle_response(RpcResponse {
506 request_id,
507 kind: if stream.is_aborted() {
508 RpcResponseKind::Aborted {}
509 } else {
510 RpcResponseKind::End {}
511 },
512 });
513 });
514
515 HandledRpc { task: Some(task) }
516 }
517
518 async fn get_mnemonic_words(&self) -> anyhow::Result<Option<Vec<String>>> {
522 let mnemonic = self.get_mnemonic_from_db().await?;
523
524 if let Some(mnemonic) = mnemonic {
525 let words = mnemonic.words().map(|w| w.to_string()).collect();
526 Ok(Some(words))
527 } else {
528 Ok(None)
529 }
530 }
531 async fn set_mnemonic(&self, words: Vec<String>) -> anyhow::Result<()> {
534 let all_words = words.join(" ");
535 let mnemonic =
536 Mnemonic::parse_in_normalized(fedimint_bip39::Language::English, &all_words)?;
537
538 let mut dbtx = self.unified_database.begin_transaction().await;
539
540 if dbtx.get_value(&MnemonicKey).await.is_some() {
541 anyhow::bail!(
542 "Wallet mnemonic already exists. Please clear existing data before setting a new mnemonic."
543 );
544 }
545
546 dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
547 .await;
548
549 dbtx.commit_tx().await;
550
551 Ok(())
552 }
553
554 async fn generate_mnemonic(&self) -> anyhow::Result<Vec<String>> {
557 let mnemonic = Bip39RootSecretStrategy::<12>::random(&mut thread_rng());
558 let words: Vec<String> = mnemonic.words().map(|w| w.to_string()).collect();
559
560 let mut dbtx = self.unified_database.begin_transaction().await;
561
562 if dbtx.get_value(&MnemonicKey).await.is_some() {
563 anyhow::bail!(
564 "Wallet mnemonic already exists. Please clear existing data before generating a new mnemonic."
565 );
566 }
567
568 dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
569 .await;
570
571 dbtx.commit_tx().await;
572
573 Ok(words)
574 }
575
576 fn derive_federation_secret(
578 &self,
579 mnemonic: &Mnemonic,
580 federation_id: &FederationId,
581 ) -> DerivableSecret {
582 let global_root_secret = Bip39RootSecretStrategy::<12>::to_root_secret(mnemonic);
583 let multi_federation_root_secret = global_root_secret.child_key(ChildId(0));
584 let federation_root_secret = multi_federation_root_secret.federation_key(federation_id);
585 let federation_wallet_root_secret = federation_root_secret.child_key(ChildId(0));
586 federation_wallet_root_secret.child_key(ChildId(0))
587 }
588
589 async fn get_mnemonic_from_db(&self) -> anyhow::Result<Option<Mnemonic>> {
591 let mut dbtx = self.unified_database.begin_transaction_nc().await;
592
593 if let Some(mnemonic_entropy) = dbtx.get_value(&MnemonicKey).await {
594 let mnemonic = Mnemonic::from_entropy(&mnemonic_entropy)?;
595 Ok(Some(mnemonic))
596 } else {
597 Ok(None)
598 }
599 }
600}
601
602pub fn parse_oob_notes(oob_notes_str: &str) -> anyhow::Result<ParsedNoteDetails> {
603 let oob_notes =
604 OOBNotes::from_str(oob_notes_str).context("Failed to parse OOB notes string")?;
605
606 let total_amount = oob_notes.total_amount();
607 let federation_id_prefix = oob_notes.federation_id_prefix();
608 let invite_code = oob_notes.federation_invite();
609 let federation_id = invite_code.as_ref().map(|inv| inv.federation_id());
610
611 let notes = oob_notes.notes();
613 let mut note_counts = TieredCounts::default();
614 for (amount, _note) in notes.iter_items() {
615 note_counts.inc(amount, 1);
616 }
617
618 Ok(ParsedNoteDetails {
619 total_amount,
620 federation_id_prefix,
621 federation_id,
622 invite_code,
623 note_counts,
624 })
625}