1#[cfg(any(
2 feature = "btc-esplora",
3 feature = "btc-core",
4 feature = "btc-electrum"
5))]
6use crate::provider::btc::BtcProvider;
7#[cfg(feature = "cashu")]
8use crate::provider::cashu::CashuProvider;
9#[cfg(feature = "evm")]
10use crate::provider::evm::EvmProvider;
11#[cfg(any(feature = "ln-nwc", feature = "ln-phoenixd", feature = "ln-lnbits"))]
12use crate::provider::ln::LnProvider;
13use crate::provider::remote::RemoteProvider;
14#[cfg(feature = "sol")]
15use crate::provider::sol::SolProvider;
16use crate::provider::{HistorySyncStats, PayError, PayProvider, StubProvider};
17use crate::spend::{SpendContext, SpendLedger};
18#[cfg(feature = "redb")]
19use crate::store::lock;
20use crate::store::wallet;
21use crate::store::{PayStore, StorageBackend};
22use crate::types::*;
23use std::collections::{HashMap, HashSet};
24use std::sync::atomic::{AtomicU64, Ordering};
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tokio::sync::{mpsc, Mutex, RwLock};
28use tokio::task::JoinHandle;
29use tokio::time::sleep;
30
/// Fallback wait timeout in seconds, used by `dispatch` when a receive request
/// does not set `wait_timeout_s`.
const DEFAULT_WAIT_TIMEOUT_SECS: u64 = 300;
/// Fallback delay between polls in milliseconds, used when
/// `wait_poll_interval_ms` is not set.
const DEFAULT_WAIT_POLL_INTERVAL_MS: u64 = 1000;
/// Fallback history sync/list limit while waiting, used when `wait_sync_limit`
/// is not set (the caller clamps the effective value to `1..=5000`).
const DEFAULT_WAIT_SYNC_LIMIT: usize = 500;
34
/// Application state that `dispatch` receives by reference for every request.
pub struct App {
    /// Runtime configuration behind an async read/write lock.
    pub config: RwLock<RuntimeConfig>,
    /// One payment provider per network; filled in by `App::new`.
    pub providers: HashMap<Network, Box<dyn PayProvider>>,
    /// Channel on which results, errors and other `Output` messages are sent.
    pub writer: mpsc::Sender<Output>,
    /// Join handles of spawned tasks keyed by a string — presumably the
    /// request id; TODO confirm against the code that populates this map.
    pub in_flight: Mutex<HashMap<String, JoinHandle<()>>>,
    /// Counter that `App::new` initialises to 0 — presumably the number of
    /// requests handled so far; TODO confirm where it is incremented.
    pub requests_total: AtomicU64,
    /// Instant captured when `App::new` ran.
    pub start_time: Instant,
    /// `true` when at least one provider reported `writes_locally()` at
    /// construction time.
    pub has_local_providers: bool,
    /// The explicit override passed to `App::new`, or, when none was given,
    /// the same value as `has_local_providers`.
    pub enforce_limits: bool,
    /// Spend ledger; `App::new` backs it with the Postgres pool when the store
    /// is a Postgres backend (feature `postgres`), otherwise with `data_dir`.
    pub spend_ledger: SpendLedger,
    /// Shared storage backend, if one was supplied to `App::new`.
    pub store: Option<Arc<StorageBackend>>,
}
52
53impl App {
54 pub fn new(
57 config: RuntimeConfig,
58 writer: mpsc::Sender<Output>,
59 enforce_limits_override: Option<bool>,
60 store: Option<StorageBackend>,
61 ) -> Self {
62 let store = store.map(Arc::new);
63 let mut providers: HashMap<Network, Box<dyn PayProvider>> = HashMap::new();
64
65 for network in &[
66 Network::Ln,
67 Network::Sol,
68 Network::Evm,
69 Network::Cashu,
70 Network::Btc,
71 ] {
72 let key = network.to_string();
73 if let Some(rpc_name) = config.providers.get(&key) {
74 if let Some(rpc_cfg) = config.afpay_rpc.get(rpc_name) {
76 let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
77 providers.insert(
78 *network,
79 Box::new(RemoteProvider::new(&rpc_cfg.endpoint, secret, *network)),
80 );
81 } else {
82 providers.insert(*network, Box::new(StubProvider::new(*network)));
84 }
85 } else {
86 #[allow(unreachable_patterns)]
87 match network {
88 #[cfg(feature = "cashu")]
89 Network::Cashu => {
90 if let Some(s) = &store {
91 let pg_url = config
92 .postgres_url_secret
93 .clone()
94 .filter(|_| config.storage_backend.as_deref() == Some("postgres"));
95 providers.insert(
96 *network,
97 Box::new(CashuProvider::new(&config.data_dir, pg_url, s.clone())),
98 );
99 } else {
100 providers.insert(*network, Box::new(StubProvider::new(*network)));
101 }
102 }
103 #[cfg(any(feature = "ln-nwc", feature = "ln-phoenixd", feature = "ln-lnbits"))]
104 Network::Ln => {
105 providers.insert(*network, Box::new(LnProvider::new(&config.data_dir)));
106 }
107 #[cfg(feature = "sol")]
108 Network::Sol => {
109 providers.insert(*network, Box::new(SolProvider::new(&config.data_dir)));
110 }
111 #[cfg(feature = "evm")]
112 Network::Evm => {
113 providers.insert(*network, Box::new(EvmProvider::new(&config.data_dir)));
114 }
115 #[cfg(any(
116 feature = "btc-esplora",
117 feature = "btc-core",
118 feature = "btc-electrum"
119 ))]
120 Network::Btc => {
121 providers.insert(*network, Box::new(BtcProvider::new(&config.data_dir)));
122 }
123 _ => {
124 providers.insert(*network, Box::new(StubProvider::new(*network)));
125 }
126 }
127 }
128 }
129
130 let has_local = providers.values().any(|p| p.writes_locally());
131 let spend_ledger = match store.as_deref() {
132 #[cfg(feature = "postgres")]
133 Some(StorageBackend::Postgres(pg)) => {
134 SpendLedger::new_postgres(pg.pool().clone(), config.exchange_rate.clone())
135 }
136 _ => SpendLedger::new(&config.data_dir, config.exchange_rate.clone()),
137 };
138 Self {
139 config: RwLock::new(config),
140 providers,
141 writer,
142 in_flight: Mutex::new(HashMap::new()),
143 requests_total: AtomicU64::new(0),
144 start_time: Instant::now(),
145 has_local_providers: has_local,
146 enforce_limits: enforce_limits_override.unwrap_or(has_local),
147 spend_ledger,
148 store,
149 }
150 }
151}
152
153pub async fn startup_provider_validation_errors(config: &RuntimeConfig) -> Vec<Output> {
156 let mut errors = Vec::new();
157
158 for (network, rpc_name) in &config.providers {
160 if !config.afpay_rpc.contains_key(rpc_name) {
161 errors.push(Output::Error {
162 id: None,
163 error_code: "invalid_config".to_string(),
164 error: format!(
165 "providers.{network} references unknown afpay_rpc node '{rpc_name}'"
166 ),
167 hint: Some(format!(
168 "add [afpay_rpc.{rpc_name}] with endpoint and endpoint_secret to config.toml"
169 )),
170 retryable: false,
171 trace: Trace::from_duration(0),
172 });
173 }
174 }
175 if !errors.is_empty() {
176 return errors;
177 }
178
179 let mut pinged: std::collections::HashSet<String> = std::collections::HashSet::new();
181 for (rpc_name, rpc_cfg) in &config.afpay_rpc {
182 if !pinged.insert(rpc_cfg.endpoint.clone()) {
183 continue;
184 }
185 let network = config
187 .providers
188 .iter()
189 .find(|(_, name)| *name == rpc_name)
190 .and_then(|(k, _)| k.parse::<Network>().ok())
191 .unwrap_or(Network::Cashu);
192 let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
193 let provider = RemoteProvider::new(&rpc_cfg.endpoint, secret, network);
194 if let Err(err) = provider.ping().await {
195 errors.push(Output::Error {
196 id: None,
197 error_code: "provider_unreachable".to_string(),
198 error: format!("afpay_rpc.{rpc_name} ({}): {err}", rpc_cfg.endpoint),
199 hint: Some("check endpoint address and that the daemon is running".to_string()),
200 retryable: true,
201 trace: Trace::from_duration(0),
202 });
203 }
204 }
205 errors
206}
207
208fn require_store(app: &App) -> Result<&StorageBackend, PayError> {
210 app.store
211 .as_deref()
212 .ok_or_else(|| PayError::NotImplemented("no storage backend available".to_string()))
213}
214
/// Acquires the data lock (`lock::DataLock`) for the configured `data_dir`.
///
/// `lock::acquire` is run through `spawn_blocking` rather than directly on the
/// async task.
///
/// # Errors
///
/// Returns `PayError::InternalError` when the blocking task cannot be joined
/// (message prefixed with `lock task:`) or when `lock::acquire` itself fails.
#[cfg(feature = "redb")]
async fn acquire_write_lock(app: &App) -> Result<lock::DataLock, PayError> {
    // Clone the path so the config read guard is gone before the task starts.
    let data_dir = app.config.read().await.data_dir.clone();
    let joined = tokio::task::spawn_blocking(move || lock::acquire(&data_dir, None)).await;
    match joined {
        Ok(Ok(guard)) => Ok(guard),
        Ok(Err(reason)) => Err(PayError::InternalError(reason)),
        Err(join_error) => Err(PayError::InternalError(format!("lock task: {join_error}"))),
    }
}
226
227fn needs_write_lock(input: &Input) -> bool {
228 matches!(
229 input,
230 Input::WalletCreate { .. }
231 | Input::LnWalletCreate { .. }
232 | Input::WalletClose { .. }
233 | Input::Receive { .. }
234 | Input::ReceiveClaim { .. }
235 | Input::CashuSend { .. }
236 | Input::CashuReceive { .. }
237 | Input::Send { .. }
238 | Input::Restore { .. }
239 | Input::LimitAdd { .. }
240 | Input::LimitRemove { .. }
241 | Input::LimitSet { .. }
242 | Input::HistoryUpdate { .. }
243 | Input::WalletConfigSet { .. }
244 | Input::WalletConfigTokenAdd { .. }
245 | Input::WalletConfigTokenRemove { .. }
246 )
247}
248
/// Asks each provider in turn and yields the first decisive answer.
///
/// `$providers` must offer `.values()` yielding boxed providers; `$p` is the
/// name under which the current provider is visible inside `$call`, and
/// `$call` is an awaitable producing `Result<_, PayError>`.
///
/// * `Err(PayError::NotImplemented(_))` means "not mine": the next provider is
///   tried.
/// * Any other result, `Ok` or `Err`, ends the search and is returned as is.
/// * When every provider declined (or there are none) the result is
///   `Err(PayError::NotImplemented("network not enabled"))`.
///
/// Providers are visited in the order `.values()` yields them.
macro_rules! try_provider {
    ($providers:expr, |$p:ident| $call:expr) => {{
        let mut _settled: Option<Result<_, PayError>> = None;
        for _candidate in $providers.values() {
            let $p = _candidate.as_ref();
            match $call.await {
                // This provider does not handle the request; keep looking.
                Err(PayError::NotImplemented(_)) => {}
                // Success or a genuine failure: either way the search is over.
                _answer => {
                    _settled = Some(_answer);
                    break;
                }
            }
        }
        _settled.unwrap_or_else(|| {
            Err(PayError::NotImplemented("network not enabled".to_string()))
        })
    }};
}
273
/// Runs `$call` against every provider and concatenates the returned vectors.
///
/// `$providers` must offer `.values()` yielding boxed providers; `$p` is the
/// name under which the current provider is visible inside `$call`, and
/// `$call` is an awaitable producing `Result<Vec<_>, PayError>`.
///
/// * `Ok(items)` is appended to the combined list.
/// * `Err(PayError::NotImplemented(_))` is skipped silently.
/// * Any other error stops the iteration and becomes the overall result;
///   items gathered up to that point are discarded.
macro_rules! collect_all {
    ($providers:expr, |$p:ident| $call:expr) => {{
        let mut _gathered = Vec::new();
        let mut _failure: Option<PayError> = None;
        for _candidate in $providers.values() {
            let $p = _candidate.as_ref();
            match $call.await {
                Ok(mut _batch) => _gathered.append(&mut _batch),
                Err(PayError::NotImplemented(_)) => continue,
                Err(_fatal) => {
                    _failure = Some(_fatal);
                    break;
                }
            }
        }
        match _failure {
            None => Ok(_gathered),
            Some(_fatal) => Err(_fatal),
        }
    }};
}
296
297fn resolve_wallet_labels(input: &mut Input, store: &dyn PayStore) -> Result<(), PayError> {
300 fn resolve(store: &dyn PayStore, w: &mut String) -> Result<(), PayError> {
301 if !w.starts_with("w_") {
302 *w = store.resolve_wallet_id(w)?;
303 }
304 Ok(())
305 }
306 fn resolve_opt(store: &dyn PayStore, w: &mut Option<String>) -> Result<(), PayError> {
307 if let Some(val) = w.as_mut() {
308 if !val.starts_with("w_") {
309 *val = store.resolve_wallet_id(val)?;
310 }
311 }
312 Ok(())
313 }
314 match input {
315 Input::WalletClose { wallet, .. } => resolve(store, wallet),
316 Input::Balance { wallet, .. } => resolve_opt(store, wallet),
317 Input::Receive { wallet, .. } => resolve(store, wallet),
318 Input::ReceiveClaim { wallet, .. } => resolve(store, wallet),
319 Input::CashuSend { wallet, .. } => resolve_opt(store, wallet),
320 Input::CashuReceive { wallet, .. } => resolve_opt(store, wallet),
321 Input::Send { wallet, .. } => resolve_opt(store, wallet),
322 Input::Restore { wallet, .. } => resolve(store, wallet),
323 Input::WalletShowSeed { wallet, .. } => resolve(store, wallet),
324 Input::HistoryList { wallet, .. } | Input::HistoryUpdate { wallet, .. } => {
325 resolve_opt(store, wallet)
326 }
327 Input::WalletConfigShow { wallet, .. } => resolve(store, wallet),
328 Input::WalletConfigSet { wallet, .. } => resolve(store, wallet),
329 Input::WalletConfigTokenAdd { wallet, .. } => resolve(store, wallet),
330 Input::WalletConfigTokenRemove { wallet, .. } => resolve(store, wallet),
331 Input::LimitAdd { limit, .. } => resolve_opt(store, &mut limit.wallet),
332 Input::LimitSet { limits, .. } => {
333 for limit in limits.iter_mut() {
334 resolve_opt(store, &mut limit.wallet)?;
335 }
336 Ok(())
337 }
338 _ => Ok(()),
339 }
340}
341
342pub async fn dispatch(app: &App, input: Input) {
343 #[cfg(feature = "redb")]
346 let _lock = if app.has_local_providers
347 && needs_write_lock(&input)
348 && matches!(app.store.as_deref(), Some(StorageBackend::Redb(..)) | None)
349 {
350 match acquire_write_lock(app).await {
351 Ok(guard) => Some(guard),
352 Err(e) => {
353 let id = extract_id(&input);
354 emit_error(&app.writer, id, &e, Instant::now()).await;
355 return;
356 }
357 }
358 } else {
359 None
360 };
361
362 let mut input = input;
364 if let Some(store) = &app.store {
365 if let Err(e) = resolve_wallet_labels(&mut input, store.as_ref()) {
366 let id = extract_id(&input);
367 emit_error(&app.writer, id, &e, Instant::now()).await;
368 return;
369 }
370 }
371
372 match input {
373 Input::WalletCreate {
374 id,
375 network,
376 label,
377 mint_url,
378 rpc_endpoints,
379 chain_id,
380 mnemonic_secret,
381 btc_esplora_url,
382 btc_network,
383 btc_address_type,
384 btc_backend,
385 btc_core_url,
386 btc_core_auth_secret,
387 btc_electrum_url,
388 } => {
389 let start = Instant::now();
390 let mut log_args = serde_json::json!({
391 "operation": "wallet_create",
392 "network": network.to_string(),
393 "label": label.as_deref().unwrap_or("default"),
394 });
395 if let Some(object) = log_args.as_object_mut() {
396 if !rpc_endpoints.is_empty() {
397 object.insert(
398 "rpc_endpoints".to_string(),
399 serde_json::json!(rpc_endpoints),
400 );
401 }
402 if let Some(cid) = chain_id {
403 object.insert("chain_id".to_string(), serde_json::json!(cid));
404 }
405 if let Some(url) = mint_url.as_deref() {
406 object.insert("mint_url".to_string(), serde_json::json!(url));
407 }
408 object.insert(
409 "use_recovery_mnemonic".to_string(),
410 serde_json::json!(mnemonic_secret.is_some()),
411 );
412 }
413 emit_log(app, "wallet", Some(id.clone()), log_args).await;
414 let request = WalletCreateRequest {
415 label: label.unwrap_or_else(|| "default".to_string()),
416 mint_url,
417 rpc_endpoints,
418 chain_id,
419 mnemonic_secret,
420 btc_esplora_url,
421 btc_network,
422 btc_address_type,
423 btc_backend,
424 btc_core_url,
425 btc_core_auth_secret,
426 btc_electrum_url,
427 };
428 match get_provider(&app.providers, network) {
429 Some(p) => match p.create_wallet(&request).await {
430 Ok(info) => {
431 #[cfg(feature = "redb")]
433 if let Some(store) = app.store.as_ref() {
434 if let Ok(meta) = wallet::load_wallet_metadata(
435 &app.config.read().await.data_dir,
436 &info.id,
437 ) {
438 let _ = store.save_wallet_metadata(&meta);
439 }
440 }
441 let _ = app
442 .writer
443 .send(Output::WalletCreated {
444 id,
445 wallet: info.id,
446 network: info.network,
447 address: info.address,
448 mnemonic: None,
449 trace: trace_from(start),
450 })
451 .await;
452 }
453 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
454 },
455 None => {
456 emit_error(
457 &app.writer,
458 Some(id),
459 &PayError::NotImplemented(format!("no provider for {network}")),
460 start,
461 )
462 .await;
463 }
464 }
465 }
466
467 Input::LnWalletCreate { id, request } => {
468 let start = Instant::now();
469 let mut log_args =
470 serde_json::to_value(&request).unwrap_or_else(|_| serde_json::json!({}));
471 if let Some(object) = log_args.as_object_mut() {
472 object.insert(
473 "operation".to_string(),
474 serde_json::json!("ln_wallet_create"),
475 );
476 object.insert("network".to_string(), serde_json::json!("ln"));
477 }
478 emit_log(app, "wallet", Some(id.clone()), log_args).await;
479
480 match get_provider(&app.providers, Network::Ln) {
481 Some(p) => match p.create_ln_wallet(request).await {
482 Ok(info) => {
483 #[cfg(feature = "redb")]
484 if let Some(store) = app.store.as_ref() {
485 if let Ok(meta) = wallet::load_wallet_metadata(
486 &app.config.read().await.data_dir,
487 &info.id,
488 ) {
489 let _ = store.save_wallet_metadata(&meta);
490 }
491 }
492 let _ = app
493 .writer
494 .send(Output::WalletCreated {
495 id,
496 wallet: info.id,
497 network: info.network,
498 address: info.address,
499 mnemonic: None,
500 trace: trace_from(start),
501 })
502 .await;
503 }
504 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
505 },
506 None => {
507 emit_error(
508 &app.writer,
509 Some(id),
510 &PayError::NotImplemented("no provider for ln".to_string()),
511 start,
512 )
513 .await;
514 }
515 }
516 }
517
518 Input::WalletClose {
519 id,
520 wallet,
521 dangerously_skip_balance_check_and_may_lose_money,
522 } => {
523 let start = Instant::now();
524 emit_log(
525 app,
526 "wallet",
527 Some(id.clone()),
528 serde_json::json!({
529 "operation": "wallet_close",
530 "wallet": &wallet,
531 "dangerously_skip_balance_check_and_may_lose_money": dangerously_skip_balance_check_and_may_lose_money,
532 }),
533 )
534 .await;
535 let close_result = if dangerously_skip_balance_check_and_may_lose_money {
536 match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
537 Ok(_) => require_store(app).and_then(|s| s.delete_wallet_metadata(&wallet)).map(|_| ()),
538 Err(PayError::WalletNotFound(_)) => Err(PayError::WalletNotFound(format!(
539 "wallet {wallet} not found locally; dangerous skip balance check only supports local wallets"
540 ))),
541 Err(error) => Err(error),
542 }
543 } else {
544 match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
545 Ok(meta) => match get_provider(&app.providers, meta.network) {
546 Some(provider) => provider.close_wallet(&wallet).await,
547 None => Err(PayError::NotImplemented(format!(
548 "no provider for {}",
549 meta.network
550 ))),
551 },
552 Err(PayError::WalletNotFound(_)) => {
553 try_provider!(&app.providers, |p| p.close_wallet(&wallet))
555 }
556 Err(error) => Err(error),
557 }
558 };
559
560 match close_result {
561 Ok(()) => {
562 let _ = app
563 .writer
564 .send(Output::WalletClosed {
565 id,
566 wallet,
567 trace: trace_from(start),
568 })
569 .await;
570 }
571 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
572 }
573 }
574
575 Input::WalletList { id, network } => {
576 let start = Instant::now();
577 emit_log(
578 app,
579 "wallet",
580 Some(id.clone()),
581 serde_json::json!({
582 "operation": "wallet_list",
583 "network": network.map(|c| c.to_string()).unwrap_or_else(|| "all".to_string()),
584 }),
585 )
586 .await;
587 if let Some(network) = network {
588 match get_provider(&app.providers, network) {
589 Some(p) => match p.list_wallets().await {
590 Ok(wallets) => {
591 let _ = app
592 .writer
593 .send(Output::WalletList {
594 id,
595 wallets,
596 trace: trace_from(start),
597 })
598 .await;
599 }
600 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
601 },
602 None => {
603 emit_error(
604 &app.writer,
605 Some(id),
606 &PayError::NotImplemented(format!("no provider for {network}")),
607 start,
608 )
609 .await;
610 }
611 }
612 } else {
613 let wallets = collect_all!(&app.providers, |p| p.list_wallets());
614 match wallets {
615 Ok(all) => {
616 let _ = app
617 .writer
618 .send(Output::WalletList {
619 id,
620 wallets: all,
621 trace: trace_from(start),
622 })
623 .await;
624 }
625 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
626 }
627 }
628 }
629
630 Input::Balance {
631 id,
632 wallet,
633 network,
634 check,
635 } => {
636 let start = Instant::now();
637 emit_log(
638 app,
639 "wallet",
640 Some(id.clone()),
641 serde_json::json!({
642 "operation": "balance", "wallet": wallet.as_deref().unwrap_or("all"), "check": check,
643 }),
644 )
645 .await;
646 if let Some(wallet_id) = wallet {
647 let meta_opt = require_store(app)
648 .and_then(|s| s.load_wallet_metadata(&wallet_id))
649 .ok();
650 let result = if let Some(ref meta) = meta_opt {
651 match get_provider(&app.providers, meta.network) {
652 Some(provider) => {
653 if check {
654 match provider.check_balance(&wallet_id).await {
655 Err(PayError::NotImplemented(_)) => {
656 provider.balance(&wallet_id).await
657 }
658 other => other,
659 }
660 } else {
661 provider.balance(&wallet_id).await
662 }
663 }
664 None => Err(PayError::NotImplemented(format!(
665 "no provider for {}",
666 meta.network
667 ))),
668 }
669 } else {
670 if check {
672 try_provider!(&app.providers, |p| async {
673 match p.check_balance(&wallet_id).await {
674 Err(PayError::NotImplemented(_)) => p.balance(&wallet_id).await,
675 other => other,
676 }
677 })
678 } else {
679 try_provider!(&app.providers, |p| p.balance(&wallet_id))
680 }
681 };
682 match result {
683 Ok(balance) => {
684 let summary = if let Some(meta) = meta_opt {
685 wallet_summary_from_meta(&meta, &wallet_id)
686 } else {
687 resolve_wallet_summary(app, &wallet_id).await
688 };
689 let _ = app
690 .writer
691 .send(Output::WalletBalances {
692 id,
693 wallets: vec![WalletBalanceItem {
694 wallet: summary,
695 balance: Some(balance),
696 error: None,
697 }],
698 trace: trace_from(start),
699 })
700 .await;
701 }
702 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
703 }
704 } else {
705 match collect_all!(&app.providers, |p| p.balance_all()) {
706 Ok(wallets) => {
707 let filtered = if let Some(network) = network {
708 wallets
709 .into_iter()
710 .filter(|w| w.wallet.network == network)
711 .collect()
712 } else {
713 wallets
714 };
715 let _ = app
716 .writer
717 .send(Output::WalletBalances {
718 id,
719 wallets: filtered,
720 trace: trace_from(start),
721 })
722 .await;
723 }
724 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
725 }
726 }
727 }
728
729 Input::Receive {
730 id,
731 wallet,
732 network,
733 amount,
734 onchain_memo,
735 wait_until_paid,
736 wait_timeout_s,
737 wait_poll_interval_ms,
738 wait_sync_limit,
739 write_qr_svg_file: _,
740 min_confirmations,
741 } => {
742 let start = Instant::now();
743 let wait_requested = wait_until_paid
744 || wait_timeout_s.is_some()
745 || wait_poll_interval_ms.is_some()
746 || wait_sync_limit.is_some();
747 emit_log(
748 app,
749 "wallet",
750 Some(id.clone()),
751 serde_json::json!({
752 "operation": "receive",
753 "wallet": &wallet,
754 "network": network.map(|c| c.to_string()).unwrap_or_else(|| "auto".to_string()),
755 "amount": amount.as_ref().map(|a| a.value),
756 "onchain_memo": onchain_memo.as_deref().unwrap_or(""),
757 "wait_until_paid": wait_requested,
758 "wait_timeout_s": wait_timeout_s,
759 "wait_poll_interval_ms": wait_poll_interval_ms,
760 "wait_sync_limit": wait_sync_limit,
761 }),
762 )
763 .await;
764
765 let (target_network, wallet_for_call) = if wallet.trim().is_empty() {
766 let wallets = match require_store(app).and_then(|s| s.list_wallet_metadata(network))
767 {
768 Ok(v) => v,
769 Err(e) => {
770 emit_error(&app.writer, Some(id), &e, start).await;
771 return;
772 }
773 };
774 match wallets.len() {
775 0 => {
776 let msg = match network {
777 Some(network) => format!("no {network} wallet found"),
778 None => "no wallet found".to_string(),
779 };
780 emit_error(&app.writer, Some(id), &PayError::WalletNotFound(msg), start)
781 .await;
782 return;
783 }
784 1 => (wallets[0].network, wallets[0].id.clone()),
785 _ => {
786 let msg = match network {
787 Some(network) => {
788 format!("multiple {network} wallets found; pass --wallet")
789 }
790 None => "multiple wallets found; pass --wallet".to_string(),
791 };
792 emit_error(&app.writer, Some(id), &PayError::InvalidAmount(msg), start)
793 .await;
794 return;
795 }
796 }
797 } else {
798 let meta = match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
799 Ok(m) => m,
800 Err(e) => {
801 emit_error(&app.writer, Some(id), &e, start).await;
802 return;
803 }
804 };
805 if let Some(expected) = network {
806 if meta.network != expected {
807 emit_error(
808 &app.writer,
809 Some(id),
810 &PayError::InvalidAmount(format!(
811 "wallet {wallet} is {}, not {expected}",
812 meta.network
813 )),
814 start,
815 )
816 .await;
817 return;
818 }
819 }
820 (meta.network, wallet.clone())
821 };
822
823 let Some(provider) = get_provider(&app.providers, target_network) else {
824 emit_error(
825 &app.writer,
826 Some(id),
827 &PayError::NotImplemented(format!("no provider for {target_network}")),
828 start,
829 )
830 .await;
831 return;
832 };
833
834 match provider
835 .receive_info(&wallet_for_call, amount.clone())
836 .await
837 {
838 Ok(receive_info) => {
839 let quote_id = receive_info.quote_id.clone();
840 let _ = app
841 .writer
842 .send(Output::ReceiveInfo {
843 id: id.clone(),
844 wallet: wallet_for_call.clone(),
845 receive_info,
846 trace: trace_from(start),
847 })
848 .await;
849
850 if !wait_requested {
851 return;
852 }
853
854 let timeout_secs = wait_timeout_s.unwrap_or(DEFAULT_WAIT_TIMEOUT_SECS);
855 if timeout_secs == 0 {
856 emit_error(
857 &app.writer,
858 Some(id),
859 &PayError::InvalidAmount("wait_timeout_s must be >= 1".to_string()),
860 start,
861 )
862 .await;
863 return;
864 }
865 let poll_interval_ms =
866 wait_poll_interval_ms.unwrap_or(DEFAULT_WAIT_POLL_INTERVAL_MS);
867 if poll_interval_ms == 0 {
868 emit_error(
869 &app.writer,
870 Some(id),
871 &PayError::InvalidAmount(
872 "wait_poll_interval_ms must be >= 1".to_string(),
873 ),
874 start,
875 )
876 .await;
877 return;
878 }
879 let sync_limit = wait_sync_limit
880 .unwrap_or(DEFAULT_WAIT_SYNC_LIMIT)
881 .clamp(1, 5000);
882
883 if target_network == Network::Sol {
884 let memo_to_watch = onchain_memo
885 .as_deref()
886 .map(str::trim)
887 .filter(|text| !text.is_empty())
888 .map(str::to_owned);
889 let amount_to_watch = amount.as_ref().map(|a| a.value);
890
891 if memo_to_watch.is_none() && amount_to_watch.is_none() {
892 emit_error_hint(
893 &app.writer,
894 Some(id),
895 &PayError::InvalidAmount(
896 "sol receive --wait requires a match condition".to_string(),
897 ),
898 start,
899 Some("pass --onchain-memo or --amount"),
900 )
901 .await;
902 return;
903 }
904
905 let deadline = Instant::now() + Duration::from_secs(timeout_secs);
906 loop {
907 match provider.history_list(&wallet_for_call, 200, 0).await {
908 Ok(items) => {
909 let matched = items.into_iter().find(|item| {
910 if item.direction != Direction::Receive {
911 return false;
912 }
913 if let Some(ref m) = memo_to_watch {
914 item.onchain_memo.as_deref() == Some(m.as_str())
915 } else if let Some(expected) = amount_to_watch {
916 item.amount.value == expected
917 } else {
918 false
919 }
920 });
921 if let Some(item) = matched {
922 if let Some(min_conf) = min_confirmations {
924 match provider
925 .history_status(&item.transaction_id)
926 .await
927 {
928 Ok(status_info) => {
929 let confs = status_info
930 .confirmations
931 .unwrap_or_else(|| {
932 if status_info.status
933 == TxStatus::Confirmed
934 {
935 min_conf
936 } else {
937 0
938 }
939 });
940 if confs < min_conf {
941 if Instant::now() >= deadline {
943 let criteria = if let Some(ref m) =
944 memo_to_watch
945 {
946 format!("memo '{m}'")
947 } else {
948 format!(
949 "amount {}",
950 amount_to_watch.unwrap_or(0)
951 )
952 };
953 emit_error(
954 &app.writer,
955 Some(id),
956 &PayError::NetworkError(format!(
957 "wait timeout after {timeout_secs}s: sol transaction {tx} matching {criteria} has {confs}/{min_conf} confirmations",
958 tx = item.transaction_id,
959 )),
960 start,
961 )
962 .await;
963 break;
964 }
965 sleep(Duration::from_millis(
966 poll_interval_ms,
967 ))
968 .await;
969 continue;
970 }
971 let transaction_id =
973 item.transaction_id.clone();
974 let _ = app
975 .writer
976 .send(Output::HistoryStatus {
977 id,
978 transaction_id,
979 status: item.status,
980 confirmations: Some(confs),
981 preimage: item.preimage.clone(),
982 item: Some(item),
983 trace: trace_from(start),
984 })
985 .await;
986 break;
987 }
988 Err(e) if e.retryable() => {
989 sleep(Duration::from_millis(poll_interval_ms))
990 .await;
991 continue;
992 }
993 Err(e) => {
994 emit_error(&app.writer, Some(id), &e, start)
995 .await;
996 break;
997 }
998 }
999 } else {
1000 let transaction_id = item.transaction_id.clone();
1001 let _ = app
1002 .writer
1003 .send(Output::HistoryStatus {
1004 id,
1005 transaction_id,
1006 status: item.status,
1007 confirmations: None,
1008 preimage: item.preimage.clone(),
1009 item: Some(item),
1010 trace: trace_from(start),
1011 })
1012 .await;
1013 break;
1014 }
1015 }
1016 if Instant::now() >= deadline {
1017 let criteria = if let Some(ref m) = memo_to_watch {
1018 format!("memo '{m}'")
1019 } else {
1020 format!("amount {}", amount_to_watch.unwrap_or(0))
1021 };
1022 emit_error(
1023 &app.writer,
1024 Some(id),
1025 &PayError::NetworkError(format!(
1026 "wait timeout after {timeout_secs}s: no incoming sol transaction matching {criteria}"
1027 )),
1028 start,
1029 )
1030 .await;
1031 break;
1032 }
1033 sleep(Duration::from_millis(poll_interval_ms)).await;
1034 }
1035 Err(e) if e.retryable() => {
1036 if Instant::now() >= deadline {
1037 let criteria = if let Some(ref m) = memo_to_watch {
1038 format!("memo '{m}'")
1039 } else {
1040 format!("amount {}", amount_to_watch.unwrap_or(0))
1041 };
1042 emit_error(
1043 &app.writer,
1044 Some(id),
1045 &PayError::NetworkError(format!(
1046 "wait timeout after {timeout_secs}s: no incoming sol transaction matching {criteria}"
1047 )),
1048 start,
1049 )
1050 .await;
1051 break;
1052 }
1053 sleep(Duration::from_millis(poll_interval_ms)).await;
1054 }
1055 Err(e) => {
1056 emit_error(&app.writer, Some(id), &e, start).await;
1057 break;
1058 }
1059 }
1060 }
1061 return;
1062 }
1063
1064 if target_network == Network::Evm {
1066 let memo_to_watch = onchain_memo
1067 .as_deref()
1068 .map(str::trim)
1069 .filter(|text| !text.is_empty())
1070 .map(str::to_owned);
1071 let amount_to_watch = amount.as_ref().map(|a| a.value);
1072 let token_to_watch = amount.as_ref().map(|a| a.token.to_ascii_lowercase());
1073
1074 if amount_to_watch.is_none() {
1075 emit_error_hint(
1076 &app.writer,
1077 Some(id),
1078 &PayError::InvalidAmount(
1079 "evm receive --wait requires --amount".to_string(),
1080 ),
1081 start,
1082 Some("pass --amount"),
1083 )
1084 .await;
1085 return;
1086 }
1087 let wait_criteria = if let Some(ref memo) = memo_to_watch {
1088 format!("amount {} and memo '{memo}'", amount_to_watch.unwrap_or(0))
1089 } else {
1090 format!("amount {}", amount_to_watch.unwrap_or(0))
1091 };
1092
1093 let mut known_receive_ids: HashSet<String> =
1095 match provider.history_list(&wallet_for_call, 1000, 0).await {
1096 Ok(items) => items
1097 .into_iter()
1098 .filter(|item| item.direction == Direction::Receive)
1099 .map(|item| item.transaction_id)
1100 .collect(),
1101 Err(_) => HashSet::new(),
1102 };
1103
1104 let initial_balance = match provider.balance(&wallet_for_call).await {
1105 Ok(b) => b,
1106 Err(e) => {
1107 emit_error(&app.writer, Some(id), &e, start).await;
1108 return;
1109 }
1110 };
1111
1112 let deadline = Instant::now() + Duration::from_secs(timeout_secs);
1113 'evm_wait: loop {
1114 sleep(Duration::from_millis(poll_interval_ms)).await;
1115 if Instant::now() >= deadline {
1116 emit_error(
1117 &app.writer,
1118 Some(id),
1119 &PayError::NetworkError(format!(
1120 "wait timeout after {timeout_secs}s: no incoming evm deposit matching {wait_criteria}"
1121 )),
1122 start,
1123 )
1124 .await;
1125 break;
1126 }
1127
1128 let current = match provider.balance(&wallet_for_call).await {
1129 Ok(current) => current,
1130 Err(e) if e.retryable() => continue,
1131 Err(e) => {
1132 emit_error(&app.writer, Some(id), &e, start).await;
1133 break;
1134 }
1135 };
1136
1137 let native_increase =
1138 current.confirmed.saturating_sub(initial_balance.confirmed);
1139 let token_increase =
1140 current.additional.iter().find_map(|(key, &cur)| {
1141 let init =
1142 initial_balance.additional.get(key).copied().unwrap_or(0);
1143 (cur > init).then_some((key.clone(), cur - init))
1144 });
1145 if native_increase == 0 && token_increase.is_none() {
1146 continue;
1147 }
1148
1149 let observed_value = token_increase
1150 .as_ref()
1151 .map(|(_, delta)| *delta)
1152 .unwrap_or(native_increase);
1153 if let Some(expected) = amount_to_watch {
1154 if observed_value != expected {
1155 continue;
1156 }
1157 }
1158
1159 match provider.history_sync(&wallet_for_call, sync_limit).await {
1160 Ok(_)
1161 | Err(PayError::NotImplemented(_))
1162 | Err(PayError::WalletNotFound(_)) => {}
1163 Err(e) if e.retryable() => continue,
1164 Err(e) => {
1165 emit_error(&app.writer, Some(id), &e, start).await;
1166 break;
1167 }
1168 }
1169
1170 let recent = match provider
1171 .history_list(&wallet_for_call, sync_limit, 0)
1172 .await
1173 {
1174 Ok(items) => items,
1175 Err(e) if e.retryable() => continue,
1176 Err(e) => {
1177 emit_error(&app.writer, Some(id), &e, start).await;
1178 break;
1179 }
1180 };
1181
1182 let mut matched: Option<HistoryRecord> = None;
1183 let mut memo_lookup_error: Option<PayError> = None;
1184 for item in recent.into_iter() {
1185 if item.direction != Direction::Receive {
1186 continue;
1187 }
1188 if known_receive_ids.contains(&item.transaction_id) {
1189 continue;
1190 }
1191 if let Some(expected) = amount_to_watch {
1192 if item.amount.value != expected {
1193 continue;
1194 }
1195 }
1196 if let Some(expected_token) = token_to_watch.as_deref() {
1197 if !evm_receive_token_matches(
1198 expected_token,
1199 &item.amount.token,
1200 ) {
1201 continue;
1202 }
1203 }
1204 if let Some(expected_memo) = memo_to_watch.as_deref() {
1205 let mut memo_matches =
1206 item.onchain_memo.as_deref() == Some(expected_memo);
1207 if !memo_matches {
1208 match provider
1209 .history_onchain_memo(
1210 &wallet_for_call,
1211 &item.transaction_id,
1212 )
1213 .await
1214 {
1215 Ok(Some(chain_memo)) => {
1216 memo_matches = chain_memo == expected_memo;
1217 }
1218 Ok(None)
1219 | Err(PayError::NotImplemented(_))
1220 | Err(PayError::WalletNotFound(_)) => {}
1221 Err(e) if e.retryable() => continue 'evm_wait,
1222 Err(e) => {
1223 memo_lookup_error = Some(e);
1224 break;
1225 }
1226 }
1227 }
1228 if !memo_matches {
1229 continue;
1230 }
1231 }
1232 matched = Some(item);
1233 break;
1234 }
1235 if let Some(e) = memo_lookup_error {
1236 emit_error(&app.writer, Some(id), &e, start).await;
1237 break;
1238 }
1239 let Some(item) = matched else {
1240 continue;
1241 };
1242
1243 known_receive_ids.insert(item.transaction_id.clone());
1244 if let Some(min_conf) = min_confirmations {
1245 loop {
1246 match provider.history_status(&item.transaction_id).await {
1247 Ok(status_info) => {
1248 let confs =
1249 status_info.confirmations.unwrap_or_else(|| {
1250 if status_info.status == TxStatus::Confirmed {
1251 min_conf
1252 } else {
1253 0
1254 }
1255 });
1256 if confs >= min_conf {
1257 let _ = app
1258 .writer
1259 .send(Output::HistoryStatus {
1260 id,
1261 transaction_id: status_info.transaction_id,
1262 status: status_info.status,
1263 confirmations: Some(confs),
1264 preimage: status_info.preimage,
1265 item: status_info.item.or(Some(item)),
1266 trace: trace_from(start),
1267 })
1268 .await;
1269 break 'evm_wait;
1270 }
1271 if Instant::now() >= deadline {
1272 emit_error(
1273 &app.writer,
1274 Some(id),
1275 &PayError::NetworkError(format!(
1276 "wait timeout after {timeout_secs}s: evm transaction {tx} matching {wait_criteria} has {confs}/{min_conf} confirmations",
1277 tx = item.transaction_id
1278 )),
1279 start,
1280 )
1281 .await;
1282 break 'evm_wait;
1283 }
1284 sleep(Duration::from_millis(poll_interval_ms)).await;
1285 }
1286 Err(e) if e.retryable() => {
1287 if Instant::now() >= deadline {
1288 emit_error(&app.writer, Some(id), &e, start).await;
1289 break 'evm_wait;
1290 }
1291 sleep(Duration::from_millis(poll_interval_ms)).await;
1292 }
1293 Err(e) => {
1294 emit_error(&app.writer, Some(id), &e, start).await;
1295 break 'evm_wait;
1296 }
1297 }
1298 }
1299 } else {
1300 match provider.history_status(&item.transaction_id).await {
1301 Ok(status_info) => {
1302 let _ = app
1303 .writer
1304 .send(Output::HistoryStatus {
1305 id,
1306 transaction_id: status_info.transaction_id,
1307 status: status_info.status,
1308 confirmations: status_info.confirmations,
1309 preimage: status_info.preimage,
1310 item: status_info.item.or(Some(item)),
1311 trace: trace_from(start),
1312 })
1313 .await;
1314 break;
1315 }
1316 Err(e) if e.retryable() => continue,
1317 Err(e) => {
1318 emit_error(&app.writer, Some(id), &e, start).await;
1319 break;
1320 }
1321 }
1322 }
1323 }
1324 return;
1325 }
1326
1327 if target_network == Network::Btc {
1329 let amount_to_watch = amount.as_ref().map(|a| a.value).filter(|v| *v > 0);
1330 let mut known_receive_ids: HashSet<String> =
1331 match provider.history_list(&wallet_for_call, 1000, 0).await {
1332 Ok(items) => items
1333 .into_iter()
1334 .filter(|item| item.direction == Direction::Receive)
1335 .map(|item| item.transaction_id)
1336 .collect(),
1337 Err(_) => HashSet::new(),
1338 };
1339 let initial_balance = match provider.balance(&wallet_for_call).await {
1340 Ok(b) => b,
1341 Err(e) => {
1342 emit_error(&app.writer, Some(id), &e, start).await;
1343 return;
1344 }
1345 };
1346
1347 let deadline = Instant::now() + Duration::from_secs(timeout_secs);
1348 loop {
1349 sleep(Duration::from_millis(poll_interval_ms)).await;
1350 if Instant::now() >= deadline {
1351 let criteria = if let Some(expected) = amount_to_watch {
1352 format!("amount {expected}")
1353 } else {
1354 "any incoming amount".to_string()
1355 };
1356 emit_error(
1357 &app.writer,
1358 Some(id),
1359 &PayError::NetworkError(format!(
1360 "wait timeout after {timeout_secs}s: no incoming btc transaction matching {criteria}"
1361 )),
1362 start,
1363 )
1364 .await;
1365 break;
1366 }
1367
1368 let current = match provider.balance(&wallet_for_call).await {
1369 Ok(current) => current,
1370 Err(e) if e.retryable() => continue,
1371 Err(e) => {
1372 emit_error(&app.writer, Some(id), &e, start).await;
1373 break;
1374 }
1375 };
1376 let confirmed_delta =
1377 current.confirmed.saturating_sub(initial_balance.confirmed);
1378 let pending_delta =
1379 current.pending.saturating_sub(initial_balance.pending);
1380 let observed_delta = confirmed_delta.saturating_add(pending_delta);
1381 if observed_delta == 0 {
1382 continue;
1383 }
1384 if let Some(expected) = amount_to_watch {
1385 if observed_delta != expected {
1386 continue;
1387 }
1388 }
1389
1390 match provider.history_sync(&wallet_for_call, sync_limit).await {
1391 Ok(_)
1392 | Err(PayError::NotImplemented(_))
1393 | Err(PayError::WalletNotFound(_)) => {}
1394 Err(e) if e.retryable() => continue,
1395 Err(e) => {
1396 emit_error(&app.writer, Some(id), &e, start).await;
1397 break;
1398 }
1399 }
1400
1401 let recent = match provider
1402 .history_list(&wallet_for_call, sync_limit, 0)
1403 .await
1404 {
1405 Ok(items) => items,
1406 Err(e) if e.retryable() => continue,
1407 Err(e) => {
1408 emit_error(&app.writer, Some(id), &e, start).await;
1409 break;
1410 }
1411 };
1412
1413 let matched = recent.into_iter().find(|item| {
1414 if item.direction != Direction::Receive {
1415 return false;
1416 }
1417 if known_receive_ids.contains(&item.transaction_id) {
1418 return false;
1419 }
1420 if let Some(expected) = amount_to_watch {
1421 if item.amount.value != expected {
1422 return false;
1423 }
1424 }
1425 true
1426 });
1427
1428 let Some(item) = matched else {
1429 continue;
1430 };
1431
1432 known_receive_ids.insert(item.transaction_id.clone());
1433 match provider.history_status(&item.transaction_id).await {
1434 Ok(status_info) => {
1435 let _ = app
1436 .writer
1437 .send(Output::HistoryStatus {
1438 id,
1439 transaction_id: status_info.transaction_id,
1440 status: status_info.status,
1441 confirmations: status_info.confirmations,
1442 preimage: status_info.preimage,
1443 item: status_info.item.or(Some(item)),
1444 trace: trace_from(start),
1445 })
1446 .await;
1447 break;
1448 }
1449 Err(e) if e.retryable() => continue,
1450 Err(e) => {
1451 emit_error(&app.writer, Some(id), &e, start).await;
1452 break;
1453 }
1454 }
1455 }
1456 return;
1457 }
1458
1459 let Some(quote_id) = quote_id else {
1460 emit_error(
1461 &app.writer,
1462 Some(id),
1463 &PayError::InternalError(
1464 "deposit response missing quote_id/payment_hash".to_string(),
1465 ),
1466 start,
1467 )
1468 .await;
1469 return;
1470 };
1471
1472 let deadline = Instant::now() + Duration::from_secs(timeout_secs);
1473 loop {
1474 match provider.receive_claim(&wallet_for_call, "e_id).await {
1475 Ok(claimed) => {
1476 let _ = app
1477 .writer
1478 .send(Output::ReceiveClaimed {
1479 id,
1480 wallet: wallet_for_call.clone(),
1481 amount: Amount {
1482 value: claimed,
1483 token: "sats".to_string(),
1484 },
1485 trace: trace_from(start),
1486 })
1487 .await;
1488 break;
1489 }
1490 Err(e) if e.retryable() => {
1491 if Instant::now() >= deadline {
1492 emit_error(
1493 &app.writer,
1494 Some(id),
1495 &PayError::NetworkError(format!(
1496 "wait-until-paid timeout after {timeout_secs}s"
1497 )),
1498 start,
1499 )
1500 .await;
1501 break;
1502 }
1503 sleep(Duration::from_millis(poll_interval_ms)).await;
1504 }
1505 Err(e) => {
1506 emit_error(&app.writer, Some(id), &e, start).await;
1507 break;
1508 }
1509 }
1510 }
1511 }
1512 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
1513 }
1514 }
1515
1516 Input::ReceiveClaim {
1517 id,
1518 wallet,
1519 quote_id,
1520 } => {
1521 let start = Instant::now();
1522 emit_log(
1523 app,
1524 "wallet",
1525 Some(id.clone()),
1526 serde_json::json!({
1527 "operation": "receive_claim", "wallet": &wallet, "quote_id": "e_id,
1528 }),
1529 )
1530 .await;
1531 let meta = match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
1532 Ok(m) => m,
1533 Err(e) => {
1534 emit_error(&app.writer, Some(id), &e, start).await;
1535 return;
1536 }
1537 };
1538 let Some(provider) = get_provider(&app.providers, meta.network) else {
1539 emit_error(
1540 &app.writer,
1541 Some(id),
1542 &PayError::NotImplemented(format!("no provider for {}", meta.network)),
1543 start,
1544 )
1545 .await;
1546 return;
1547 };
1548
1549 match provider.receive_claim(&wallet, "e_id).await {
1550 Ok(claimed) => {
1551 let _ = app
1552 .writer
1553 .send(Output::ReceiveClaimed {
1554 id,
1555 wallet,
1556 amount: Amount {
1557 value: claimed,
1558 token: "sats".to_string(),
1559 },
1560 trace: trace_from(start),
1561 })
1562 .await;
1563 }
1564 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
1565 }
1566 }
1567
1568 Input::CashuSend {
1569 id,
1570 wallet,
1571 amount,
1572 onchain_memo,
1573 local_memo,
1574 mints,
1575 } => {
1576 let start = Instant::now();
1577 emit_log(
1578 app,
1579 "pay",
1580 Some(id.clone()),
1581 serde_json::json!({
1582 "operation": "cashu_send", "wallet": wallet.as_deref().unwrap_or("auto"),
1583 "amount": amount.value, "onchain_memo": onchain_memo.as_deref().unwrap_or(""),
1584 "mints": mints.as_deref().unwrap_or(&[]),
1585 }),
1586 )
1587 .await;
1588
1589 let wallet_str = wallet.unwrap_or_default();
1590 let mints_ref = mints.as_deref();
1591 let Some(provider) = get_provider(&app.providers, Network::Cashu) else {
1592 emit_error(
1593 &app.writer,
1594 Some(id),
1595 &PayError::NotImplemented("no provider for cashu".to_string()),
1596 start,
1597 )
1598 .await;
1599 return;
1600 };
1601
1602 let mut reservation_id: Option<u64> = None;
1603 if app.enforce_limits {
1604 let spend_ctx = SpendContext {
1605 network: "cashu".to_string(),
1606 wallet: if wallet_str.is_empty() {
1607 None
1608 } else {
1609 Some(wallet_str.clone())
1610 },
1611 amount_native: amount.value,
1612 token: None,
1613 };
1614 match app
1615 .spend_ledger
1616 .reserve(&format!("cashu_send:{id}"), &spend_ctx)
1617 .await
1618 {
1619 Ok(rid) => reservation_id = Some(rid),
1620 Err(e) => {
1621 if let PayError::LimitExceeded {
1622 rule_id,
1623 scope,
1624 scope_key,
1625 spent,
1626 max_spend,
1627 token,
1628 remaining_s,
1629 origin,
1630 } = &e
1631 {
1632 let _ = app
1633 .writer
1634 .send(Output::LimitExceeded {
1635 id,
1636 rule_id: rule_id.clone(),
1637 scope: *scope,
1638 scope_key: scope_key.clone(),
1639 spent: *spent,
1640 max_spend: *max_spend,
1641 token: token.clone(),
1642 remaining_s: *remaining_s,
1643 origin: origin.clone(),
1644 trace: trace_from(start),
1645 })
1646 .await;
1647 } else {
1648 emit_error(&app.writer, Some(id), &e, start).await;
1649 }
1650 return;
1651 }
1652 }
1653 }
1654
1655 match provider
1656 .cashu_send(
1657 &wallet_str,
1658 amount.clone(),
1659 onchain_memo.as_deref(),
1660 mints_ref,
1661 )
1662 .await
1663 {
1664 Ok(r) => {
1665 if let Some(rid) = reservation_id {
1666 let _ = app.spend_ledger.confirm(rid).await;
1667 }
1668 if local_memo.is_some() {
1669 if let Some(s) = &app.store {
1670 let _ = s.update_transaction_record_memo(
1671 &r.transaction_id,
1672 local_memo.as_ref(),
1673 );
1674 }
1675 }
1676 let _ = app
1677 .writer
1678 .send(Output::CashuSent {
1679 id,
1680 wallet: r.wallet,
1681 transaction_id: r.transaction_id,
1682 status: r.status,
1683 fee: r.fee,
1684 token: r.token,
1685 trace: trace_from(start),
1686 })
1687 .await;
1688 }
1689 Err(e) => {
1690 if let Some(rid) = reservation_id {
1691 let _ = app.spend_ledger.cancel(rid).await;
1692 }
1693 emit_error(&app.writer, Some(id), &e, start).await
1694 }
1695 }
1696 }
1697
1698 Input::CashuReceive { id, wallet, token } => {
1699 let start = Instant::now();
1700 let token_preview = if token.len() > 20 {
1701 format!("{}...", &token[..20])
1702 } else {
1703 token.clone()
1704 };
1705 emit_log(
1706 app,
1707 "pay",
1708 Some(id.clone()),
1709 serde_json::json!({
1710 "operation": "cashu_receive", "wallet": wallet.as_deref().unwrap_or("auto"), "token": token_preview,
1711 }),
1712 )
1713 .await;
1714 let wallet_str = wallet.unwrap_or_default();
1715 let Some(provider) = get_provider(&app.providers, Network::Cashu) else {
1716 emit_error(
1717 &app.writer,
1718 Some(id),
1719 &PayError::NotImplemented("no provider for cashu".to_string()),
1720 start,
1721 )
1722 .await;
1723 return;
1724 };
1725 match provider.cashu_receive(&wallet_str, &token).await {
1726 Ok(r) => {
1727 let _ = app
1728 .writer
1729 .send(Output::CashuReceived {
1730 id,
1731 wallet: r.wallet,
1732 amount: r.amount,
1733 trace: trace_from(start),
1734 })
1735 .await;
1736 }
1737 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
1738 }
1739 }
1740
1741 Input::Send {
1742 id,
1743 wallet,
1744 network,
1745 to,
1746 onchain_memo,
1747 local_memo,
1748 mints,
1749 } => {
1750 let start = Instant::now();
1751 let operation_name = "send";
1752 let to_preview = if to.len() > 20 {
1753 format!("{}...", &to[..20])
1754 } else {
1755 to.clone()
1756 };
1757 emit_log(
1758 app,
1759 "pay",
1760 Some(id.clone()),
1761 serde_json::json!({
1762 "operation": operation_name, "wallet": wallet.as_deref().unwrap_or("auto"),
1763 "network": network.map(|c| c.to_string()).unwrap_or_else(|| "auto".to_string()),
1764 "to": to_preview, "onchain_memo": onchain_memo.as_deref().unwrap_or(""),
1765 }),
1766 )
1767 .await;
1768
1769 let (target_network, wallet_for_call) = if let Some(w) =
1770 wallet.filter(|w| !w.is_empty())
1771 {
1772 let meta = match require_store(app).and_then(|s| s.load_wallet_metadata(&w)) {
1773 Ok(m) => m,
1774 Err(e) => {
1775 emit_error(&app.writer, Some(id), &e, start).await;
1776 return;
1777 }
1778 };
1779 if let Some(expected) = network {
1780 if meta.network != expected {
1781 emit_error(
1782 &app.writer,
1783 Some(id),
1784 &PayError::InvalidAmount(format!(
1785 "wallet {w} is {}, not {expected}",
1786 meta.network
1787 )),
1788 start,
1789 )
1790 .await;
1791 return;
1792 }
1793 }
1794 (meta.network, w)
1795 } else {
1796 let wallets = match require_store(app).and_then(|s| s.list_wallet_metadata(network))
1797 {
1798 Ok(v) => v,
1799 Err(e) => {
1800 emit_error(&app.writer, Some(id), &e, start).await;
1801 return;
1802 }
1803 };
1804
1805 let is_cashu = matches!(network, Some(Network::Cashu));
1808 let filtered: Vec<_> = if is_cashu {
1809 if let Some(ref mint_list) = mints {
1810 wallets
1811 .into_iter()
1812 .filter(|w| {
1813 w.mint_url.as_deref().is_some_and(|u| {
1814 let nu = u.trim().trim_end_matches('/');
1815 mint_list
1816 .iter()
1817 .any(|m| m.trim().trim_end_matches('/') == nu)
1818 })
1819 })
1820 .collect()
1821 } else {
1822 wallets
1823 }
1824 } else {
1825 wallets
1826 };
1827
1828 match filtered.len() {
1829 0 => {
1830 let msg = if mints.is_some() {
1831 "no cashu wallet found matching --cashu-mint".to_string()
1832 } else {
1833 match network {
1834 Some(network) => format!("no {network} wallet found"),
1835 None => "no wallet found".to_string(),
1836 }
1837 };
1838 emit_error(&app.writer, Some(id), &PayError::WalletNotFound(msg), start)
1839 .await;
1840 return;
1841 }
1842 1 => (filtered[0].network, filtered[0].id.clone()),
1843 _ if is_cashu => {
1844 (Network::Cashu, String::new())
1847 }
1848 _ => {
1849 let msg = match network {
1850 Some(network) => {
1851 format!("multiple {network} wallets found; pass --wallet")
1852 }
1853 None => "multiple wallets found; pass --wallet".to_string(),
1854 };
1855 emit_error(&app.writer, Some(id), &PayError::InvalidAmount(msg), start)
1856 .await;
1857 return;
1858 }
1859 }
1860 };
1861
1862 let Some(provider) = get_provider(&app.providers, target_network) else {
1863 emit_error(
1864 &app.writer,
1865 Some(id),
1866 &PayError::NotImplemented(format!("no provider for {target_network}")),
1867 start,
1868 )
1869 .await;
1870 return;
1871 };
1872
1873 let mut reservation_id: Option<u64> = None;
1874 if app.enforce_limits {
1875 let quote = match provider
1876 .send_quote(&wallet_for_call, &to, mints.as_deref())
1877 .await
1878 {
1879 Ok(q) => q,
1880 Err(e) => {
1881 emit_error(&app.writer, Some(id), &e, start).await;
1882 return;
1883 }
1884 };
1885 let spend_amount = quote.amount_native + quote.fee_estimate_native;
1886 let provider_key = require_store(app)
1887 .and_then(|s| s.load_wallet_metadata("e.wallet))
1888 .ok()
1889 .map(|meta| wallet_provider_key(&meta))
1890 .unwrap_or_else(|| target_network.to_string());
1891 let spend_ctx = SpendContext {
1892 network: provider_key,
1893 wallet: Some(quote.wallet.clone()),
1894 amount_native: spend_amount,
1895 token: extract_token_from_target(&to),
1896 };
1897 match app
1898 .spend_ledger
1899 .reserve(&format!("send:{id}"), &spend_ctx)
1900 .await
1901 {
1902 Ok(rid) => reservation_id = Some(rid),
1903 Err(e) => {
1904 if let PayError::LimitExceeded {
1905 rule_id,
1906 scope,
1907 scope_key,
1908 spent,
1909 max_spend,
1910 token,
1911 remaining_s,
1912 origin,
1913 } = &e
1914 {
1915 let _ = app
1916 .writer
1917 .send(Output::LimitExceeded {
1918 id,
1919 rule_id: rule_id.clone(),
1920 scope: *scope,
1921 scope_key: scope_key.clone(),
1922 spent: *spent,
1923 max_spend: *max_spend,
1924 token: token.clone(),
1925 remaining_s: *remaining_s,
1926 origin: origin.clone(),
1927 trace: trace_from(start),
1928 })
1929 .await;
1930 } else {
1931 emit_error(&app.writer, Some(id), &e, start).await;
1932 }
1933 return;
1934 }
1935 }
1936 }
1937 match provider
1938 .send(
1939 &wallet_for_call,
1940 &to,
1941 onchain_memo.as_deref(),
1942 mints.as_deref(),
1943 )
1944 .await
1945 {
1946 Ok(r) => {
1947 if let Some(rid) = reservation_id {
1948 let _ = app.spend_ledger.confirm(rid).await;
1949 }
1950 if local_memo.is_some() {
1951 if let Some(s) = &app.store {
1952 let _ = s.update_transaction_record_memo(
1953 &r.transaction_id,
1954 local_memo.as_ref(),
1955 );
1956 }
1957 }
1958 let _ = app
1959 .writer
1960 .send(Output::Sent {
1961 id,
1962 wallet: r.wallet,
1963 transaction_id: r.transaction_id,
1964 amount: r.amount,
1965 fee: r.fee,
1966 preimage: r.preimage,
1967 trace: trace_from(start),
1968 })
1969 .await;
1970 }
1971 Err(e) => {
1972 if let Some(rid) = reservation_id {
1973 let _ = app.spend_ledger.cancel(rid).await;
1974 }
1975 emit_error(&app.writer, Some(id), &e, start).await
1976 }
1977 }
1978 }
1979
1980 Input::Restore { id, wallet } => {
1981 let start = Instant::now();
1982 emit_log(
1983 app,
1984 "wallet",
1985 Some(id.clone()),
1986 serde_json::json!({
1987 "operation": "restore", "wallet": &wallet,
1988 }),
1989 )
1990 .await;
1991 match try_provider!(&app.providers, |p| p.restore(&wallet)) {
1992 Ok(r) => {
1993 let _ = app
1994 .writer
1995 .send(Output::Restored {
1996 id,
1997 wallet: r.wallet,
1998 unspent: r.unspent,
1999 spent: r.spent,
2000 pending: r.pending,
2001 unit: r.unit,
2002 trace: trace_from(start),
2003 })
2004 .await;
2005 }
2006 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2007 }
2008 }
2009
2010 Input::WalletShowSeed { id, wallet } => {
2011 let start = Instant::now();
2012 emit_log(
2013 app,
2014 "wallet",
2015 Some(id.clone()),
2016 serde_json::json!({
2017 "operation": "wallet_show_seed", "wallet": &wallet,
2018 }),
2019 )
2020 .await;
2021 match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2022 Ok(meta) => match meta.network {
2023 Network::Cashu => match meta.seed_secret {
2024 Some(mnemonic) => {
2025 let _ = app
2026 .writer
2027 .send(Output::WalletSeed {
2028 id,
2029 wallet,
2030 mnemonic_secret: mnemonic,
2031 trace: trace_from(start),
2032 })
2033 .await;
2034 }
2035 None => {
2036 emit_error(
2037 &app.writer,
2038 Some(id),
2039 &PayError::InternalError("wallet has no seed".to_string()),
2040 start,
2041 )
2042 .await;
2043 }
2044 },
2045 Network::Sol => match meta.seed_secret {
2046 Some(secret) => {
2047 if looks_like_bip39_mnemonic(&secret) {
2048 let _ = app
2049 .writer
2050 .send(Output::WalletSeed {
2051 id,
2052 wallet,
2053 mnemonic_secret: secret,
2054 trace: trace_from(start),
2055 })
2056 .await;
2057 } else {
2058 emit_error(
2059 &app.writer,
2060 Some(id),
2061 &PayError::InvalidAmount(
2062 "this sol wallet was created before mnemonic support; create a new sol wallet to get 12-word backup".to_string(),
2063 ),
2064 start,
2065 )
2066 .await;
2067 }
2068 }
2069 None => {
2070 emit_error(
2071 &app.writer,
2072 Some(id),
2073 &PayError::InternalError("wallet has no seed".to_string()),
2074 start,
2075 )
2076 .await;
2077 }
2078 },
2079 Network::Ln => {
2080 emit_error(
2081 &app.writer,
2082 Some(id),
2083 &PayError::InvalidAmount(
2084 "ln wallets do not have mnemonic words; they store backend credentials (nwc-uri/password/admin-key)".to_string(),
2085 ),
2086 start,
2087 )
2088 .await;
2089 }
2090 Network::Evm | Network::Btc => match meta.seed_secret {
2091 Some(secret) => {
2092 if looks_like_bip39_mnemonic(&secret) {
2093 let _ = app
2094 .writer
2095 .send(Output::WalletSeed {
2096 id,
2097 wallet,
2098 mnemonic_secret: secret,
2099 trace: trace_from(start),
2100 })
2101 .await;
2102 } else {
2103 emit_error(
2104 &app.writer,
2105 Some(id),
2106 &PayError::InvalidAmount(
2107 "this wallet was created before mnemonic support; create a new wallet to get 12-word backup".to_string(),
2108 ),
2109 start,
2110 )
2111 .await;
2112 }
2113 }
2114 None => {
2115 emit_error(
2116 &app.writer,
2117 Some(id),
2118 &PayError::InternalError("wallet has no seed".to_string()),
2119 start,
2120 )
2121 .await;
2122 }
2123 },
2124 },
2125 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2126 }
2127 }
2128
2129 Input::HistoryList {
2130 id,
2131 wallet,
2132 network,
2133 onchain_memo,
2134 limit,
2135 offset,
2136 since_epoch_s,
2137 until_epoch_s,
2138 } => {
2139 let start = Instant::now();
2140 let lim = limit.unwrap_or(20);
2141 let off = offset.unwrap_or(0);
2142 let memo_filter = onchain_memo
2143 .as_deref()
2144 .map(str::trim)
2145 .filter(|value| !value.is_empty())
2146 .map(str::to_owned);
2147 let store = match require_store(app) {
2148 Ok(store) => store,
2149 Err(e) => {
2150 emit_error(&app.writer, Some(id), &e, start).await;
2151 return;
2152 }
2153 };
2154
2155 let mut all_txs = Vec::new();
2156 if let Some(wallet_id) = wallet {
2157 let meta = match store.load_wallet_metadata(&wallet_id) {
2158 Ok(meta) => meta,
2159 Err(e) => {
2160 emit_error(&app.writer, Some(id), &e, start).await;
2161 return;
2162 }
2163 };
2164 if let Some(expected_network) = network {
2165 if meta.network != expected_network {
2166 let _ = app
2167 .writer
2168 .send(Output::History {
2169 id,
2170 items: Vec::new(),
2171 trace: trace_from(start),
2172 })
2173 .await;
2174 return;
2175 }
2176 }
2177 match store.load_wallet_transaction_records(&wallet_id) {
2178 Ok(mut records) => all_txs.append(&mut records),
2179 Err(e) => {
2180 emit_error(&app.writer, Some(id), &e, start).await;
2181 return;
2182 }
2183 }
2184 } else {
2185 let wallets = match store.list_wallet_metadata(network) {
2186 Ok(wallets) => wallets,
2187 Err(e) => {
2188 emit_error(&app.writer, Some(id), &e, start).await;
2189 return;
2190 }
2191 };
2192 for wallet_meta in wallets {
2193 match store.load_wallet_transaction_records(&wallet_meta.id) {
2194 Ok(mut records) => all_txs.append(&mut records),
2195 Err(e) => {
2196 emit_error(&app.writer, Some(id.clone()), &e, start).await;
2197 return;
2198 }
2199 }
2200 }
2201 }
2202
2203 if let Some(expected_network) = network {
2204 all_txs.retain(|item| item.network == expected_network);
2205 }
2206 if let Some(since) = since_epoch_s {
2207 all_txs.retain(|item| item.created_at_epoch_s >= since);
2208 }
2209 if let Some(until) = until_epoch_s {
2210 all_txs.retain(|item| item.created_at_epoch_s < until);
2211 }
2212 if let Some(filter) = memo_filter.as_deref() {
2213 all_txs.retain(|item| item.onchain_memo.as_deref() == Some(filter));
2214 }
2215 all_txs.sort_by(|a, b| b.created_at_epoch_s.cmp(&a.created_at_epoch_s));
2216 let start_idx = all_txs.len().min(off);
2217 let end_idx = all_txs.len().min(off.saturating_add(lim));
2218 let items = all_txs[start_idx..end_idx].to_vec();
2219 let _ = app
2220 .writer
2221 .send(Output::History {
2222 id,
2223 items,
2224 trace: trace_from(start),
2225 })
2226 .await;
2227 }
2228
2229 Input::HistoryStatus { id, transaction_id } => {
2230 let start = Instant::now();
2231 let mut routed: Option<Result<HistoryStatusInfo, PayError>> = None;
2232 for provider in app.providers.values() {
2233 match provider.history_status(&transaction_id).await {
2234 Ok(info) => {
2235 routed = Some(Ok(info));
2236 break;
2237 }
2238 Err(PayError::NotImplemented(_)) | Err(PayError::WalletNotFound(_)) => {}
2239 Err(err) => {
2240 routed = Some(Err(err));
2241 break;
2242 }
2243 }
2244 }
2245 match routed.unwrap_or_else(|| {
2246 Err(PayError::WalletNotFound(format!(
2247 "transaction {transaction_id} not found"
2248 )))
2249 }) {
2250 Ok(info) => {
2251 let _ = app
2252 .writer
2253 .send(Output::HistoryStatus {
2254 id,
2255 transaction_id: info.transaction_id,
2256 status: info.status,
2257 confirmations: info.confirmations,
2258 preimage: info.preimage,
2259 item: info.item,
2260 trace: trace_from(start),
2261 })
2262 .await;
2263 }
2264 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2265 }
2266 }
2267
2268 Input::HistoryUpdate {
2269 id,
2270 wallet,
2271 network,
2272 limit,
2273 } => {
2274 let start = Instant::now();
2275 if let Err(e) = require_store(app) {
2276 emit_error(&app.writer, Some(id), &e, start).await;
2277 return;
2278 }
2279
2280 let sync_limit = limit.unwrap_or(200).clamp(1, 5000);
2281 let mut totals = HistorySyncStats::default();
2282 let mut wallets_synced = 0usize;
2283
2284 if let Some(wallet_id) = wallet {
2285 let sync_result = if let Some(expected_network) = network {
2286 match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet_id)) {
2287 Ok(meta) if meta.network != expected_network => {
2288 Err(PayError::InvalidAmount(format!(
2289 "wallet {wallet_id} belongs to {}, not {expected_network}",
2290 meta.network
2291 )))
2292 }
2293 Ok(_) => match get_provider(&app.providers, expected_network) {
2294 Some(provider) => provider.history_sync(&wallet_id, sync_limit).await,
2295 None => Err(PayError::NotImplemented(format!(
2296 "network {expected_network} not enabled"
2297 ))),
2298 },
2299 Err(e) => Err(e),
2300 }
2301 } else {
2302 match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet_id)) {
2303 Ok(meta) => match get_provider(&app.providers, meta.network) {
2304 Some(provider) => provider.history_sync(&wallet_id, sync_limit).await,
2305 None => Err(PayError::NotImplemented(format!(
2306 "network {} not enabled",
2307 meta.network
2308 ))),
2309 },
2310 Err(e) => Err(e),
2311 }
2312 };
2313
2314 match sync_result {
2315 Ok(stats) => {
2316 wallets_synced = 1;
2317 totals.records_scanned =
2318 totals.records_scanned.saturating_add(stats.records_scanned);
2319 totals.records_added =
2320 totals.records_added.saturating_add(stats.records_added);
2321 totals.records_updated =
2322 totals.records_updated.saturating_add(stats.records_updated);
2323 }
2324 Err(e) => {
2325 emit_error(&app.writer, Some(id), &e, start).await;
2326 return;
2327 }
2328 }
2329 } else {
2330 let target_networks: Vec<Network> = if let Some(single) = network {
2331 vec![single]
2332 } else {
2333 vec![
2334 Network::Cashu,
2335 Network::Ln,
2336 Network::Sol,
2337 Network::Evm,
2338 Network::Btc,
2339 ]
2340 };
2341
2342 let wallets = match require_store(app).and_then(|s| s.list_wallet_metadata(None)) {
2343 Ok(wallets) => wallets,
2344 Err(e) => {
2345 emit_error(&app.writer, Some(id), &e, start).await;
2346 return;
2347 }
2348 };
2349
2350 for network_key in target_networks {
2351 let Some(provider) = get_provider(&app.providers, network_key) else {
2352 if network.is_some() {
2353 emit_error(
2354 &app.writer,
2355 Some(id),
2356 &PayError::NotImplemented(format!(
2357 "network {network_key} not enabled"
2358 )),
2359 start,
2360 )
2361 .await;
2362 return;
2363 }
2364 continue;
2365 };
2366 for wallet_meta in &wallets {
2367 if wallet_meta.network != network_key {
2368 continue;
2369 }
2370 match provider.history_sync(&wallet_meta.id, sync_limit).await {
2371 Ok(stats) => {
2372 wallets_synced = wallets_synced.saturating_add(1);
2373 totals.records_scanned =
2374 totals.records_scanned.saturating_add(stats.records_scanned);
2375 totals.records_added =
2376 totals.records_added.saturating_add(stats.records_added);
2377 totals.records_updated =
2378 totals.records_updated.saturating_add(stats.records_updated);
2379 }
2380 Err(PayError::NotImplemented(_)) | Err(PayError::WalletNotFound(_)) => {
2381 }
2382 Err(e) => {
2383 emit_error(&app.writer, Some(id), &e, start).await;
2384 return;
2385 }
2386 }
2387 }
2388 }
2389 }
2390
2391 let _ = app
2392 .writer
2393 .send(Output::HistoryUpdated {
2394 id,
2395 wallets_synced,
2396 records_scanned: totals.records_scanned,
2397 records_added: totals.records_added,
2398 records_updated: totals.records_updated,
2399 trace: trace_from(start),
2400 })
2401 .await;
2402 }
2403
2404 Input::LimitAdd { id, mut limit } => {
2405 let start = Instant::now();
2406 if !app.enforce_limits {
2407 emit_error(
2408 &app.writer,
2409 Some(id),
2410 &PayError::NotImplemented(
2411 "limit_add is unavailable when limits are not enforced locally; configure limits on the RPC daemon"
2412 .to_string(),
2413 ),
2414 start,
2415 )
2416 .await;
2417 return;
2418 }
2419
2420 if limit.scope == SpendScope::Wallet && limit.network.is_none() {
2422 if let Some(wallet_id) = limit.wallet.as_deref() {
2423 match require_store(app).and_then(|s| s.load_wallet_metadata(wallet_id)) {
2424 Ok(meta) => {
2425 limit.network = Some(meta.network.to_string());
2426 }
2427 Err(e) => {
2428 emit_error(&app.writer, Some(id), &e, start).await;
2429 return;
2430 }
2431 }
2432 }
2433 }
2434
2435 match app.spend_ledger.add_limit(&mut limit).await {
2436 Ok(rule_id) => {
2437 let _ = app
2438 .writer
2439 .send(Output::LimitAdded {
2440 id,
2441 rule_id,
2442 trace: trace_from(start),
2443 })
2444 .await;
2445 }
2446 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2447 }
2448 }
2449
2450 Input::LimitRemove { id, rule_id } => {
2451 let start = Instant::now();
2452 if !app.enforce_limits {
2453 emit_error(
2454 &app.writer,
2455 Some(id),
2456 &PayError::NotImplemented(
2457 "limit_remove is unavailable when limits are not enforced locally; configure limits on the RPC daemon"
2458 .to_string(),
2459 ),
2460 start,
2461 )
2462 .await;
2463 return;
2464 }
2465
2466 match app.spend_ledger.remove_limit(&rule_id).await {
2467 Ok(()) => {
2468 let _ = app
2469 .writer
2470 .send(Output::LimitRemoved {
2471 id,
2472 rule_id,
2473 trace: trace_from(start),
2474 })
2475 .await;
2476 }
2477 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2478 }
2479 }
2480
2481 Input::LimitList { id } => {
2482 let start = Instant::now();
2483 let local_limits = if app.enforce_limits {
2484 match app.spend_ledger.get_status().await {
2485 Ok(status) => status,
2486 Err(e) => {
2487 emit_error(&app.writer, Some(id), &e, start).await;
2488 return;
2489 }
2490 }
2491 } else {
2492 vec![]
2493 };
2494
2495 let config = app.config.read().await.clone();
2497 let downstream = query_downstream_limits(&config).await;
2498
2499 let _ = app
2500 .writer
2501 .send(Output::LimitStatus {
2502 id,
2503 limits: local_limits,
2504 downstream,
2505 trace: trace_from(start),
2506 })
2507 .await;
2508 }
2509
2510 Input::LimitSet { id, mut limits } => {
2511 let start = Instant::now();
2512 if !app.enforce_limits {
2513 emit_error(
2514 &app.writer,
2515 Some(id),
2516 &PayError::NotImplemented(
2517 "limit_set is unavailable when limits are not enforced locally; configure limits on the RPC daemon"
2518 .to_string(),
2519 ),
2520 start,
2521 )
2522 .await;
2523 return;
2524 }
2525
2526 for rule in &mut limits {
2528 if rule.scope == SpendScope::Wallet && rule.network.is_none() {
2529 if let Some(wallet_id) = rule.wallet.as_deref() {
2530 match require_store(app).and_then(|s| s.load_wallet_metadata(wallet_id)) {
2531 Ok(meta) => {
2532 rule.network = Some(meta.network.to_string());
2533 }
2534 Err(e) => {
2535 emit_error(&app.writer, Some(id), &e, start).await;
2536 return;
2537 }
2538 }
2539 }
2540 }
2541 }
2542
2543 match app.spend_ledger.set_limits(&limits).await {
2544 Ok(()) => match app.spend_ledger.get_status().await {
2545 Ok(status) => {
2546 let _ = app
2547 .writer
2548 .send(Output::LimitStatus {
2549 id,
2550 limits: status,
2551 downstream: vec![],
2552 trace: trace_from(start),
2553 })
2554 .await;
2555 }
2556 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2557 },
2558 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2559 }
2560 }
2561
2562 Input::WalletConfigShow { id, wallet } => {
2563 let start = Instant::now();
2564 match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2565 Ok(meta) => {
2566 let resolved_wallet = meta.id.clone();
2567 let _ = app
2568 .writer
2569 .send(Output::WalletConfig {
2570 id,
2571 wallet: resolved_wallet,
2572 config: meta,
2573 trace: trace_from(start),
2574 })
2575 .await;
2576 }
2577 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2578 }
2579 }
2580
2581 Input::WalletConfigSet {
2582 id,
2583 wallet,
2584 label,
2585 rpc_endpoints,
2586 chain_id,
2587 } => {
2588 let start = Instant::now();
2589 match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2590 Ok(mut meta) => {
2591 let resolved_wallet = meta.id.clone();
2592 let mut changed = false;
2593
2594 if let Some(new_label) = label {
2595 let trimmed = new_label.trim();
2596 meta.label = if trimmed.is_empty() {
2597 None
2598 } else {
2599 Some(trimmed.to_string())
2600 };
2601 changed = true;
2602 }
2603
2604 if !rpc_endpoints.is_empty() {
2605 match meta.network {
2606 Network::Sol => {
2607 meta.sol_rpc_endpoints = Some(rpc_endpoints);
2608 changed = true;
2609 }
2610 Network::Evm => {
2611 meta.evm_rpc_endpoints = Some(rpc_endpoints);
2612 changed = true;
2613 }
2614 _ => {
2615 emit_error(
2616 &app.writer,
2617 Some(id),
2618 &PayError::InvalidAmount(format!(
2619 "rpc-endpoint not supported for {} wallets",
2620 meta.network
2621 )),
2622 start,
2623 )
2624 .await;
2625 return;
2626 }
2627 }
2628 }
2629
2630 if let Some(cid) = chain_id {
2631 if meta.network != Network::Evm {
2632 emit_error(
2633 &app.writer,
2634 Some(id),
2635 &PayError::InvalidAmount(
2636 "chain-id is only supported for evm wallets".to_string(),
2637 ),
2638 start,
2639 )
2640 .await;
2641 return;
2642 }
2643 meta.evm_chain_id = Some(cid);
2644 changed = true;
2645 }
2646
2647 if !changed {
2648 emit_error(
2649 &app.writer,
2650 Some(id),
2651 &PayError::InvalidAmount(
2652 "no configuration changes specified".to_string(),
2653 ),
2654 start,
2655 )
2656 .await;
2657 return;
2658 }
2659
2660 match require_store(app).and_then(|s| s.save_wallet_metadata(&meta)) {
2661 Ok(()) => {
2662 let _ = app
2663 .writer
2664 .send(Output::WalletConfigUpdated {
2665 id,
2666 wallet: resolved_wallet,
2667 trace: trace_from(start),
2668 })
2669 .await;
2670 }
2671 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2672 }
2673 }
2674 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2675 }
2676 }
2677
2678 Input::WalletConfigTokenAdd {
2679 id,
2680 wallet,
2681 symbol,
2682 address,
2683 decimals,
2684 } => {
2685 let start = Instant::now();
2686 match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2687 Ok(mut meta) => {
2688 let resolved_wallet = meta.id.clone();
2689 if !matches!(meta.network, Network::Evm | Network::Sol) {
2690 emit_error(
2691 &app.writer,
2692 Some(id),
2693 &PayError::InvalidAmount(format!(
2694 "custom tokens not supported for {} wallets",
2695 meta.network
2696 )),
2697 start,
2698 )
2699 .await;
2700 return;
2701 }
2702
2703 let lower_symbol = symbol.to_ascii_lowercase();
2704 let tokens = meta.custom_tokens.get_or_insert_with(Vec::new);
2705 if tokens
2706 .iter()
2707 .any(|t| t.symbol.to_ascii_lowercase() == lower_symbol)
2708 {
2709 emit_error(
2710 &app.writer,
2711 Some(id),
2712 &PayError::InvalidAmount(format!(
2713 "custom token '{lower_symbol}' already registered"
2714 )),
2715 start,
2716 )
2717 .await;
2718 return;
2719 }
2720
2721 tokens.push(wallet::CustomToken {
2722 symbol: lower_symbol.clone(),
2723 address: address.clone(),
2724 decimals,
2725 });
2726
2727 match require_store(app).and_then(|s| s.save_wallet_metadata(&meta)) {
2728 Ok(()) => {
2729 let _ = app
2730 .writer
2731 .send(Output::WalletConfigTokenAdded {
2732 id,
2733 wallet: resolved_wallet,
2734 symbol: lower_symbol,
2735 address,
2736 decimals,
2737 trace: trace_from(start),
2738 })
2739 .await;
2740 }
2741 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2742 }
2743 }
2744 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2745 }
2746 }
2747
2748 Input::WalletConfigTokenRemove { id, wallet, symbol } => {
2749 let start = Instant::now();
2750 match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2751 Ok(mut meta) => {
2752 let resolved_wallet = meta.id.clone();
2753 let lower_symbol = symbol.to_ascii_lowercase();
2754 let tokens = meta.custom_tokens.get_or_insert_with(Vec::new);
2755 let before_len = tokens.len();
2756 tokens.retain(|t| t.symbol.to_ascii_lowercase() != lower_symbol);
2757 if tokens.len() == before_len {
2758 emit_error(
2759 &app.writer,
2760 Some(id),
2761 &PayError::InvalidAmount(format!(
2762 "custom token '{lower_symbol}' not found"
2763 )),
2764 start,
2765 )
2766 .await;
2767 return;
2768 }
2769 if tokens.is_empty() {
2770 meta.custom_tokens = None;
2771 }
2772
2773 match require_store(app).and_then(|s| s.save_wallet_metadata(&meta)) {
2774 Ok(()) => {
2775 let _ = app
2776 .writer
2777 .send(Output::WalletConfigTokenRemoved {
2778 id,
2779 wallet: resolved_wallet,
2780 symbol: lower_symbol,
2781 trace: trace_from(start),
2782 })
2783 .await;
2784 }
2785 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2786 }
2787 }
2788 Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2789 }
2790 }
2791
2792 Input::Config(patch) => {
2793 let start = Instant::now();
2794 let ConfigPatch {
2795 data_dir,
2796 limits,
2797 log,
2798 exchange_rate,
2799 afpay_rpc,
2800 providers,
2801 } = patch;
2802
2803 let mut unsupported = Vec::new();
2804 if data_dir.is_some() {
2805 unsupported.push("data_dir");
2806 }
2807 if afpay_rpc.is_some() {
2808 unsupported.push("afpay_rpc");
2809 }
2810 if providers.is_some() {
2811 unsupported.push("providers");
2812 }
2813 if exchange_rate.is_some() {
2814 unsupported.push("exchange_rate");
2815 }
2816 if !unsupported.is_empty() {
2817 let err = PayError::NotImplemented(format!(
2818 "runtime config only supports 'log' and 'limits'; unsupported fields: {}",
2819 unsupported.join(", ")
2820 ));
2821 emit_error(&app.writer, None, &err, start).await;
2822 return;
2823 }
2824
2825 if let Some(ref v) = limits {
2826 if !app.enforce_limits {
2827 let err = PayError::NotImplemented(
2828 "config.limits is unavailable when limits are not enforced locally; configure limits on the RPC daemon"
2829 .to_string(),
2830 );
2831 emit_error(&app.writer, None, &err, start).await;
2832 return;
2833 }
2834 if let Err(e) = app.spend_ledger.set_limits(v).await {
2835 emit_error(&app.writer, None, &e, start).await;
2836 return;
2837 }
2838 }
2839
2840 let mut cfg = app.config.write().await;
2841 if let Some(v) = limits {
2842 cfg.limits = v;
2843 }
2844 if let Some(v) = log {
2845 cfg.log = agent_first_data::cli_parse_log_filters(&v);
2846 }
2847 let _ = app.writer.send(Output::Config(cfg.clone())).await;
2848 }
2849
2850 Input::Version => {
2851 let _ = app
2852 .writer
2853 .send(Output::Version {
2854 version: crate::config::VERSION.to_string(),
2855 trace: PongTrace {
2856 uptime_s: app.start_time.elapsed().as_secs(),
2857 requests_total: app.requests_total.load(Ordering::Relaxed),
2858 in_flight: app.in_flight.lock().await.len(),
2859 },
2860 })
2861 .await;
2862 }
2863
2864 Input::Close => {
2865 }
2867 }
2868
2869 emit_migration_log(app).await;
2870}
2871
2872fn get_provider(
2877 providers: &HashMap<Network, Box<dyn PayProvider>>,
2878 network: Network,
2879) -> Option<&dyn PayProvider> {
2880 providers.get(&network).map(|p| p.as_ref())
2881}
2882
/// Heuristic check for whether `secret` is shaped like a mnemonic phrase.
///
/// Only the number of whitespace-separated words is inspected: exactly 12 or
/// exactly 24 words yields `true`. The words themselves are not validated.
fn looks_like_bip39_mnemonic(secret: &str) -> bool {
    matches!(secret.split_whitespace().count(), 12 | 24)
}
2887
/// Decides whether an observed token label satisfies the expected one.
///
/// Both labels are trimmed and compared case-insensitively (ASCII).
/// * An expected label of `native` accepts `native`, `gwei` or `wei`.
/// * Any other expected label accepts an identical observed label, or the
///   same label followed by the `_base_units` suffix.
fn evm_receive_token_matches(expected: &str, observed: &str) -> bool {
    let want = expected.trim().to_ascii_lowercase();
    let got = observed.trim().to_ascii_lowercase();
    match want.as_str() {
        "native" => matches!(got.as_str(), "native" | "gwei" | "wei"),
        other => got == other || got.strip_suffix("_base_units") == Some(other),
    }
}
2902
2903async fn emit_error(
2904 writer: &mpsc::Sender<Output>,
2905 id: Option<String>,
2906 err: &PayError,
2907 start: Instant,
2908) {
2909 emit_error_hint(writer, id, err, start, None).await;
2910}
2911
2912async fn emit_error_hint(
2915 writer: &mpsc::Sender<Output>,
2916 id: Option<String>,
2917 err: &PayError,
2918 start: Instant,
2919 hint_override: Option<&str>,
2920) {
2921 let _ = writer
2922 .send(Output::Error {
2923 id,
2924 error_code: err.error_code().to_string(),
2925 error: err.to_string(),
2926 hint: hint_override.map(|h| h.to_string()).or_else(|| err.hint()),
2927 retryable: err.retryable(),
2928 trace: trace_from(start),
2929 })
2930 .await;
2931}
2932
2933fn extract_id(input: &Input) -> Option<String> {
2934 match input {
2935 Input::WalletCreate { id, .. }
2936 | Input::LnWalletCreate { id, .. }
2937 | Input::WalletClose { id, .. }
2938 | Input::WalletList { id, .. }
2939 | Input::Balance { id, .. }
2940 | Input::Receive { id, .. }
2941 | Input::ReceiveClaim { id, .. }
2942 | Input::CashuSend { id, .. }
2943 | Input::CashuReceive { id, .. }
2944 | Input::Send { id, .. }
2945 | Input::Restore { id, .. }
2946 | Input::WalletShowSeed { id, .. }
2947 | Input::HistoryList { id, .. }
2948 | Input::HistoryStatus { id, .. }
2949 | Input::HistoryUpdate { id, .. }
2950 | Input::LimitAdd { id, .. }
2951 | Input::LimitRemove { id, .. }
2952 | Input::LimitList { id, .. }
2953 | Input::LimitSet { id, .. }
2954 | Input::WalletConfigShow { id, .. }
2955 | Input::WalletConfigSet { id, .. }
2956 | Input::WalletConfigTokenAdd { id, .. }
2957 | Input::WalletConfigTokenRemove { id, .. } => Some(id.clone()),
2958 Input::Config(_) | Input::Version | Input::Close => None,
2959 }
2960}
2961
2962fn trace_from(start: Instant) -> Trace {
2963 Trace::from_duration(start.elapsed().as_millis() as u64)
2964}
2965
2966async fn query_downstream_limits(config: &RuntimeConfig) -> Vec<DownstreamLimitNode> {
2968 let mut result = Vec::new();
2969 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
2970 for (name, rpc_cfg) in &config.afpay_rpc {
2971 if !seen.insert(rpc_cfg.endpoint.clone()) {
2972 continue;
2973 }
2974 let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
2975 let limit_input = Input::LimitList {
2976 id: format!("downstream_{name}"),
2977 };
2978 let outputs =
2979 crate::provider::remote::rpc_call(&rpc_cfg.endpoint, secret, &limit_input).await;
2980 let mut node = DownstreamLimitNode {
2981 name: name.clone(),
2982 endpoint: rpc_cfg.endpoint.clone(),
2983 limits: vec![],
2984 error: None,
2985 downstream: vec![],
2986 };
2987 for value in &outputs {
2988 if value.get("code").and_then(|v| v.as_str()) == Some("error") {
2989 node.error = value
2990 .get("error")
2991 .and_then(|v| v.as_str())
2992 .map(|s| s.to_string());
2993 }
2994 if value.get("code").and_then(|v| v.as_str()) == Some("limit_status") {
2995 if let Some(limits) = value.get("limits") {
2996 node.limits = serde_json::from_value(limits.clone()).unwrap_or_default();
2997 }
2998 if let Some(ds) = value.get("downstream") {
2999 node.downstream = serde_json::from_value(ds.clone()).unwrap_or_default();
3000 }
3001 }
3002 }
3003 result.push(node);
3004 }
3005 result
3006}
3007
/// Extracts the value of the first non-empty `token=` parameter from `to`.
///
/// Only the text between the first and second `?` is treated as the query
/// string; it is split on `&`. Returns `None` when there is no `?`, no
/// `token=` parameter, or every `token=` parameter is empty. The value is
/// returned verbatim, without any decoding.
fn extract_token_from_target(to: &str) -> Option<String> {
    let query = to.split('?').nth(1)?;
    query
        .split('&')
        .filter_map(|pair| pair.strip_prefix("token="))
        .find(|value| !value.is_empty())
        .map(|value| value.to_string())
}
3020
3021fn wallet_provider_key(meta: &wallet::WalletMetadata) -> String {
3022 match meta.network {
3023 Network::Ln => meta
3024 .backend
3025 .as_deref()
3026 .map(|b| format!("ln-{}", b.to_ascii_lowercase()))
3027 .unwrap_or_else(|| "ln".to_string()),
3028 _ => meta.network.to_string(),
3029 }
3030}
3031
3032fn wallet_summary_from_meta(meta: &wallet::WalletMetadata, wallet_id: &str) -> WalletSummary {
3033 let (address, backend) = match meta.network {
3034 Network::Cashu => (meta.mint_url.clone().unwrap_or_default(), None),
3035 Network::Ln => {
3036 let b = meta
3037 .backend
3038 .clone()
3039 .unwrap_or_else(|| "unknown".to_string());
3040 (format!("ln:{b}"), Some(b))
3041 }
3042 _ => (wallet_id.to_string(), None),
3043 };
3044 WalletSummary {
3045 id: meta.id.clone(),
3046 network: meta.network,
3047 label: meta.label.clone(),
3048 address,
3049 backend,
3050 mint_url: meta.mint_url.clone(),
3051 rpc_endpoints: meta
3052 .sol_rpc_endpoints
3053 .clone()
3054 .or(meta.evm_rpc_endpoints.clone()),
3055 chain_id: meta.evm_chain_id,
3056 created_at_epoch_s: meta.created_at_epoch_s,
3057 }
3058}
3059
3060async fn resolve_wallet_summary(app: &App, wallet_id: &str) -> WalletSummary {
3061 if let Ok(meta) = require_store(app).and_then(|s| s.load_wallet_metadata(wallet_id)) {
3062 return wallet_summary_from_meta(&meta, wallet_id);
3063 }
3064 if let Ok(wallets) = collect_all!(&app.providers, |p| p.list_wallets()) {
3065 if let Some(summary) = wallets.into_iter().find(|w| w.id == wallet_id) {
3066 return summary;
3067 }
3068 }
3069 WalletSummary {
3070 id: wallet_id.to_string(),
3071 network: Network::Ln,
3072 label: None,
3073 address: String::new(),
3074 backend: None,
3075 mint_url: None,
3076 rpc_endpoints: None,
3077 chain_id: None,
3078 created_at_epoch_s: 0,
3079 }
3080}
3081
/// Reports whether `event` is selected by the configured log filters.
///
/// An empty filter list disables all logging. Otherwise the event is enabled
/// when any filter is `*`, is `all`, or is a prefix of the ASCII-lowercased
/// event name. Filters are compared as given; they are not lowercased here.
fn log_enabled(log: &[String], event: &str) -> bool {
    if log.is_empty() {
        return false;
    }
    let normalized = event.to_ascii_lowercase();
    for filter in log {
        match filter.as_str() {
            "*" | "all" => return true,
            prefix if normalized.starts_with(prefix) => return true,
            _ => {}
        }
    }
    false
}
3090
3091async fn emit_migration_log(app: &App) {
3092 let entries = app
3093 .store
3094 .as_ref()
3095 .map(|s| s.drain_migration_log())
3096 .unwrap_or_default();
3097 if entries.is_empty() {
3098 return;
3099 }
3100 for entry in entries {
3101 emit_log(
3102 app,
3103 "schema_migration",
3104 None,
3105 serde_json::json!({
3106 "database": entry.database,
3107 "from_version": entry.from_version,
3108 "to_version": entry.to_version,
3109 }),
3110 )
3111 .await;
3112 }
3113}
3114
3115async fn emit_log(app: &App, event: &str, request_id: Option<String>, args: serde_json::Value) {
3116 let log = app.config.read().await.log.clone();
3117 if !log_enabled(&log, event) {
3118 return;
3119 }
3120 let _ = app
3121 .writer
3122 .send(Output::Log {
3123 event: event.to_string(),
3124 request_id,
3125 version: None,
3126 argv: None,
3127 config: None,
3128 args: Some(args),
3129 env: None,
3130 trace: Trace::from_duration(0),
3131 })
3132 .await;
3133}