1#[cfg(any(
2 feature = "btc-esplora",
3 feature = "btc-core",
4 feature = "btc-electrum"
5))]
6use crate::provider::btc::BtcProvider;
7#[cfg(feature = "cashu")]
8use crate::provider::cashu::CashuProvider;
9#[cfg(feature = "evm")]
10use crate::provider::evm::EvmProvider;
11#[cfg(any(feature = "ln-nwc", feature = "ln-phoenixd", feature = "ln-lnbits"))]
12use crate::provider::ln::LnProvider;
13use crate::provider::remote::RemoteProvider;
14#[cfg(feature = "sol")]
15use crate::provider::sol::SolProvider;
16use crate::provider::{HistorySyncStats, PayError, PayProvider, StubProvider};
17use crate::spend::{SpendContext, SpendLedger};
18#[cfg(feature = "redb")]
19use crate::store::lock;
20use crate::store::wallet;
21use crate::store::{PayStore, StorageBackend};
22use crate::types::*;
23use std::collections::HashMap;
24use std::sync::atomic::{AtomicU64, Ordering};
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tokio::sync::{mpsc, Mutex, RwLock};
28use tokio::task::JoinHandle;
29use tokio::time::sleep;
30
/// Number of seconds a `receive` wait runs before timing out when the request carries no
/// `wait_timeout_s` of its own.
const DEFAULT_WAIT_TIMEOUT_SECS: u64 = 5 * 60;
/// Pause, in milliseconds, between two polls of a `receive` wait when the request carries no
/// `wait_poll_interval_ms` of its own.
const DEFAULT_WAIT_POLL_INTERVAL_MS: u64 = 1_000;
33
34pub struct App {
35 pub config: RwLock<RuntimeConfig>,
36 pub providers: HashMap<Network, Box<dyn PayProvider>>,
37 pub writer: mpsc::Sender<Output>,
38 pub in_flight: Mutex<HashMap<String, JoinHandle<()>>>,
39 pub requests_total: AtomicU64,
40 pub start_time: Instant,
41 pub has_local_providers: bool,
43 pub enforce_limits: bool,
46 pub spend_ledger: SpendLedger,
47 pub store: Option<Arc<StorageBackend>>,
50}
51
52impl App {
53 pub fn new(
56 config: RuntimeConfig,
57 writer: mpsc::Sender<Output>,
58 enforce_limits_override: Option<bool>,
59 store: Option<StorageBackend>,
60 ) -> Self {
61 let store = store.map(Arc::new);
62 let mut providers: HashMap<Network, Box<dyn PayProvider>> = HashMap::new();
63
64 for network in &[
65 Network::Ln,
66 Network::Sol,
67 Network::Evm,
68 Network::Cashu,
69 Network::Btc,
70 ] {
71 let key = network.to_string();
72 if let Some(rpc_name) = config.providers.get(&key) {
73 if let Some(rpc_cfg) = config.afpay_rpc.get(rpc_name) {
75 let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
76 providers.insert(
77 *network,
78 Box::new(RemoteProvider::new(&rpc_cfg.endpoint, secret, *network)),
79 );
80 } else {
81 providers.insert(*network, Box::new(StubProvider::new(*network)));
83 }
84 } else {
85 #[allow(unreachable_patterns)]
86 match network {
87 #[cfg(feature = "cashu")]
88 Network::Cashu => {
89 if let Some(s) = &store {
90 let pg_url = config
91 .postgres_url_secret
92 .clone()
93 .filter(|_| config.storage_backend.as_deref() == Some("postgres"));
94 providers.insert(
95 *network,
96 Box::new(CashuProvider::new(&config.data_dir, pg_url, s.clone())),
97 );
98 } else {
99 providers.insert(*network, Box::new(StubProvider::new(*network)));
100 }
101 }
102 #[cfg(any(feature = "ln-nwc", feature = "ln-phoenixd", feature = "ln-lnbits"))]
103 Network::Ln => {
104 providers.insert(*network, Box::new(LnProvider::new(&config.data_dir)));
105 }
106 #[cfg(feature = "sol")]
107 Network::Sol => {
108 providers.insert(*network, Box::new(SolProvider::new(&config.data_dir)));
109 }
110 #[cfg(feature = "evm")]
111 Network::Evm => {
112 providers.insert(*network, Box::new(EvmProvider::new(&config.data_dir)));
113 }
114 #[cfg(any(
115 feature = "btc-esplora",
116 feature = "btc-core",
117 feature = "btc-electrum"
118 ))]
119 Network::Btc => {
120 providers.insert(*network, Box::new(BtcProvider::new(&config.data_dir)));
121 }
122 _ => {
123 providers.insert(*network, Box::new(StubProvider::new(*network)));
124 }
125 }
126 }
127 }
128
129 let has_local = providers.values().any(|p| p.writes_locally());
130 let spend_ledger = match store.as_deref() {
131 #[cfg(feature = "postgres")]
132 Some(StorageBackend::Postgres(pg)) => {
133 SpendLedger::new_postgres(pg.pool().clone(), config.exchange_rate.clone())
134 }
135 _ => SpendLedger::new(&config.data_dir, config.exchange_rate.clone()),
136 };
137 Self {
138 config: RwLock::new(config),
139 providers,
140 writer,
141 in_flight: Mutex::new(HashMap::new()),
142 requests_total: AtomicU64::new(0),
143 start_time: Instant::now(),
144 has_local_providers: has_local,
145 enforce_limits: enforce_limits_override.unwrap_or(has_local),
146 spend_ledger,
147 store,
148 }
149 }
150}
151
152pub async fn startup_provider_validation_errors(config: &RuntimeConfig) -> Vec<Output> {
155 let mut errors = Vec::new();
156
157 for (network, rpc_name) in &config.providers {
159 if !config.afpay_rpc.contains_key(rpc_name) {
160 errors.push(Output::Error {
161 id: None,
162 error_code: "invalid_config".to_string(),
163 error: format!(
164 "providers.{network} references unknown afpay_rpc node '{rpc_name}'"
165 ),
166 hint: Some(format!(
167 "add [afpay_rpc.{rpc_name}] with endpoint and endpoint_secret to config.toml"
168 )),
169 retryable: false,
170 trace: Trace::from_duration(0),
171 });
172 }
173 }
174 if !errors.is_empty() {
175 return errors;
176 }
177
178 let mut pinged: std::collections::HashSet<String> = std::collections::HashSet::new();
180 for (rpc_name, rpc_cfg) in &config.afpay_rpc {
181 if !pinged.insert(rpc_cfg.endpoint.clone()) {
182 continue;
183 }
184 let network = config
186 .providers
187 .iter()
188 .find(|(_, name)| *name == rpc_name)
189 .and_then(|(k, _)| k.parse::<Network>().ok())
190 .unwrap_or(Network::Cashu);
191 let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
192 let provider = RemoteProvider::new(&rpc_cfg.endpoint, secret, network);
193 if let Err(err) = provider.ping().await {
194 errors.push(Output::Error {
195 id: None,
196 error_code: "provider_unreachable".to_string(),
197 error: format!("afpay_rpc.{rpc_name} ({}): {err}", rpc_cfg.endpoint),
198 hint: Some("check endpoint address and that the daemon is running".to_string()),
199 retryable: true,
200 trace: Trace::from_duration(0),
201 });
202 }
203 }
204 errors
205}
206
207fn require_store(app: &App) -> Result<&StorageBackend, PayError> {
209 app.store
210 .as_deref()
211 .ok_or_else(|| PayError::NotImplemented("no storage backend available".to_string()))
212}
213
/// Acquires the data-directory lock for the configured `data_dir`.
///
/// `lock::acquire` is run through `spawn_blocking` rather than directly on the async task.
///
/// # Errors
/// Returns `PayError::InternalError` when the blocking task cannot be joined (message
/// prefixed with `lock task:`) or when `lock::acquire` itself reports a failure.
#[cfg(feature = "redb")]
async fn acquire_write_lock(app: &App) -> Result<lock::DataLock, PayError> {
    // Clone the path so the config read guard is released before the task is spawned.
    let data_dir = app.config.read().await.data_dir.clone();
    let joined = tokio::task::spawn_blocking(move || lock::acquire(&data_dir, None)).await;
    match joined {
        Ok(Ok(guard)) => Ok(guard),
        Ok(Err(reason)) => Err(PayError::InternalError(reason)),
        Err(e) => Err(PayError::InternalError(format!("lock task: {e}"))),
    }
}
225
226fn needs_write_lock(input: &Input) -> bool {
227 matches!(
228 input,
229 Input::WalletCreate { .. }
230 | Input::LnWalletCreate { .. }
231 | Input::WalletClose { .. }
232 | Input::Receive { .. }
233 | Input::ReceiveClaim { .. }
234 | Input::CashuSend { .. }
235 | Input::CashuReceive { .. }
236 | Input::Send { .. }
237 | Input::Restore { .. }
238 | Input::LimitAdd { .. }
239 | Input::LimitRemove { .. }
240 | Input::LimitSet { .. }
241 | Input::HistoryUpdate { .. }
242 | Input::WalletConfigSet { .. }
243 | Input::WalletConfigTokenAdd { .. }
244 | Input::WalletConfigTokenRemove { .. }
245 )
246}
247
/// Tries `$call` against each value of `$providers` until one of them settles the request.
///
/// `$p` is bound to the current provider (borrowed with `as_ref()`), and `$call` must be an
/// expression that can be `.await`ed to a `Result<_, PayError>`.
///
/// - `Err(PayError::NotImplemented(_))` means "not mine": the next provider is tried.
/// - Any other outcome, `Ok` or `Err`, ends the search and becomes the macro's value.
/// - If every provider declines (or there are none), the value is
///   `Err(PayError::NotImplemented("network not enabled"))`.
///
/// Providers are visited in `$providers.values()` order; for a `HashMap` that order is
/// unspecified.
macro_rules! try_provider {
    ($providers:expr, |$p:ident| $call:expr) => {{
        let mut _settled: Option<Result<_, PayError>> = None;
        for _prov in $providers.values() {
            let $p = _prov.as_ref();
            match $call.await {
                // This provider does not handle the request; keep looking.
                Err(PayError::NotImplemented(_)) => {}
                _outcome => {
                    _settled = Some(_outcome);
                    break;
                }
            }
        }
        _settled.unwrap_or_else(|| Err(PayError::NotImplemented("network not enabled".to_string())))
    }};
}
272
/// Runs `$call` against every value of `$providers` and concatenates the returned vectors.
///
/// `$p` is bound to the current provider (borrowed with `as_ref()`), and `$call` must be an
/// expression that can be `.await`ed to a `Result<Vec<_>, PayError>`.
///
/// - Providers answering `Err(PayError::NotImplemented(_))` are skipped.
/// - The first other error stops the iteration and becomes the macro's value; items gathered
///   up to that point are discarded.
/// - Otherwise the value is `Ok` with all items, in provider-visit order.
macro_rules! collect_all {
    ($providers:expr, |$p:ident| $call:expr) => {{
        let mut _merged = Vec::new();
        let mut _failure: Option<PayError> = None;
        for _prov in $providers.values() {
            let $p = _prov.as_ref();
            match $call.await {
                Ok(_batch) => _merged.extend(_batch),
                Err(PayError::NotImplemented(_)) => continue,
                Err(_fatal) => {
                    _failure = Some(_fatal);
                    break;
                }
            }
        }
        _failure.map_or(Ok(_merged), Err)
    }};
}
295
296fn resolve_wallet_labels(input: &mut Input, store: &dyn PayStore) -> Result<(), PayError> {
299 fn resolve(store: &dyn PayStore, w: &mut String) -> Result<(), PayError> {
300 if !w.starts_with("w_") {
301 *w = store.resolve_wallet_id(w)?;
302 }
303 Ok(())
304 }
305 fn resolve_opt(store: &dyn PayStore, w: &mut Option<String>) -> Result<(), PayError> {
306 if let Some(val) = w.as_mut() {
307 if !val.starts_with("w_") {
308 *val = store.resolve_wallet_id(val)?;
309 }
310 }
311 Ok(())
312 }
313 match input {
314 Input::WalletClose { wallet, .. } => resolve(store, wallet),
315 Input::Balance { wallet, .. } => resolve_opt(store, wallet),
316 Input::Receive { wallet, .. } => resolve(store, wallet),
317 Input::ReceiveClaim { wallet, .. } => resolve(store, wallet),
318 Input::CashuSend { wallet, .. } => resolve_opt(store, wallet),
319 Input::CashuReceive { wallet, .. } => resolve_opt(store, wallet),
320 Input::Send { wallet, .. } => resolve_opt(store, wallet),
321 Input::Restore { wallet, .. } => resolve(store, wallet),
322 Input::WalletShowSeed { wallet, .. } => resolve(store, wallet),
323 Input::HistoryList { wallet, .. } | Input::HistoryUpdate { wallet, .. } => {
324 resolve_opt(store, wallet)
325 }
326 Input::WalletConfigShow { wallet, .. } => resolve(store, wallet),
327 Input::WalletConfigSet { wallet, .. } => resolve(store, wallet),
328 Input::WalletConfigTokenAdd { wallet, .. } => resolve(store, wallet),
329 Input::WalletConfigTokenRemove { wallet, .. } => resolve(store, wallet),
330 Input::LimitAdd { limit, .. } => resolve_opt(store, &mut limit.wallet),
331 Input::LimitSet { limits, .. } => {
332 for limit in limits.iter_mut() {
333 resolve_opt(store, &mut limit.wallet)?;
334 }
335 Ok(())
336 }
337 _ => Ok(()),
338 }
339}
340
341pub async fn dispatch(app: &App, input: Input) {
342 #[cfg(feature = "redb")]
345 let _lock = if app.has_local_providers
346 && needs_write_lock(&input)
347 && matches!(app.store.as_deref(), Some(StorageBackend::Redb(..)) | None)
348 {
349 match acquire_write_lock(app).await {
350 Ok(guard) => Some(guard),
351 Err(e) => {
352 let id = extract_id(&input);
353 emit_error(&app.writer, id, &e, Instant::now()).await;
354 return;
355 }
356 }
357 } else {
358 None
359 };
360
361 let mut input = input;
363 if let Some(store) = &app.store {
364 if let Err(e) = resolve_wallet_labels(&mut input, store.as_ref()) {
365 let id = extract_id(&input);
366 emit_error(&app.writer, id, &e, Instant::now()).await;
367 return;
368 }
369 }
370
371 match input {
372 Input::WalletCreate {
373 id,
374 network,
375 label,
376 mint_url,
377 rpc_endpoints,
378 chain_id,
379 mnemonic_secret,
380 btc_esplora_url,
381 btc_network,
382 btc_address_type,
383 btc_backend,
384 btc_core_url,
385 btc_core_auth_secret,
386 btc_electrum_url,
387 } => {
388 let start = Instant::now();
389 let mut log_args = serde_json::json!({
390 "operation": "wallet_create",
391 "network": network.to_string(),
392 "label": label.as_deref().unwrap_or("default"),
393 });
394 if let Some(object) = log_args.as_object_mut() {
395 if !rpc_endpoints.is_empty() {
396 object.insert(
397 "rpc_endpoints".to_string(),
398 serde_json::json!(rpc_endpoints),
399 );
400 }
401 if let Some(cid) = chain_id {
402 object.insert("chain_id".to_string(), serde_json::json!(cid));
403 }
404 if let Some(url) = mint_url.as_deref() {
405 object.insert("mint_url".to_string(), serde_json::json!(url));
406 }
407 object.insert(
408 "use_recovery_mnemonic".to_string(),
409 serde_json::json!(mnemonic_secret.is_some()),
410 );
411 }
412 emit_log(app, "wallet", Some(id.clone()), log_args).await;
413 let request = WalletCreateRequest {
414 label: label.unwrap_or_else(|| "default".to_string()),
415 mint_url,
416 rpc_endpoints,
417 chain_id,
418 mnemonic_secret,
419 btc_esplora_url,
420 btc_network,
421 btc_address_type,
422 btc_backend,
423 btc_core_url,
424 btc_core_auth_secret,
425 btc_electrum_url,
426 };
427 match get_provider(&app.providers, network) {
428 Some(p) => match p.create_wallet(&request).await {
429 Ok(info) => {
430 #[cfg(feature = "redb")]
432 if let Some(store) = app.store.as_ref() {
433 if let Ok(meta) = wallet::load_wallet_metadata(
434 &app.config.read().await.data_dir,
435 &info.id,
436 ) {
437 let _ = store.save_wallet_metadata(&meta);
438 }
439 }
440 let _ = app
441 .writer
442 .send(Output::WalletCreated {
443 id,
444 wallet: info.id,
445 network: info.network,
446 address: info.address,
447 mnemonic: None,
448 trace: trace_from(start),
449 })
450 .await;
451 }
452 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
453 },
454 None => {
455 emit_error(
456 &app.writer,
457 Some(id),
458 &PayError::NotImplemented(format!("no provider for {network}")),
459 start,
460 )
461 .await;
462 }
463 }
464 }
465
466 Input::LnWalletCreate { id, request } => {
467 let start = Instant::now();
468 let mut log_args =
469 serde_json::to_value(&request).unwrap_or_else(|_| serde_json::json!({}));
470 if let Some(object) = log_args.as_object_mut() {
471 object.insert(
472 "operation".to_string(),
473 serde_json::json!("ln_wallet_create"),
474 );
475 object.insert("network".to_string(), serde_json::json!("ln"));
476 }
477 emit_log(app, "wallet", Some(id.clone()), log_args).await;
478
479 match get_provider(&app.providers, Network::Ln) {
480 Some(p) => match p.create_ln_wallet(request).await {
481 Ok(info) => {
482 #[cfg(feature = "redb")]
483 if let Some(store) = app.store.as_ref() {
484 if let Ok(meta) = wallet::load_wallet_metadata(
485 &app.config.read().await.data_dir,
486 &info.id,
487 ) {
488 let _ = store.save_wallet_metadata(&meta);
489 }
490 }
491 let _ = app
492 .writer
493 .send(Output::WalletCreated {
494 id,
495 wallet: info.id,
496 network: info.network,
497 address: info.address,
498 mnemonic: None,
499 trace: trace_from(start),
500 })
501 .await;
502 }
503 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
504 },
505 None => {
506 emit_error(
507 &app.writer,
508 Some(id),
509 &PayError::NotImplemented("no provider for ln".to_string()),
510 start,
511 )
512 .await;
513 }
514 }
515 }
516
517 Input::WalletClose {
518 id,
519 wallet,
520 dangerously_skip_balance_check_and_may_lose_money,
521 } => {
522 let start = Instant::now();
523 emit_log(
524 app,
525 "wallet",
526 Some(id.clone()),
527 serde_json::json!({
528 "operation": "wallet_close",
529 "wallet": &wallet,
530 "dangerously_skip_balance_check_and_may_lose_money": dangerously_skip_balance_check_and_may_lose_money,
531 }),
532 )
533 .await;
534 let close_result = if dangerously_skip_balance_check_and_may_lose_money {
535 match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
536 Ok(_) => require_store(app).and_then(|s| s.delete_wallet_metadata(&wallet)).map(|_| ()),
537 Err(PayError::WalletNotFound(_)) => Err(PayError::WalletNotFound(format!(
538 "wallet {wallet} not found locally; dangerous skip balance check only supports local wallets"
539 ))),
540 Err(error) => Err(error),
541 }
542 } else {
543 match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
544 Ok(meta) => match get_provider(&app.providers, meta.network) {
545 Some(provider) => provider.close_wallet(&wallet).await,
546 None => Err(PayError::NotImplemented(format!(
547 "no provider for {}",
548 meta.network
549 ))),
550 },
551 Err(PayError::WalletNotFound(_)) => {
552 try_provider!(&app.providers, |p| p.close_wallet(&wallet))
554 }
555 Err(error) => Err(error),
556 }
557 };
558
559 match close_result {
560 Ok(()) => {
561 let _ = app
562 .writer
563 .send(Output::WalletClosed {
564 id,
565 wallet,
566 trace: trace_from(start),
567 })
568 .await;
569 }
570 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
571 }
572 }
573
574 Input::WalletList { id, network } => {
575 let start = Instant::now();
576 emit_log(
577 app,
578 "wallet",
579 Some(id.clone()),
580 serde_json::json!({
581 "operation": "wallet_list",
582 "network": network.map(|c| c.to_string()).unwrap_or_else(|| "all".to_string()),
583 }),
584 )
585 .await;
586 if let Some(network) = network {
587 match get_provider(&app.providers, network) {
588 Some(p) => match p.list_wallets().await {
589 Ok(wallets) => {
590 let _ = app
591 .writer
592 .send(Output::WalletList {
593 id,
594 wallets,
595 trace: trace_from(start),
596 })
597 .await;
598 }
599 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
600 },
601 None => {
602 emit_error(
603 &app.writer,
604 Some(id),
605 &PayError::NotImplemented(format!("no provider for {network}")),
606 start,
607 )
608 .await;
609 }
610 }
611 } else {
612 let wallets = collect_all!(&app.providers, |p| p.list_wallets());
613 match wallets {
614 Ok(all) => {
615 let _ = app
616 .writer
617 .send(Output::WalletList {
618 id,
619 wallets: all,
620 trace: trace_from(start),
621 })
622 .await;
623 }
624 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
625 }
626 }
627 }
628
629 Input::Balance {
630 id,
631 wallet,
632 network,
633 check,
634 } => {
635 let start = Instant::now();
636 emit_log(
637 app,
638 "wallet",
639 Some(id.clone()),
640 serde_json::json!({
641 "operation": "balance", "wallet": wallet.as_deref().unwrap_or("all"), "check": check,
642 }),
643 )
644 .await;
645 if let Some(wallet_id) = wallet {
646 let meta_opt = require_store(app)
647 .and_then(|s| s.load_wallet_metadata(&wallet_id))
648 .ok();
649 let result = if let Some(ref meta) = meta_opt {
650 match get_provider(&app.providers, meta.network) {
651 Some(provider) => {
652 if check {
653 match provider.check_balance(&wallet_id).await {
654 Err(PayError::NotImplemented(_)) => {
655 provider.balance(&wallet_id).await
656 }
657 other => other,
658 }
659 } else {
660 provider.balance(&wallet_id).await
661 }
662 }
663 None => Err(PayError::NotImplemented(format!(
664 "no provider for {}",
665 meta.network
666 ))),
667 }
668 } else {
669 if check {
671 try_provider!(&app.providers, |p| async {
672 match p.check_balance(&wallet_id).await {
673 Err(PayError::NotImplemented(_)) => p.balance(&wallet_id).await,
674 other => other,
675 }
676 })
677 } else {
678 try_provider!(&app.providers, |p| p.balance(&wallet_id))
679 }
680 };
681 match result {
682 Ok(balance) => {
683 let summary = if let Some(meta) = meta_opt {
684 wallet_summary_from_meta(&meta, &wallet_id)
685 } else {
686 resolve_wallet_summary(app, &wallet_id).await
687 };
688 let _ = app
689 .writer
690 .send(Output::WalletBalances {
691 id,
692 wallets: vec![WalletBalanceItem {
693 wallet: summary,
694 balance: Some(balance),
695 error: None,
696 }],
697 trace: trace_from(start),
698 })
699 .await;
700 }
701 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
702 }
703 } else {
704 match collect_all!(&app.providers, |p| p.balance_all()) {
705 Ok(wallets) => {
706 let filtered = if let Some(network) = network {
707 wallets
708 .into_iter()
709 .filter(|w| w.wallet.network == network)
710 .collect()
711 } else {
712 wallets
713 };
714 let _ = app
715 .writer
716 .send(Output::WalletBalances {
717 id,
718 wallets: filtered,
719 trace: trace_from(start),
720 })
721 .await;
722 }
723 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
724 }
725 }
726 }
727
728 Input::Receive {
729 id,
730 wallet,
731 network,
732 amount,
733 onchain_memo,
734 wait_until_paid,
735 wait_timeout_s,
736 wait_poll_interval_ms,
737 write_qr_svg_file: _,
738 min_confirmations,
739 } => {
740 let start = Instant::now();
741 let wait_requested =
742 wait_until_paid || wait_timeout_s.is_some() || wait_poll_interval_ms.is_some();
743 emit_log(
744 app,
745 "wallet",
746 Some(id.clone()),
747 serde_json::json!({
748 "operation": "receive",
749 "wallet": &wallet,
750 "network": network.map(|c| c.to_string()).unwrap_or_else(|| "auto".to_string()),
751 "amount": amount.as_ref().map(|a| a.value),
752 "onchain_memo": onchain_memo.as_deref().unwrap_or(""),
753 "wait_until_paid": wait_requested,
754 "wait_timeout_s": wait_timeout_s,
755 "wait_poll_interval_ms": wait_poll_interval_ms,
756 }),
757 )
758 .await;
759
760 let (target_network, wallet_for_call) = if wallet.trim().is_empty() {
761 let wallets = match require_store(app).and_then(|s| s.list_wallet_metadata(network))
762 {
763 Ok(v) => v,
764 Err(e) => {
765 emit_error(&app.writer, Some(id), &e, start).await;
766 return;
767 }
768 };
769 match wallets.len() {
770 0 => {
771 let msg = match network {
772 Some(network) => format!("no {network} wallet found"),
773 None => "no wallet found".to_string(),
774 };
775 emit_error(&app.writer, Some(id), &PayError::WalletNotFound(msg), start)
776 .await;
777 return;
778 }
779 1 => (wallets[0].network, wallets[0].id.clone()),
780 _ => {
781 let msg = match network {
782 Some(network) => {
783 format!("multiple {network} wallets found; pass --wallet")
784 }
785 None => "multiple wallets found; pass --wallet".to_string(),
786 };
787 emit_error(&app.writer, Some(id), &PayError::InvalidAmount(msg), start)
788 .await;
789 return;
790 }
791 }
792 } else {
793 let meta = match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
794 Ok(m) => m,
795 Err(e) => {
796 emit_error(&app.writer, Some(id), &e, start).await;
797 return;
798 }
799 };
800 if let Some(expected) = network {
801 if meta.network != expected {
802 emit_error(
803 &app.writer,
804 Some(id),
805 &PayError::InvalidAmount(format!(
806 "wallet {wallet} is {}, not {expected}",
807 meta.network
808 )),
809 start,
810 )
811 .await;
812 return;
813 }
814 }
815 (meta.network, wallet.clone())
816 };
817
818 let Some(provider) = get_provider(&app.providers, target_network) else {
819 emit_error(
820 &app.writer,
821 Some(id),
822 &PayError::NotImplemented(format!("no provider for {target_network}")),
823 start,
824 )
825 .await;
826 return;
827 };
828
829 match provider
830 .receive_info(&wallet_for_call, amount.clone())
831 .await
832 {
833 Ok(receive_info) => {
834 let quote_id = receive_info.quote_id.clone();
835 let _ = app
836 .writer
837 .send(Output::ReceiveInfo {
838 id: id.clone(),
839 wallet: wallet_for_call.clone(),
840 receive_info,
841 trace: trace_from(start),
842 })
843 .await;
844
845 if !wait_requested {
846 return;
847 }
848
849 let timeout_secs = wait_timeout_s.unwrap_or(DEFAULT_WAIT_TIMEOUT_SECS);
850 if timeout_secs == 0 {
851 emit_error(
852 &app.writer,
853 Some(id),
854 &PayError::InvalidAmount("wait_timeout_s must be >= 1".to_string()),
855 start,
856 )
857 .await;
858 return;
859 }
860 let poll_interval_ms =
861 wait_poll_interval_ms.unwrap_or(DEFAULT_WAIT_POLL_INTERVAL_MS);
862 if poll_interval_ms == 0 {
863 emit_error(
864 &app.writer,
865 Some(id),
866 &PayError::InvalidAmount(
867 "wait_poll_interval_ms must be >= 1".to_string(),
868 ),
869 start,
870 )
871 .await;
872 return;
873 }
874
875 if target_network == Network::Sol {
876 let memo_to_watch = onchain_memo
877 .as_deref()
878 .map(str::trim)
879 .filter(|text| !text.is_empty())
880 .map(str::to_owned);
881 let amount_to_watch = amount.as_ref().map(|a| a.value);
882
883 if memo_to_watch.is_none() && amount_to_watch.is_none() {
884 emit_error_hint(
885 &app.writer,
886 Some(id),
887 &PayError::InvalidAmount(
888 "sol receive --wait requires a match condition".to_string(),
889 ),
890 start,
891 Some("pass --onchain-memo or --amount"),
892 )
893 .await;
894 return;
895 }
896
897 let deadline = Instant::now() + Duration::from_secs(timeout_secs);
898 loop {
899 match provider.history_list(&wallet_for_call, 200, 0).await {
900 Ok(items) => {
901 let matched = items.into_iter().find(|item| {
902 if item.direction != Direction::Receive {
903 return false;
904 }
905 if let Some(ref m) = memo_to_watch {
906 item.onchain_memo.as_deref() == Some(m.as_str())
907 } else if let Some(expected) = amount_to_watch {
908 item.amount.value == expected
909 } else {
910 false
911 }
912 });
913 if let Some(item) = matched {
914 if let Some(min_conf) = min_confirmations {
916 match provider
917 .history_status(&item.transaction_id)
918 .await
919 {
920 Ok(status_info) => {
921 let confs = status_info
922 .confirmations
923 .unwrap_or_else(|| {
924 if status_info.status
925 == TxStatus::Confirmed
926 {
927 min_conf
928 } else {
929 0
930 }
931 });
932 if confs < min_conf {
933 if Instant::now() >= deadline {
935 let criteria = if let Some(ref m) =
936 memo_to_watch
937 {
938 format!("memo '{m}'")
939 } else {
940 format!(
941 "amount {}",
942 amount_to_watch.unwrap_or(0)
943 )
944 };
945 emit_error(
946 &app.writer,
947 Some(id),
948 &PayError::NetworkError(format!(
949 "wait timeout after {timeout_secs}s: sol transaction {tx} matching {criteria} has {confs}/{min_conf} confirmations",
950 tx = item.transaction_id,
951 )),
952 start,
953 )
954 .await;
955 break;
956 }
957 sleep(Duration::from_millis(
958 poll_interval_ms,
959 ))
960 .await;
961 continue;
962 }
963 let transaction_id =
965 item.transaction_id.clone();
966 let _ = app
967 .writer
968 .send(Output::HistoryStatus {
969 id,
970 transaction_id,
971 status: item.status,
972 confirmations: Some(confs),
973 preimage: item.preimage.clone(),
974 item: Some(item),
975 trace: trace_from(start),
976 })
977 .await;
978 break;
979 }
980 Err(e) if e.retryable() => {
981 sleep(Duration::from_millis(poll_interval_ms))
982 .await;
983 continue;
984 }
985 Err(e) => {
986 emit_error(&app.writer, Some(id), &e, start)
987 .await;
988 break;
989 }
990 }
991 } else {
992 let transaction_id = item.transaction_id.clone();
993 let _ = app
994 .writer
995 .send(Output::HistoryStatus {
996 id,
997 transaction_id,
998 status: item.status,
999 confirmations: None,
1000 preimage: item.preimage.clone(),
1001 item: Some(item),
1002 trace: trace_from(start),
1003 })
1004 .await;
1005 break;
1006 }
1007 }
1008 if Instant::now() >= deadline {
1009 let criteria = if let Some(ref m) = memo_to_watch {
1010 format!("memo '{m}'")
1011 } else {
1012 format!("amount {}", amount_to_watch.unwrap_or(0))
1013 };
1014 emit_error(
1015 &app.writer,
1016 Some(id),
1017 &PayError::NetworkError(format!(
1018 "wait timeout after {timeout_secs}s: no incoming sol transaction matching {criteria}"
1019 )),
1020 start,
1021 )
1022 .await;
1023 break;
1024 }
1025 sleep(Duration::from_millis(poll_interval_ms)).await;
1026 }
1027 Err(e) if e.retryable() => {
1028 if Instant::now() >= deadline {
1029 let criteria = if let Some(ref m) = memo_to_watch {
1030 format!("memo '{m}'")
1031 } else {
1032 format!("amount {}", amount_to_watch.unwrap_or(0))
1033 };
1034 emit_error(
1035 &app.writer,
1036 Some(id),
1037 &PayError::NetworkError(format!(
1038 "wait timeout after {timeout_secs}s: no incoming sol transaction matching {criteria}"
1039 )),
1040 start,
1041 )
1042 .await;
1043 break;
1044 }
1045 sleep(Duration::from_millis(poll_interval_ms)).await;
1046 }
1047 Err(e) => {
1048 emit_error(&app.writer, Some(id), &e, start).await;
1049 break;
1050 }
1051 }
1052 }
1053 return;
1054 }
1055
1056 if target_network == Network::Evm {
1058 if onchain_memo
1059 .as_deref()
1060 .map(str::trim)
1061 .filter(|text| !text.is_empty())
1062 .is_some()
1063 {
1064 emit_error_hint(
1065 &app.writer,
1066 Some(id),
1067 &PayError::InvalidAmount(
1068 "evm receive --wait does not support --onchain-memo matching"
1069 .to_string(),
1070 ),
1071 start,
1072 Some("pass --amount to match incoming transfers"),
1073 )
1074 .await;
1075 return;
1076 }
1077 let amount_to_watch = amount.as_ref().map(|a| a.value);
1078
1079 if amount_to_watch.is_none() {
1080 emit_error_hint(
1081 &app.writer,
1082 Some(id),
1083 &PayError::InvalidAmount(
1084 "evm receive --wait requires --amount".to_string(),
1085 ),
1086 start,
1087 Some("pass --amount"),
1088 )
1089 .await;
1090 return;
1091 }
1092
1093 let initial_balance = match provider.balance(&wallet_for_call).await {
1095 Ok(b) => b,
1096 Err(e) => {
1097 emit_error(&app.writer, Some(id), &e, start).await;
1098 return;
1099 }
1100 };
1101
1102 let deadline = Instant::now() + Duration::from_secs(timeout_secs);
1103 loop {
1104 sleep(Duration::from_millis(poll_interval_ms)).await;
1105 if Instant::now() >= deadline {
1106 let criteria = format!("amount {}", amount_to_watch.unwrap_or(0));
1107 emit_error(
1108 &app.writer,
1109 Some(id),
1110 &PayError::NetworkError(format!(
1111 "wait timeout after {timeout_secs}s: no incoming evm deposit matching {criteria}"
1112 )),
1113 start,
1114 )
1115 .await;
1116 break;
1117 }
1118 match provider.balance(&wallet_for_call).await {
1119 Ok(current) => {
1120 let native_increase =
1122 current.confirmed.saturating_sub(initial_balance.confirmed);
1123 let mut token_increase: Option<(String, u64)> = None;
1125 for (key, &cur_val) in ¤t.additional {
1126 let init_val = initial_balance
1127 .additional
1128 .get(key)
1129 .copied()
1130 .unwrap_or(0);
1131 if cur_val > init_val {
1132 token_increase =
1133 Some((key.clone(), cur_val - init_val));
1134 break;
1135 }
1136 }
1137
1138 if native_increase > 0 || token_increase.is_some() {
1139 let (amount_value, amount_unit) =
1140 if let Some((token_key, delta)) = token_increase {
1141 (delta, token_key)
1142 } else {
1143 (native_increase, current.unit.clone())
1144 };
1145
1146 if let Some(expected) = amount_to_watch {
1148 if amount_value != expected {
1149 continue;
1150 }
1151 }
1152
1153 if let Some(min_conf) = min_confirmations {
1155 let chain_tx_id = match provider
1157 .history_list(&wallet_for_call, 20, 0)
1158 .await
1159 {
1160 Ok(items) => items
1161 .into_iter()
1162 .find(|i| {
1163 i.direction == Direction::Receive
1164 && i.amount.value == amount_value
1165 })
1166 .map(|i| i.transaction_id),
1167 Err(_) => None,
1168 };
1169 if let Some(ref ctxid) = chain_tx_id {
1170 loop {
1172 match provider.history_status(ctxid).await {
1173 Ok(status_info) => {
1174 let confs = status_info
1175 .confirmations
1176 .unwrap_or(0);
1177 if confs >= min_conf {
1178 let record = HistoryRecord {
1179 transaction_id: ctxid.clone(),
1180 wallet: wallet_for_call.clone(),
1181 network: Network::Evm,
1182 direction: Direction::Receive,
1183 amount: Amount {
1184 value: amount_value,
1185 token: amount_unit.clone(),
1186 },
1187 status: TxStatus::Confirmed,
1188 onchain_memo: None,
1189 local_memo: None,
1190 remote_addr: None,
1191 preimage: None,
1192 created_at_epoch_s:
1193 wallet::now_epoch_seconds(),
1194 confirmed_at_epoch_s: Some(
1195 wallet::now_epoch_seconds(),
1196 ),
1197 fee: None,
1198 };
1199 if let Some(s) = &app.store {
1200 let _ = s
1201 .append_transaction_record(
1202 &record,
1203 );
1204 }
1205 let _ = app
1206 .writer
1207 .send(Output::HistoryStatus {
1208 id,
1209 transaction_id: ctxid
1210 .clone(),
1211 status: TxStatus::Confirmed,
1212 confirmations: Some(confs),
1213 preimage: None,
1214 item: Some(record),
1215 trace: trace_from(start),
1216 })
1217 .await;
1218 break;
1219 }
1220 if Instant::now() >= deadline {
1221 let criteria = format!(
1222 "amount {}",
1223 amount_to_watch.unwrap_or(0)
1224 );
1225 emit_error(
1226 &app.writer,
1227 Some(id),
1228 &PayError::NetworkError(format!(
1229 "wait timeout after {timeout_secs}s: evm transaction {ctxid} matching {criteria} has {confs}/{min_conf} confirmations",
1230 )),
1231 start,
1232 )
1233 .await;
1234 break;
1235 }
1236 sleep(Duration::from_millis(
1237 poll_interval_ms,
1238 ))
1239 .await;
1240 }
1241 Err(e) if e.retryable() => {
1242 if Instant::now() >= deadline {
1243 emit_error(
1244 &app.writer,
1245 Some(id),
1246 &e,
1247 start,
1248 )
1249 .await;
1250 break;
1251 }
1252 sleep(Duration::from_millis(
1253 poll_interval_ms,
1254 ))
1255 .await;
1256 }
1257 Err(e) => {
1258 emit_error(
1259 &app.writer,
1260 Some(id),
1261 &e,
1262 start,
1263 )
1264 .await;
1265 break;
1266 }
1267 }
1268 }
1269 } else {
1270 let tx_id = format!(
1272 "evm_recv_{}",
1273 wallet::now_epoch_seconds()
1274 );
1275 let record = HistoryRecord {
1276 transaction_id: tx_id.clone(),
1277 wallet: wallet_for_call.clone(),
1278 network: Network::Evm,
1279 direction: Direction::Receive,
1280 amount: Amount {
1281 value: amount_value,
1282 token: amount_unit,
1283 },
1284 status: TxStatus::Confirmed,
1285 onchain_memo: None,
1286 local_memo: None,
1287 remote_addr: None,
1288 preimage: None,
1289 created_at_epoch_s: wallet::now_epoch_seconds(),
1290 confirmed_at_epoch_s: Some(
1291 wallet::now_epoch_seconds(),
1292 ),
1293 fee: None,
1294 };
1295 if let Some(s) = &app.store {
1296 let _ = s.append_transaction_record(&record);
1297 }
1298 let _ = app
1299 .writer
1300 .send(Output::HistoryStatus {
1301 id,
1302 transaction_id: tx_id,
1303 status: TxStatus::Confirmed,
1304 confirmations: None,
1305 preimage: None,
1306 item: Some(record),
1307 trace: trace_from(start),
1308 })
1309 .await;
1310 }
1311 } else {
1312 let tx_id =
1313 format!("evm_recv_{}", wallet::now_epoch_seconds());
1314 let record = HistoryRecord {
1315 transaction_id: tx_id.clone(),
1316 wallet: wallet_for_call.clone(),
1317 network: Network::Evm,
1318 direction: Direction::Receive,
1319 amount: Amount {
1320 value: amount_value,
1321 token: amount_unit,
1322 },
1323 status: TxStatus::Confirmed,
1324 onchain_memo: None,
1325 local_memo: None,
1326 remote_addr: None,
1327 preimage: None,
1328 created_at_epoch_s: wallet::now_epoch_seconds(),
1329 confirmed_at_epoch_s: Some(
1330 wallet::now_epoch_seconds(),
1331 ),
1332 fee: None,
1333 };
1334 if let Some(s) = &app.store {
1335 let _ = s.append_transaction_record(&record);
1336 }
1337 let _ = app
1338 .writer
1339 .send(Output::HistoryStatus {
1340 id,
1341 transaction_id: tx_id,
1342 status: TxStatus::Confirmed,
1343 confirmations: None,
1344 preimage: None,
1345 item: Some(record),
1346 trace: trace_from(start),
1347 })
1348 .await;
1349 }
1350 break;
1351 }
1352 }
1353 Err(e) if e.retryable() => {
1354 }
1356 Err(e) => {
1357 emit_error(&app.writer, Some(id), &e, start).await;
1358 break;
1359 }
1360 }
1361 }
1362 return;
1363 }
1364
1365 if target_network == Network::Btc {
1367 let amount_to_watch = amount.as_ref().map(|a| a.value).filter(|v| *v > 0);
1368 let initial_balance = match provider.balance(&wallet_for_call).await {
1369 Ok(b) => b,
1370 Err(e) => {
1371 emit_error(&app.writer, Some(id), &e, start).await;
1372 return;
1373 }
1374 };
1375
1376 let deadline = Instant::now() + Duration::from_secs(timeout_secs);
1377 loop {
1378 sleep(Duration::from_millis(poll_interval_ms)).await;
1379 if Instant::now() >= deadline {
1380 let criteria = if let Some(expected) = amount_to_watch {
1381 format!("amount {expected}")
1382 } else {
1383 "any incoming amount".to_string()
1384 };
1385 emit_error(
1386 &app.writer,
1387 Some(id),
1388 &PayError::NetworkError(format!(
1389 "wait timeout after {timeout_secs}s: no incoming btc transaction matching {criteria}"
1390 )),
1391 start,
1392 )
1393 .await;
1394 break;
1395 }
1396
1397 match provider.balance(&wallet_for_call).await {
1398 Ok(current) => {
1399 let confirmed_delta =
1400 current.confirmed.saturating_sub(initial_balance.confirmed);
1401 let pending_delta =
1402 current.pending.saturating_sub(initial_balance.pending);
1403 let observed_delta =
1404 confirmed_delta.saturating_add(pending_delta);
1405 if observed_delta == 0 {
1406 continue;
1407 }
1408 if let Some(expected) = amount_to_watch {
1409 if observed_delta != expected {
1410 continue;
1411 }
1412 }
1413
1414 let status = if confirmed_delta > 0 {
1415 TxStatus::Confirmed
1416 } else {
1417 TxStatus::Pending
1418 };
1419 let now = wallet::now_epoch_seconds();
1420 let tx_id = format!("btc_recv_{now}");
1421 let record = HistoryRecord {
1422 transaction_id: tx_id.clone(),
1423 wallet: wallet_for_call.clone(),
1424 network: Network::Btc,
1425 direction: Direction::Receive,
1426 amount: Amount {
1427 value: amount_to_watch.unwrap_or(observed_delta),
1428 token: current.unit.clone(),
1429 },
1430 status,
1431 onchain_memo: onchain_memo.clone(),
1432 local_memo: None,
1433 remote_addr: None,
1434 preimage: None,
1435 created_at_epoch_s: now,
1436 confirmed_at_epoch_s: if status == TxStatus::Confirmed {
1437 Some(now)
1438 } else {
1439 None
1440 },
1441 fee: None,
1442 };
1443 if let Some(s) = &app.store {
1444 if s.find_transaction_record_by_id(&tx_id)
1445 .ok()
1446 .flatten()
1447 .is_none()
1448 {
1449 let _ = s.append_transaction_record(&record);
1450 }
1451 }
1452 let _ = app
1453 .writer
1454 .send(Output::HistoryStatus {
1455 id,
1456 transaction_id: tx_id,
1457 status,
1458 confirmations: Some(if status == TxStatus::Confirmed {
1459 1
1460 } else {
1461 0
1462 }),
1463 preimage: None,
1464 item: Some(record),
1465 trace: trace_from(start),
1466 })
1467 .await;
1468 break;
1469 }
1470 Err(e) if e.retryable() => {
1471 }
1473 Err(e) => {
1474 emit_error(&app.writer, Some(id), &e, start).await;
1475 break;
1476 }
1477 }
1478 }
1479 return;
1480 }
1481
1482 let Some(quote_id) = quote_id else {
1483 emit_error(
1484 &app.writer,
1485 Some(id),
1486 &PayError::InternalError(
1487 "deposit response missing quote_id/payment_hash".to_string(),
1488 ),
1489 start,
1490 )
1491 .await;
1492 return;
1493 };
1494
1495 let deadline = Instant::now() + Duration::from_secs(timeout_secs);
1496 loop {
1497 match provider.receive_claim(&wallet_for_call, "e_id).await {
1498 Ok(claimed) => {
1499 let _ = app
1500 .writer
1501 .send(Output::ReceiveClaimed {
1502 id,
1503 wallet: wallet_for_call.clone(),
1504 amount: Amount {
1505 value: claimed,
1506 token: "sats".to_string(),
1507 },
1508 trace: trace_from(start),
1509 })
1510 .await;
1511 break;
1512 }
1513 Err(e) if e.retryable() => {
1514 if Instant::now() >= deadline {
1515 emit_error(
1516 &app.writer,
1517 Some(id),
1518 &PayError::NetworkError(format!(
1519 "wait-until-paid timeout after {timeout_secs}s"
1520 )),
1521 start,
1522 )
1523 .await;
1524 break;
1525 }
1526 sleep(Duration::from_millis(poll_interval_ms)).await;
1527 }
1528 Err(e) => {
1529 emit_error(&app.writer, Some(id), &e, start).await;
1530 break;
1531 }
1532 }
1533 }
1534 }
1535 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
1536 }
1537 }
1538
1539 Input::ReceiveClaim {
1540 id,
1541 wallet,
1542 quote_id,
1543 } => {
1544 let start = Instant::now();
1545 emit_log(
1546 app,
1547 "wallet",
1548 Some(id.clone()),
1549 serde_json::json!({
1550 "operation": "receive_claim", "wallet": &wallet, "quote_id": "e_id,
1551 }),
1552 )
1553 .await;
1554 let meta = match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
1555 Ok(m) => m,
1556 Err(e) => {
1557 emit_error(&app.writer, Some(id), &e, start).await;
1558 return;
1559 }
1560 };
1561 let Some(provider) = get_provider(&app.providers, meta.network) else {
1562 emit_error(
1563 &app.writer,
1564 Some(id),
1565 &PayError::NotImplemented(format!("no provider for {}", meta.network)),
1566 start,
1567 )
1568 .await;
1569 return;
1570 };
1571
1572 match provider.receive_claim(&wallet, "e_id).await {
1573 Ok(claimed) => {
1574 let _ = app
1575 .writer
1576 .send(Output::ReceiveClaimed {
1577 id,
1578 wallet,
1579 amount: Amount {
1580 value: claimed,
1581 token: "sats".to_string(),
1582 },
1583 trace: trace_from(start),
1584 })
1585 .await;
1586 }
1587 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
1588 }
1589 }
1590
1591 Input::CashuSend {
1592 id,
1593 wallet,
1594 amount,
1595 onchain_memo,
1596 local_memo,
1597 mints,
1598 } => {
1599 let start = Instant::now();
1600 emit_log(
1601 app,
1602 "pay",
1603 Some(id.clone()),
1604 serde_json::json!({
1605 "operation": "cashu_send", "wallet": wallet.as_deref().unwrap_or("auto"),
1606 "amount": amount.value, "onchain_memo": onchain_memo.as_deref().unwrap_or(""),
1607 "mints": mints.as_deref().unwrap_or(&[]),
1608 }),
1609 )
1610 .await;
1611
1612 let wallet_str = wallet.unwrap_or_default();
1613 let mints_ref = mints.as_deref();
1614 let Some(provider) = get_provider(&app.providers, Network::Cashu) else {
1615 emit_error(
1616 &app.writer,
1617 Some(id),
1618 &PayError::NotImplemented("no provider for cashu".to_string()),
1619 start,
1620 )
1621 .await;
1622 return;
1623 };
1624
1625 let mut reservation_id: Option<u64> = None;
1626 if app.enforce_limits {
1627 let spend_ctx = SpendContext {
1628 network: "cashu".to_string(),
1629 wallet: if wallet_str.is_empty() {
1630 None
1631 } else {
1632 Some(wallet_str.clone())
1633 },
1634 amount_native: amount.value,
1635 token: None,
1636 };
1637 match app
1638 .spend_ledger
1639 .reserve(&format!("cashu_send:{id}"), &spend_ctx)
1640 .await
1641 {
1642 Ok(rid) => reservation_id = Some(rid),
1643 Err(e) => {
1644 if let PayError::LimitExceeded {
1645 rule_id,
1646 scope,
1647 scope_key,
1648 spent,
1649 max_spend,
1650 token,
1651 remaining_s,
1652 origin,
1653 } = &e
1654 {
1655 let _ = app
1656 .writer
1657 .send(Output::LimitExceeded {
1658 id,
1659 rule_id: rule_id.clone(),
1660 scope: *scope,
1661 scope_key: scope_key.clone(),
1662 spent: *spent,
1663 max_spend: *max_spend,
1664 token: token.clone(),
1665 remaining_s: *remaining_s,
1666 origin: origin.clone(),
1667 trace: trace_from(start),
1668 })
1669 .await;
1670 } else {
1671 emit_error(&app.writer, Some(id), &e, start).await;
1672 }
1673 return;
1674 }
1675 }
1676 }
1677
1678 match provider
1679 .cashu_send(
1680 &wallet_str,
1681 amount.clone(),
1682 onchain_memo.as_deref(),
1683 mints_ref,
1684 )
1685 .await
1686 {
1687 Ok(r) => {
1688 if let Some(rid) = reservation_id {
1689 let _ = app.spend_ledger.confirm(rid).await;
1690 }
1691 if local_memo.is_some() {
1692 if let Some(s) = &app.store {
1693 let _ = s.update_transaction_record_memo(
1694 &r.transaction_id,
1695 local_memo.as_ref(),
1696 );
1697 }
1698 }
1699 let _ = app
1700 .writer
1701 .send(Output::CashuSent {
1702 id,
1703 wallet: r.wallet,
1704 transaction_id: r.transaction_id,
1705 status: r.status,
1706 fee: r.fee,
1707 token: r.token,
1708 trace: trace_from(start),
1709 })
1710 .await;
1711 }
1712 Err(e) => {
1713 if let Some(rid) = reservation_id {
1714 let _ = app.spend_ledger.cancel(rid).await;
1715 }
1716 emit_error(&app.writer, Some(id), &e, start).await
1717 }
1718 }
1719 }
1720
1721 Input::CashuReceive { id, wallet, token } => {
1722 let start = Instant::now();
1723 let token_preview = if token.len() > 20 {
1724 format!("{}...", &token[..20])
1725 } else {
1726 token.clone()
1727 };
1728 emit_log(
1729 app,
1730 "pay",
1731 Some(id.clone()),
1732 serde_json::json!({
1733 "operation": "cashu_receive", "wallet": wallet.as_deref().unwrap_or("auto"), "token": token_preview,
1734 }),
1735 )
1736 .await;
1737 let wallet_str = wallet.unwrap_or_default();
1738 let Some(provider) = get_provider(&app.providers, Network::Cashu) else {
1739 emit_error(
1740 &app.writer,
1741 Some(id),
1742 &PayError::NotImplemented("no provider for cashu".to_string()),
1743 start,
1744 )
1745 .await;
1746 return;
1747 };
1748 match provider.cashu_receive(&wallet_str, &token).await {
1749 Ok(r) => {
1750 let _ = app
1751 .writer
1752 .send(Output::CashuReceived {
1753 id,
1754 wallet: r.wallet,
1755 amount: r.amount,
1756 trace: trace_from(start),
1757 })
1758 .await;
1759 }
1760 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
1761 }
1762 }
1763
1764 Input::Send {
1765 id,
1766 wallet,
1767 network,
1768 to,
1769 onchain_memo,
1770 local_memo,
1771 mints,
1772 } => {
1773 let start = Instant::now();
1774 let operation_name = "send";
1775 let to_preview = if to.len() > 20 {
1776 format!("{}...", &to[..20])
1777 } else {
1778 to.clone()
1779 };
1780 emit_log(
1781 app,
1782 "pay",
1783 Some(id.clone()),
1784 serde_json::json!({
1785 "operation": operation_name, "wallet": wallet.as_deref().unwrap_or("auto"),
1786 "network": network.map(|c| c.to_string()).unwrap_or_else(|| "auto".to_string()),
1787 "to": to_preview, "onchain_memo": onchain_memo.as_deref().unwrap_or(""),
1788 }),
1789 )
1790 .await;
1791
1792 let (target_network, wallet_for_call) = if let Some(w) =
1793 wallet.filter(|w| !w.is_empty())
1794 {
1795 let meta = match require_store(app).and_then(|s| s.load_wallet_metadata(&w)) {
1796 Ok(m) => m,
1797 Err(e) => {
1798 emit_error(&app.writer, Some(id), &e, start).await;
1799 return;
1800 }
1801 };
1802 if let Some(expected) = network {
1803 if meta.network != expected {
1804 emit_error(
1805 &app.writer,
1806 Some(id),
1807 &PayError::InvalidAmount(format!(
1808 "wallet {w} is {}, not {expected}",
1809 meta.network
1810 )),
1811 start,
1812 )
1813 .await;
1814 return;
1815 }
1816 }
1817 (meta.network, w)
1818 } else {
1819 let wallets = match require_store(app).and_then(|s| s.list_wallet_metadata(network))
1820 {
1821 Ok(v) => v,
1822 Err(e) => {
1823 emit_error(&app.writer, Some(id), &e, start).await;
1824 return;
1825 }
1826 };
1827
1828 let is_cashu = matches!(network, Some(Network::Cashu));
1831 let filtered: Vec<_> = if is_cashu {
1832 if let Some(ref mint_list) = mints {
1833 wallets
1834 .into_iter()
1835 .filter(|w| {
1836 w.mint_url.as_deref().is_some_and(|u| {
1837 let nu = u.trim().trim_end_matches('/');
1838 mint_list
1839 .iter()
1840 .any(|m| m.trim().trim_end_matches('/') == nu)
1841 })
1842 })
1843 .collect()
1844 } else {
1845 wallets
1846 }
1847 } else {
1848 wallets
1849 };
1850
1851 match filtered.len() {
1852 0 => {
1853 let msg = if mints.is_some() {
1854 "no cashu wallet found matching --cashu-mint".to_string()
1855 } else {
1856 match network {
1857 Some(network) => format!("no {network} wallet found"),
1858 None => "no wallet found".to_string(),
1859 }
1860 };
1861 emit_error(&app.writer, Some(id), &PayError::WalletNotFound(msg), start)
1862 .await;
1863 return;
1864 }
1865 1 => (filtered[0].network, filtered[0].id.clone()),
1866 _ if is_cashu => {
1867 (Network::Cashu, String::new())
1870 }
1871 _ => {
1872 let msg = match network {
1873 Some(network) => {
1874 format!("multiple {network} wallets found; pass --wallet")
1875 }
1876 None => "multiple wallets found; pass --wallet".to_string(),
1877 };
1878 emit_error(&app.writer, Some(id), &PayError::InvalidAmount(msg), start)
1879 .await;
1880 return;
1881 }
1882 }
1883 };
1884
1885 let Some(provider) = get_provider(&app.providers, target_network) else {
1886 emit_error(
1887 &app.writer,
1888 Some(id),
1889 &PayError::NotImplemented(format!("no provider for {target_network}")),
1890 start,
1891 )
1892 .await;
1893 return;
1894 };
1895
1896 let mut reservation_id: Option<u64> = None;
1897 if app.enforce_limits {
1898 let quote = match provider
1899 .send_quote(&wallet_for_call, &to, mints.as_deref())
1900 .await
1901 {
1902 Ok(q) => q,
1903 Err(e) => {
1904 emit_error(&app.writer, Some(id), &e, start).await;
1905 return;
1906 }
1907 };
1908 let spend_amount = quote.amount_native + quote.fee_estimate_native;
1909 let provider_key = require_store(app)
1910 .and_then(|s| s.load_wallet_metadata("e.wallet))
1911 .ok()
1912 .map(|meta| wallet_provider_key(&meta))
1913 .unwrap_or_else(|| target_network.to_string());
1914 let spend_ctx = SpendContext {
1915 network: provider_key,
1916 wallet: Some(quote.wallet.clone()),
1917 amount_native: spend_amount,
1918 token: extract_token_from_target(&to),
1919 };
1920 match app
1921 .spend_ledger
1922 .reserve(&format!("send:{id}"), &spend_ctx)
1923 .await
1924 {
1925 Ok(rid) => reservation_id = Some(rid),
1926 Err(e) => {
1927 if let PayError::LimitExceeded {
1928 rule_id,
1929 scope,
1930 scope_key,
1931 spent,
1932 max_spend,
1933 token,
1934 remaining_s,
1935 origin,
1936 } = &e
1937 {
1938 let _ = app
1939 .writer
1940 .send(Output::LimitExceeded {
1941 id,
1942 rule_id: rule_id.clone(),
1943 scope: *scope,
1944 scope_key: scope_key.clone(),
1945 spent: *spent,
1946 max_spend: *max_spend,
1947 token: token.clone(),
1948 remaining_s: *remaining_s,
1949 origin: origin.clone(),
1950 trace: trace_from(start),
1951 })
1952 .await;
1953 } else {
1954 emit_error(&app.writer, Some(id), &e, start).await;
1955 }
1956 return;
1957 }
1958 }
1959 }
1960 match provider
1961 .send(
1962 &wallet_for_call,
1963 &to,
1964 onchain_memo.as_deref(),
1965 mints.as_deref(),
1966 )
1967 .await
1968 {
1969 Ok(r) => {
1970 if let Some(rid) = reservation_id {
1971 let _ = app.spend_ledger.confirm(rid).await;
1972 }
1973 if local_memo.is_some() {
1974 if let Some(s) = &app.store {
1975 let _ = s.update_transaction_record_memo(
1976 &r.transaction_id,
1977 local_memo.as_ref(),
1978 );
1979 }
1980 }
1981 let _ = app
1982 .writer
1983 .send(Output::Sent {
1984 id,
1985 wallet: r.wallet,
1986 transaction_id: r.transaction_id,
1987 amount: r.amount,
1988 fee: r.fee,
1989 preimage: r.preimage,
1990 trace: trace_from(start),
1991 })
1992 .await;
1993 }
1994 Err(e) => {
1995 if let Some(rid) = reservation_id {
1996 let _ = app.spend_ledger.cancel(rid).await;
1997 }
1998 emit_error(&app.writer, Some(id), &e, start).await
1999 }
2000 }
2001 }
2002
2003 Input::Restore { id, wallet } => {
2004 let start = Instant::now();
2005 emit_log(
2006 app,
2007 "wallet",
2008 Some(id.clone()),
2009 serde_json::json!({
2010 "operation": "restore", "wallet": &wallet,
2011 }),
2012 )
2013 .await;
2014 match try_provider!(&app.providers, |p| p.restore(&wallet)) {
2015 Ok(r) => {
2016 let _ = app
2017 .writer
2018 .send(Output::Restored {
2019 id,
2020 wallet: r.wallet,
2021 unspent: r.unspent,
2022 spent: r.spent,
2023 pending: r.pending,
2024 unit: r.unit,
2025 trace: trace_from(start),
2026 })
2027 .await;
2028 }
2029 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2030 }
2031 }
2032
2033 Input::WalletShowSeed { id, wallet } => {
2034 let start = Instant::now();
2035 emit_log(
2036 app,
2037 "wallet",
2038 Some(id.clone()),
2039 serde_json::json!({
2040 "operation": "wallet_show_seed", "wallet": &wallet,
2041 }),
2042 )
2043 .await;
2044 match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2045 Ok(meta) => match meta.network {
2046 Network::Cashu => match meta.seed_secret {
2047 Some(mnemonic) => {
2048 let _ = app
2049 .writer
2050 .send(Output::WalletSeed {
2051 id,
2052 wallet,
2053 mnemonic_secret: mnemonic,
2054 trace: trace_from(start),
2055 })
2056 .await;
2057 }
2058 None => {
2059 emit_error(
2060 &app.writer,
2061 Some(id),
2062 &PayError::InternalError("wallet has no seed".to_string()),
2063 start,
2064 )
2065 .await;
2066 }
2067 },
2068 Network::Sol => match meta.seed_secret {
2069 Some(secret) => {
2070 if looks_like_bip39_mnemonic(&secret) {
2071 let _ = app
2072 .writer
2073 .send(Output::WalletSeed {
2074 id,
2075 wallet,
2076 mnemonic_secret: secret,
2077 trace: trace_from(start),
2078 })
2079 .await;
2080 } else {
2081 emit_error(
2082 &app.writer,
2083 Some(id),
2084 &PayError::InvalidAmount(
2085 "this sol wallet was created before mnemonic support; create a new sol wallet to get 12-word backup".to_string(),
2086 ),
2087 start,
2088 )
2089 .await;
2090 }
2091 }
2092 None => {
2093 emit_error(
2094 &app.writer,
2095 Some(id),
2096 &PayError::InternalError("wallet has no seed".to_string()),
2097 start,
2098 )
2099 .await;
2100 }
2101 },
2102 Network::Ln => {
2103 emit_error(
2104 &app.writer,
2105 Some(id),
2106 &PayError::InvalidAmount(
2107 "ln wallets do not have mnemonic words; they store backend credentials (nwc-uri/password/admin-key)".to_string(),
2108 ),
2109 start,
2110 )
2111 .await;
2112 }
2113 Network::Evm | Network::Btc => match meta.seed_secret {
2114 Some(secret) => {
2115 if looks_like_bip39_mnemonic(&secret) {
2116 let _ = app
2117 .writer
2118 .send(Output::WalletSeed {
2119 id,
2120 wallet,
2121 mnemonic_secret: secret,
2122 trace: trace_from(start),
2123 })
2124 .await;
2125 } else {
2126 emit_error(
2127 &app.writer,
2128 Some(id),
2129 &PayError::InvalidAmount(
2130 "this wallet was created before mnemonic support; create a new wallet to get 12-word backup".to_string(),
2131 ),
2132 start,
2133 )
2134 .await;
2135 }
2136 }
2137 None => {
2138 emit_error(
2139 &app.writer,
2140 Some(id),
2141 &PayError::InternalError("wallet has no seed".to_string()),
2142 start,
2143 )
2144 .await;
2145 }
2146 },
2147 },
2148 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2149 }
2150 }
2151
2152 Input::HistoryList {
2153 id,
2154 wallet,
2155 network,
2156 onchain_memo,
2157 limit,
2158 offset,
2159 since_epoch_s,
2160 until_epoch_s,
2161 } => {
2162 let start = Instant::now();
2163 let lim = limit.unwrap_or(20);
2164 let off = offset.unwrap_or(0);
2165 let memo_filter = onchain_memo
2166 .as_deref()
2167 .map(str::trim)
2168 .filter(|value| !value.is_empty())
2169 .map(str::to_owned);
2170 let store = match require_store(app) {
2171 Ok(store) => store,
2172 Err(e) => {
2173 emit_error(&app.writer, Some(id), &e, start).await;
2174 return;
2175 }
2176 };
2177
2178 let mut all_txs = Vec::new();
2179 if let Some(wallet_id) = wallet {
2180 let meta = match store.load_wallet_metadata(&wallet_id) {
2181 Ok(meta) => meta,
2182 Err(e) => {
2183 emit_error(&app.writer, Some(id), &e, start).await;
2184 return;
2185 }
2186 };
2187 if let Some(expected_network) = network {
2188 if meta.network != expected_network {
2189 let _ = app
2190 .writer
2191 .send(Output::History {
2192 id,
2193 items: Vec::new(),
2194 trace: trace_from(start),
2195 })
2196 .await;
2197 return;
2198 }
2199 }
2200 match store.load_wallet_transaction_records(&wallet_id) {
2201 Ok(mut records) => all_txs.append(&mut records),
2202 Err(e) => {
2203 emit_error(&app.writer, Some(id), &e, start).await;
2204 return;
2205 }
2206 }
2207 } else {
2208 let wallets = match store.list_wallet_metadata(network) {
2209 Ok(wallets) => wallets,
2210 Err(e) => {
2211 emit_error(&app.writer, Some(id), &e, start).await;
2212 return;
2213 }
2214 };
2215 for wallet_meta in wallets {
2216 match store.load_wallet_transaction_records(&wallet_meta.id) {
2217 Ok(mut records) => all_txs.append(&mut records),
2218 Err(e) => {
2219 emit_error(&app.writer, Some(id.clone()), &e, start).await;
2220 return;
2221 }
2222 }
2223 }
2224 }
2225
2226 if let Some(expected_network) = network {
2227 all_txs.retain(|item| item.network == expected_network);
2228 }
2229 if let Some(since) = since_epoch_s {
2230 all_txs.retain(|item| item.created_at_epoch_s >= since);
2231 }
2232 if let Some(until) = until_epoch_s {
2233 all_txs.retain(|item| item.created_at_epoch_s < until);
2234 }
2235 if let Some(filter) = memo_filter.as_deref() {
2236 all_txs.retain(|item| item.onchain_memo.as_deref() == Some(filter));
2237 }
2238 all_txs.sort_by(|a, b| b.created_at_epoch_s.cmp(&a.created_at_epoch_s));
2239 let start_idx = all_txs.len().min(off);
2240 let end_idx = all_txs.len().min(off.saturating_add(lim));
2241 let items = all_txs[start_idx..end_idx].to_vec();
2242 let _ = app
2243 .writer
2244 .send(Output::History {
2245 id,
2246 items,
2247 trace: trace_from(start),
2248 })
2249 .await;
2250 }
2251
2252 Input::HistoryStatus { id, transaction_id } => {
2253 let start = Instant::now();
2254 let mut routed: Option<Result<HistoryStatusInfo, PayError>> = None;
2255 for provider in app.providers.values() {
2256 match provider.history_status(&transaction_id).await {
2257 Ok(info) => {
2258 routed = Some(Ok(info));
2259 break;
2260 }
2261 Err(PayError::NotImplemented(_)) | Err(PayError::WalletNotFound(_)) => {}
2262 Err(err) => {
2263 routed = Some(Err(err));
2264 break;
2265 }
2266 }
2267 }
2268 match routed.unwrap_or_else(|| {
2269 Err(PayError::WalletNotFound(format!(
2270 "transaction {transaction_id} not found"
2271 )))
2272 }) {
2273 Ok(info) => {
2274 let _ = app
2275 .writer
2276 .send(Output::HistoryStatus {
2277 id,
2278 transaction_id: info.transaction_id,
2279 status: info.status,
2280 confirmations: info.confirmations,
2281 preimage: info.preimage,
2282 item: info.item,
2283 trace: trace_from(start),
2284 })
2285 .await;
2286 }
2287 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2288 }
2289 }
2290
2291 Input::HistoryUpdate {
2292 id,
2293 wallet,
2294 network,
2295 limit,
2296 } => {
2297 let start = Instant::now();
2298 if let Err(e) = require_store(app) {
2299 emit_error(&app.writer, Some(id), &e, start).await;
2300 return;
2301 }
2302
2303 let sync_limit = limit.unwrap_or(200).clamp(1, 5000);
2304 let mut totals = HistorySyncStats::default();
2305 let mut wallets_synced = 0usize;
2306
2307 if let Some(wallet_id) = wallet {
2308 let sync_result = if let Some(expected_network) = network {
2309 match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet_id)) {
2310 Ok(meta) if meta.network != expected_network => {
2311 Err(PayError::InvalidAmount(format!(
2312 "wallet {wallet_id} belongs to {}, not {expected_network}",
2313 meta.network
2314 )))
2315 }
2316 Ok(_) => match get_provider(&app.providers, expected_network) {
2317 Some(provider) => provider.history_sync(&wallet_id, sync_limit).await,
2318 None => Err(PayError::NotImplemented(format!(
2319 "network {expected_network} not enabled"
2320 ))),
2321 },
2322 Err(e) => Err(e),
2323 }
2324 } else {
2325 match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet_id)) {
2326 Ok(meta) => match get_provider(&app.providers, meta.network) {
2327 Some(provider) => provider.history_sync(&wallet_id, sync_limit).await,
2328 None => Err(PayError::NotImplemented(format!(
2329 "network {} not enabled",
2330 meta.network
2331 ))),
2332 },
2333 Err(e) => Err(e),
2334 }
2335 };
2336
2337 match sync_result {
2338 Ok(stats) => {
2339 wallets_synced = 1;
2340 totals.records_scanned =
2341 totals.records_scanned.saturating_add(stats.records_scanned);
2342 totals.records_added =
2343 totals.records_added.saturating_add(stats.records_added);
2344 totals.records_updated =
2345 totals.records_updated.saturating_add(stats.records_updated);
2346 }
2347 Err(e) => {
2348 emit_error(&app.writer, Some(id), &e, start).await;
2349 return;
2350 }
2351 }
2352 } else {
2353 let target_networks: Vec<Network> = if let Some(single) = network {
2354 vec![single]
2355 } else {
2356 vec![
2357 Network::Cashu,
2358 Network::Ln,
2359 Network::Sol,
2360 Network::Evm,
2361 Network::Btc,
2362 ]
2363 };
2364
2365 let wallets = match require_store(app).and_then(|s| s.list_wallet_metadata(None)) {
2366 Ok(wallets) => wallets,
2367 Err(e) => {
2368 emit_error(&app.writer, Some(id), &e, start).await;
2369 return;
2370 }
2371 };
2372
2373 for network_key in target_networks {
2374 let Some(provider) = get_provider(&app.providers, network_key) else {
2375 if network.is_some() {
2376 emit_error(
2377 &app.writer,
2378 Some(id),
2379 &PayError::NotImplemented(format!(
2380 "network {network_key} not enabled"
2381 )),
2382 start,
2383 )
2384 .await;
2385 return;
2386 }
2387 continue;
2388 };
2389 for wallet_meta in &wallets {
2390 if wallet_meta.network != network_key {
2391 continue;
2392 }
2393 match provider.history_sync(&wallet_meta.id, sync_limit).await {
2394 Ok(stats) => {
2395 wallets_synced = wallets_synced.saturating_add(1);
2396 totals.records_scanned =
2397 totals.records_scanned.saturating_add(stats.records_scanned);
2398 totals.records_added =
2399 totals.records_added.saturating_add(stats.records_added);
2400 totals.records_updated =
2401 totals.records_updated.saturating_add(stats.records_updated);
2402 }
2403 Err(PayError::NotImplemented(_)) | Err(PayError::WalletNotFound(_)) => {
2404 }
2405 Err(e) => {
2406 emit_error(&app.writer, Some(id), &e, start).await;
2407 return;
2408 }
2409 }
2410 }
2411 }
2412 }
2413
2414 let _ = app
2415 .writer
2416 .send(Output::HistoryUpdated {
2417 id,
2418 wallets_synced,
2419 records_scanned: totals.records_scanned,
2420 records_added: totals.records_added,
2421 records_updated: totals.records_updated,
2422 trace: trace_from(start),
2423 })
2424 .await;
2425 }
2426
2427 Input::LimitAdd { id, mut limit } => {
2428 let start = Instant::now();
2429 if !app.enforce_limits {
2430 emit_error(
2431 &app.writer,
2432 Some(id),
2433 &PayError::NotImplemented(
2434 "limit_add is unavailable when limits are not enforced locally; configure limits on the RPC daemon"
2435 .to_string(),
2436 ),
2437 start,
2438 )
2439 .await;
2440 return;
2441 }
2442
2443 if limit.scope == SpendScope::Wallet && limit.network.is_none() {
2445 if let Some(wallet_id) = limit.wallet.as_deref() {
2446 match require_store(app).and_then(|s| s.load_wallet_metadata(wallet_id)) {
2447 Ok(meta) => {
2448 limit.network = Some(meta.network.to_string());
2449 }
2450 Err(e) => {
2451 emit_error(&app.writer, Some(id), &e, start).await;
2452 return;
2453 }
2454 }
2455 }
2456 }
2457
2458 match app.spend_ledger.add_limit(&mut limit).await {
2459 Ok(rule_id) => {
2460 let _ = app
2461 .writer
2462 .send(Output::LimitAdded {
2463 id,
2464 rule_id,
2465 trace: trace_from(start),
2466 })
2467 .await;
2468 }
2469 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2470 }
2471 }
2472
2473 Input::LimitRemove { id, rule_id } => {
2474 let start = Instant::now();
2475 if !app.enforce_limits {
2476 emit_error(
2477 &app.writer,
2478 Some(id),
2479 &PayError::NotImplemented(
2480 "limit_remove is unavailable when limits are not enforced locally; configure limits on the RPC daemon"
2481 .to_string(),
2482 ),
2483 start,
2484 )
2485 .await;
2486 return;
2487 }
2488
2489 match app.spend_ledger.remove_limit(&rule_id).await {
2490 Ok(()) => {
2491 let _ = app
2492 .writer
2493 .send(Output::LimitRemoved {
2494 id,
2495 rule_id,
2496 trace: trace_from(start),
2497 })
2498 .await;
2499 }
2500 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2501 }
2502 }
2503
2504 Input::LimitList { id } => {
2505 let start = Instant::now();
2506 let local_limits = if app.enforce_limits {
2507 match app.spend_ledger.get_status().await {
2508 Ok(status) => status,
2509 Err(e) => {
2510 emit_error(&app.writer, Some(id), &e, start).await;
2511 return;
2512 }
2513 }
2514 } else {
2515 vec![]
2516 };
2517
2518 let config = app.config.read().await.clone();
2520 let downstream = query_downstream_limits(&config).await;
2521
2522 let _ = app
2523 .writer
2524 .send(Output::LimitStatus {
2525 id,
2526 limits: local_limits,
2527 downstream,
2528 trace: trace_from(start),
2529 })
2530 .await;
2531 }
2532
2533 Input::LimitSet { id, mut limits } => {
2534 let start = Instant::now();
2535 if !app.enforce_limits {
2536 emit_error(
2537 &app.writer,
2538 Some(id),
2539 &PayError::NotImplemented(
2540 "limit_set is unavailable when limits are not enforced locally; configure limits on the RPC daemon"
2541 .to_string(),
2542 ),
2543 start,
2544 )
2545 .await;
2546 return;
2547 }
2548
2549 for rule in &mut limits {
2551 if rule.scope == SpendScope::Wallet && rule.network.is_none() {
2552 if let Some(wallet_id) = rule.wallet.as_deref() {
2553 match require_store(app).and_then(|s| s.load_wallet_metadata(wallet_id)) {
2554 Ok(meta) => {
2555 rule.network = Some(meta.network.to_string());
2556 }
2557 Err(e) => {
2558 emit_error(&app.writer, Some(id), &e, start).await;
2559 return;
2560 }
2561 }
2562 }
2563 }
2564 }
2565
2566 match app.spend_ledger.set_limits(&limits).await {
2567 Ok(()) => match app.spend_ledger.get_status().await {
2568 Ok(status) => {
2569 let _ = app
2570 .writer
2571 .send(Output::LimitStatus {
2572 id,
2573 limits: status,
2574 downstream: vec![],
2575 trace: trace_from(start),
2576 })
2577 .await;
2578 }
2579 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2580 },
2581 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2582 }
2583 }
2584
2585 Input::WalletConfigShow { id, wallet } => {
2586 let start = Instant::now();
2587 match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2588 Ok(meta) => {
2589 let resolved_wallet = meta.id.clone();
2590 let _ = app
2591 .writer
2592 .send(Output::WalletConfig {
2593 id,
2594 wallet: resolved_wallet,
2595 config: meta,
2596 trace: trace_from(start),
2597 })
2598 .await;
2599 }
2600 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2601 }
2602 }
2603
2604 Input::WalletConfigSet {
2605 id,
2606 wallet,
2607 label,
2608 rpc_endpoints,
2609 chain_id,
2610 } => {
2611 let start = Instant::now();
2612 match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2613 Ok(mut meta) => {
2614 let resolved_wallet = meta.id.clone();
2615 let mut changed = false;
2616
2617 if let Some(new_label) = label {
2618 let trimmed = new_label.trim();
2619 meta.label = if trimmed.is_empty() {
2620 None
2621 } else {
2622 Some(trimmed.to_string())
2623 };
2624 changed = true;
2625 }
2626
2627 if !rpc_endpoints.is_empty() {
2628 match meta.network {
2629 Network::Sol => {
2630 meta.sol_rpc_endpoints = Some(rpc_endpoints);
2631 changed = true;
2632 }
2633 Network::Evm => {
2634 meta.evm_rpc_endpoints = Some(rpc_endpoints);
2635 changed = true;
2636 }
2637 _ => {
2638 emit_error(
2639 &app.writer,
2640 Some(id),
2641 &PayError::InvalidAmount(format!(
2642 "rpc-endpoint not supported for {} wallets",
2643 meta.network
2644 )),
2645 start,
2646 )
2647 .await;
2648 return;
2649 }
2650 }
2651 }
2652
2653 if let Some(cid) = chain_id {
2654 if meta.network != Network::Evm {
2655 emit_error(
2656 &app.writer,
2657 Some(id),
2658 &PayError::InvalidAmount(
2659 "chain-id is only supported for evm wallets".to_string(),
2660 ),
2661 start,
2662 )
2663 .await;
2664 return;
2665 }
2666 meta.evm_chain_id = Some(cid);
2667 changed = true;
2668 }
2669
2670 if !changed {
2671 emit_error(
2672 &app.writer,
2673 Some(id),
2674 &PayError::InvalidAmount(
2675 "no configuration changes specified".to_string(),
2676 ),
2677 start,
2678 )
2679 .await;
2680 return;
2681 }
2682
2683 match require_store(app).and_then(|s| s.save_wallet_metadata(&meta)) {
2684 Ok(()) => {
2685 let _ = app
2686 .writer
2687 .send(Output::WalletConfigUpdated {
2688 id,
2689 wallet: resolved_wallet,
2690 trace: trace_from(start),
2691 })
2692 .await;
2693 }
2694 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2695 }
2696 }
2697 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2698 }
2699 }
2700
2701 Input::WalletConfigTokenAdd {
2702 id,
2703 wallet,
2704 symbol,
2705 address,
2706 decimals,
2707 } => {
2708 let start = Instant::now();
2709 match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2710 Ok(mut meta) => {
2711 let resolved_wallet = meta.id.clone();
2712 if !matches!(meta.network, Network::Evm | Network::Sol) {
2713 emit_error(
2714 &app.writer,
2715 Some(id),
2716 &PayError::InvalidAmount(format!(
2717 "custom tokens not supported for {} wallets",
2718 meta.network
2719 )),
2720 start,
2721 )
2722 .await;
2723 return;
2724 }
2725
2726 let lower_symbol = symbol.to_ascii_lowercase();
2727 let tokens = meta.custom_tokens.get_or_insert_with(Vec::new);
2728 if tokens
2729 .iter()
2730 .any(|t| t.symbol.to_ascii_lowercase() == lower_symbol)
2731 {
2732 emit_error(
2733 &app.writer,
2734 Some(id),
2735 &PayError::InvalidAmount(format!(
2736 "custom token '{lower_symbol}' already registered"
2737 )),
2738 start,
2739 )
2740 .await;
2741 return;
2742 }
2743
2744 tokens.push(wallet::CustomToken {
2745 symbol: lower_symbol.clone(),
2746 address: address.clone(),
2747 decimals,
2748 });
2749
2750 match require_store(app).and_then(|s| s.save_wallet_metadata(&meta)) {
2751 Ok(()) => {
2752 let _ = app
2753 .writer
2754 .send(Output::WalletConfigTokenAdded {
2755 id,
2756 wallet: resolved_wallet,
2757 symbol: lower_symbol,
2758 address,
2759 decimals,
2760 trace: trace_from(start),
2761 })
2762 .await;
2763 }
2764 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2765 }
2766 }
2767 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2768 }
2769 }
2770
2771 Input::WalletConfigTokenRemove { id, wallet, symbol } => {
2772 let start = Instant::now();
2773 match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2774 Ok(mut meta) => {
2775 let resolved_wallet = meta.id.clone();
2776 let lower_symbol = symbol.to_ascii_lowercase();
2777 let tokens = meta.custom_tokens.get_or_insert_with(Vec::new);
2778 let before_len = tokens.len();
2779 tokens.retain(|t| t.symbol.to_ascii_lowercase() != lower_symbol);
2780 if tokens.len() == before_len {
2781 emit_error(
2782 &app.writer,
2783 Some(id),
2784 &PayError::InvalidAmount(format!(
2785 "custom token '{lower_symbol}' not found"
2786 )),
2787 start,
2788 )
2789 .await;
2790 return;
2791 }
2792 if tokens.is_empty() {
2793 meta.custom_tokens = None;
2794 }
2795
2796 match require_store(app).and_then(|s| s.save_wallet_metadata(&meta)) {
2797 Ok(()) => {
2798 let _ = app
2799 .writer
2800 .send(Output::WalletConfigTokenRemoved {
2801 id,
2802 wallet: resolved_wallet,
2803 symbol: lower_symbol,
2804 trace: trace_from(start),
2805 })
2806 .await;
2807 }
2808 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2809 }
2810 }
2811 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2812 }
2813 }
2814
2815 Input::Config(patch) => {
2816 let start = Instant::now();
2817 let ConfigPatch {
2818 data_dir,
2819 limits,
2820 log,
2821 exchange_rate,
2822 afpay_rpc,
2823 providers,
2824 } = patch;
2825
2826 let mut unsupported = Vec::new();
2827 if data_dir.is_some() {
2828 unsupported.push("data_dir");
2829 }
2830 if afpay_rpc.is_some() {
2831 unsupported.push("afpay_rpc");
2832 }
2833 if providers.is_some() {
2834 unsupported.push("providers");
2835 }
2836 if exchange_rate.is_some() {
2837 unsupported.push("exchange_rate");
2838 }
2839 if !unsupported.is_empty() {
2840 let err = PayError::NotImplemented(format!(
2841 "runtime config only supports 'log' and 'limits'; unsupported fields: {}",
2842 unsupported.join(", ")
2843 ));
2844 emit_error(&app.writer, None, &err, start).await;
2845 return;
2846 }
2847
2848 if let Some(ref v) = limits {
2849 if !app.enforce_limits {
2850 let err = PayError::NotImplemented(
2851 "config.limits is unavailable when limits are not enforced locally; configure limits on the RPC daemon"
2852 .to_string(),
2853 );
2854 emit_error(&app.writer, None, &err, start).await;
2855 return;
2856 }
2857 if let Err(e) = app.spend_ledger.set_limits(v).await {
2858 emit_error(&app.writer, None, &e, start).await;
2859 return;
2860 }
2861 }
2862
2863 let mut cfg = app.config.write().await;
2864 if let Some(v) = limits {
2865 cfg.limits = v;
2866 }
2867 if let Some(v) = log {
2868 cfg.log = agent_first_data::cli_parse_log_filters(&v);
2869 }
2870 let _ = app.writer.send(Output::Config(cfg.clone())).await;
2871 }
2872
2873 Input::Version => {
2874 let _ = app
2875 .writer
2876 .send(Output::Version {
2877 version: crate::config::VERSION.to_string(),
2878 trace: PongTrace {
2879 uptime_s: app.start_time.elapsed().as_secs(),
2880 requests_total: app.requests_total.load(Ordering::Relaxed),
2881 in_flight: app.in_flight.lock().await.len(),
2882 },
2883 })
2884 .await;
2885 }
2886
2887 Input::Close => {
2888 }
2890 }
2891
2892 emit_migration_log(app).await;
2893}
2894
2895fn get_provider(
2900 providers: &HashMap<Network, Box<dyn PayProvider>>,
2901 network: Network,
2902) -> Option<&dyn PayProvider> {
2903 providers.get(&network).map(|p| p.as_ref())
2904}
2905
/// Cheap shape check for a BIP-39 style mnemonic.
///
/// Returns `true` when `secret` consists of exactly 12 or exactly 24
/// whitespace-separated words. The words themselves are not validated
/// against any word list or checksum.
fn looks_like_bip39_mnemonic(secret: &str) -> bool {
    matches!(secret.split_whitespace().count(), 12 | 24)
}
2910
2911async fn emit_error(
2912 writer: &mpsc::Sender<Output>,
2913 id: Option<String>,
2914 err: &PayError,
2915 start: Instant,
2916) {
2917 emit_error_hint(writer, id, err, start, None).await;
2918}
2919
2920async fn emit_error_hint(
2923 writer: &mpsc::Sender<Output>,
2924 id: Option<String>,
2925 err: &PayError,
2926 start: Instant,
2927 hint_override: Option<&str>,
2928) {
2929 let _ = writer
2930 .send(Output::Error {
2931 id,
2932 error_code: err.error_code().to_string(),
2933 error: err.to_string(),
2934 hint: hint_override.map(|h| h.to_string()).or_else(|| err.hint()),
2935 retryable: err.retryable(),
2936 trace: trace_from(start),
2937 })
2938 .await;
2939}
2940
2941fn extract_id(input: &Input) -> Option<String> {
2942 match input {
2943 Input::WalletCreate { id, .. }
2944 | Input::LnWalletCreate { id, .. }
2945 | Input::WalletClose { id, .. }
2946 | Input::WalletList { id, .. }
2947 | Input::Balance { id, .. }
2948 | Input::Receive { id, .. }
2949 | Input::ReceiveClaim { id, .. }
2950 | Input::CashuSend { id, .. }
2951 | Input::CashuReceive { id, .. }
2952 | Input::Send { id, .. }
2953 | Input::Restore { id, .. }
2954 | Input::WalletShowSeed { id, .. }
2955 | Input::HistoryList { id, .. }
2956 | Input::HistoryStatus { id, .. }
2957 | Input::HistoryUpdate { id, .. }
2958 | Input::LimitAdd { id, .. }
2959 | Input::LimitRemove { id, .. }
2960 | Input::LimitList { id, .. }
2961 | Input::LimitSet { id, .. }
2962 | Input::WalletConfigShow { id, .. }
2963 | Input::WalletConfigSet { id, .. }
2964 | Input::WalletConfigTokenAdd { id, .. }
2965 | Input::WalletConfigTokenRemove { id, .. } => Some(id.clone()),
2966 Input::Config(_) | Input::Version | Input::Close => None,
2967 }
2968}
2969
2970fn trace_from(start: Instant) -> Trace {
2971 Trace::from_duration(start.elapsed().as_millis() as u64)
2972}
2973
2974async fn query_downstream_limits(config: &RuntimeConfig) -> Vec<DownstreamLimitNode> {
2976 let mut result = Vec::new();
2977 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
2978 for (name, rpc_cfg) in &config.afpay_rpc {
2979 if !seen.insert(rpc_cfg.endpoint.clone()) {
2980 continue;
2981 }
2982 let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
2983 let limit_input = Input::LimitList {
2984 id: format!("downstream_{name}"),
2985 };
2986 let outputs =
2987 crate::provider::remote::rpc_call(&rpc_cfg.endpoint, secret, &limit_input).await;
2988 let mut node = DownstreamLimitNode {
2989 name: name.clone(),
2990 endpoint: rpc_cfg.endpoint.clone(),
2991 limits: vec![],
2992 error: None,
2993 downstream: vec![],
2994 };
2995 for value in &outputs {
2996 if value.get("code").and_then(|v| v.as_str()) == Some("error") {
2997 node.error = value
2998 .get("error")
2999 .and_then(|v| v.as_str())
3000 .map(|s| s.to_string());
3001 }
3002 if value.get("code").and_then(|v| v.as_str()) == Some("limit_status") {
3003 if let Some(limits) = value.get("limits") {
3004 node.limits = serde_json::from_value(limits.clone()).unwrap_or_default();
3005 }
3006 if let Some(ds) = value.get("downstream") {
3007 node.downstream = serde_json::from_value(ds.clone()).unwrap_or_default();
3008 }
3009 }
3010 }
3011 result.push(node);
3012 }
3013 result
3014}
3015
/// Pulls the `token` query parameter out of a send target string.
///
/// The query is the text between the first and second `?` of `to` (or up
/// to the end when there is no second `?`). It is split on `&`, and the
/// value of the first `token=` pair whose value is non-empty is returned.
/// Returns `None` when there is no query or no such pair. No URL decoding
/// is performed.
fn extract_token_from_target(to: &str) -> Option<String> {
    let mut segments = to.split('?');
    segments.next(); // everything before the first '?'
    let query = segments.next()?;

    query
        .split('&')
        .filter_map(|pair| pair.strip_prefix("token="))
        .find(|value| !value.is_empty())
        .map(str::to_owned)
}
3028
3029fn wallet_provider_key(meta: &wallet::WalletMetadata) -> String {
3030 match meta.network {
3031 Network::Ln => meta
3032 .backend
3033 .as_deref()
3034 .map(|b| format!("ln-{}", b.to_ascii_lowercase()))
3035 .unwrap_or_else(|| "ln".to_string()),
3036 _ => meta.network.to_string(),
3037 }
3038}
3039
3040fn wallet_summary_from_meta(meta: &wallet::WalletMetadata, wallet_id: &str) -> WalletSummary {
3041 let (address, backend) = match meta.network {
3042 Network::Cashu => (meta.mint_url.clone().unwrap_or_default(), None),
3043 Network::Ln => {
3044 let b = meta
3045 .backend
3046 .clone()
3047 .unwrap_or_else(|| "unknown".to_string());
3048 (format!("ln:{b}"), Some(b))
3049 }
3050 _ => (wallet_id.to_string(), None),
3051 };
3052 WalletSummary {
3053 id: meta.id.clone(),
3054 network: meta.network,
3055 label: meta.label.clone(),
3056 address,
3057 backend,
3058 mint_url: meta.mint_url.clone(),
3059 rpc_endpoints: meta
3060 .sol_rpc_endpoints
3061 .clone()
3062 .or(meta.evm_rpc_endpoints.clone()),
3063 chain_id: meta.evm_chain_id,
3064 created_at_epoch_s: meta.created_at_epoch_s,
3065 }
3066}
3067
3068async fn resolve_wallet_summary(app: &App, wallet_id: &str) -> WalletSummary {
3069 if let Ok(meta) = require_store(app).and_then(|s| s.load_wallet_metadata(wallet_id)) {
3070 return wallet_summary_from_meta(&meta, wallet_id);
3071 }
3072 if let Ok(wallets) = collect_all!(&app.providers, |p| p.list_wallets()) {
3073 if let Some(summary) = wallets.into_iter().find(|w| w.id == wallet_id) {
3074 return summary;
3075 }
3076 }
3077 WalletSummary {
3078 id: wallet_id.to_string(),
3079 network: Network::Ln,
3080 label: None,
3081 address: String::new(),
3082 backend: None,
3083 mint_url: None,
3084 rpc_endpoints: None,
3085 chain_id: None,
3086 created_at_epoch_s: 0,
3087 }
3088}
3089
/// Decides whether `event` passes the configured log filters.
///
/// A filter matches when it is `"*"` or `"all"`, or when it is a prefix of
/// the ASCII-lowercased event name. Filters are compared as given (they
/// are not lowercased here). An empty filter list enables nothing.
fn log_enabled(log: &[String], event: &str) -> bool {
    let event_lc = event.to_ascii_lowercase();
    log.iter()
        .map(String::as_str)
        .any(|filter| matches!(filter, "*" | "all") || event_lc.starts_with(filter))
}
3098
3099async fn emit_migration_log(app: &App) {
3100 let entries = app
3101 .store
3102 .as_ref()
3103 .map(|s| s.drain_migration_log())
3104 .unwrap_or_default();
3105 if entries.is_empty() {
3106 return;
3107 }
3108 for entry in entries {
3109 emit_log(
3110 app,
3111 "schema_migration",
3112 None,
3113 serde_json::json!({
3114 "database": entry.database,
3115 "from_version": entry.from_version,
3116 "to_version": entry.to_version,
3117 }),
3118 )
3119 .await;
3120 }
3121}
3122
3123async fn emit_log(app: &App, event: &str, request_id: Option<String>, args: serde_json::Value) {
3124 let log = app.config.read().await.log.clone();
3125 if !log_enabled(&log, event) {
3126 return;
3127 }
3128 let _ = app
3129 .writer
3130 .send(Output::Log {
3131 event: event.to_string(),
3132 request_id,
3133 version: None,
3134 argv: None,
3135 config: None,
3136 args: Some(args),
3137 env: None,
3138 trace: Trace::from_duration(0),
3139 })
3140 .await;
3141}