Skip to main content

agent_first_pay/
handler.rs

1#[cfg(any(
2    feature = "btc-esplora",
3    feature = "btc-core",
4    feature = "btc-electrum"
5))]
6use crate::provider::btc::BtcProvider;
7#[cfg(feature = "cashu")]
8use crate::provider::cashu::CashuProvider;
9#[cfg(feature = "evm")]
10use crate::provider::evm::EvmProvider;
11#[cfg(any(feature = "ln-nwc", feature = "ln-phoenixd", feature = "ln-lnbits"))]
12use crate::provider::ln::LnProvider;
13use crate::provider::remote::RemoteProvider;
14#[cfg(feature = "sol")]
15use crate::provider::sol::SolProvider;
16use crate::provider::{HistorySyncStats, PayError, PayProvider, StubProvider};
17use crate::spend::{SpendContext, SpendLedger};
18#[cfg(feature = "redb")]
19use crate::store::lock;
20use crate::store::wallet;
21use crate::store::{PayStore, StorageBackend};
22use crate::types::*;
23use std::collections::{HashMap, HashSet};
24use std::sync::atomic::{AtomicU64, Ordering};
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tokio::sync::{mpsc, Mutex, RwLock};
28use tokio::task::JoinHandle;
29use tokio::time::sleep;
30
31const DEFAULT_WAIT_TIMEOUT_SECS: u64 = 300;
32const DEFAULT_WAIT_POLL_INTERVAL_MS: u64 = 1000;
33const DEFAULT_WAIT_SYNC_LIMIT: usize = 500;
34
35pub struct App {
36    pub config: RwLock<RuntimeConfig>,
37    pub providers: HashMap<Network, Box<dyn PayProvider>>,
38    pub writer: mpsc::Sender<Output>,
39    pub in_flight: Mutex<HashMap<String, JoinHandle<()>>>,
40    pub requests_total: AtomicU64,
41    pub start_time: Instant,
42    /// True if any provider uses local data (needs data-dir lock for writes).
43    pub has_local_providers: bool,
44    /// Whether this node enforces spend limits.
45    /// RPC mode: always true. CLI/pipe with all remote: false. CLI/pipe with any local: true.
46    pub enforce_limits: bool,
47    pub spend_ledger: SpendLedger,
48    /// Storage backend for wallet metadata and transaction history.
49    /// None when running in frontend-only mode (no local DB, only remote RPC).
50    pub store: Option<Arc<StorageBackend>>,
51}
52
53impl App {
54    /// Create a new App. If `enforce_limits_override` is Some, use that value;
55    /// otherwise auto-detect: enforce if any provider writes locally.
56    pub fn new(
57        config: RuntimeConfig,
58        writer: mpsc::Sender<Output>,
59        enforce_limits_override: Option<bool>,
60        store: Option<StorageBackend>,
61    ) -> Self {
62        let store = store.map(Arc::new);
63        let mut providers: HashMap<Network, Box<dyn PayProvider>> = HashMap::new();
64
65        for network in &[
66            Network::Ln,
67            Network::Sol,
68            Network::Evm,
69            Network::Cashu,
70            Network::Btc,
71        ] {
72            let key = network.to_string();
73            if let Some(rpc_name) = config.providers.get(&key) {
74                // Look up the afpay_rpc node by name
75                if let Some(rpc_cfg) = config.afpay_rpc.get(rpc_name) {
76                    let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
77                    providers.insert(
78                        *network,
79                        Box::new(RemoteProvider::new(&rpc_cfg.endpoint, secret, *network)),
80                    );
81                } else {
82                    // Unknown afpay_rpc name — insert stub so errors surface at runtime
83                    providers.insert(*network, Box::new(StubProvider::new(*network)));
84                }
85            } else {
86                #[allow(unreachable_patterns)]
87                match network {
88                    #[cfg(feature = "cashu")]
89                    Network::Cashu => {
90                        if let Some(s) = &store {
91                            let pg_url = config
92                                .postgres_url_secret
93                                .clone()
94                                .filter(|_| config.storage_backend.as_deref() == Some("postgres"));
95                            providers.insert(
96                                *network,
97                                Box::new(CashuProvider::new(&config.data_dir, pg_url, s.clone())),
98                            );
99                        } else {
100                            providers.insert(*network, Box::new(StubProvider::new(*network)));
101                        }
102                    }
103                    #[cfg(any(feature = "ln-nwc", feature = "ln-phoenixd", feature = "ln-lnbits"))]
104                    Network::Ln => {
105                        providers.insert(*network, Box::new(LnProvider::new(&config.data_dir)));
106                    }
107                    #[cfg(feature = "sol")]
108                    Network::Sol => {
109                        providers.insert(*network, Box::new(SolProvider::new(&config.data_dir)));
110                    }
111                    #[cfg(feature = "evm")]
112                    Network::Evm => {
113                        providers.insert(*network, Box::new(EvmProvider::new(&config.data_dir)));
114                    }
115                    #[cfg(any(
116                        feature = "btc-esplora",
117                        feature = "btc-core",
118                        feature = "btc-electrum"
119                    ))]
120                    Network::Btc => {
121                        providers.insert(*network, Box::new(BtcProvider::new(&config.data_dir)));
122                    }
123                    _ => {
124                        providers.insert(*network, Box::new(StubProvider::new(*network)));
125                    }
126                }
127            }
128        }
129
130        let has_local = providers.values().any(|p| p.writes_locally());
131        let spend_ledger = match store.as_deref() {
132            #[cfg(feature = "postgres")]
133            Some(StorageBackend::Postgres(pg)) => {
134                SpendLedger::new_postgres(pg.pool().clone(), config.exchange_rate.clone())
135            }
136            _ => SpendLedger::new(&config.data_dir, config.exchange_rate.clone()),
137        };
138        Self {
139            config: RwLock::new(config),
140            providers,
141            writer,
142            in_flight: Mutex::new(HashMap::new()),
143            requests_total: AtomicU64::new(0),
144            start_time: Instant::now(),
145            has_local_providers: has_local,
146            enforce_limits: enforce_limits_override.unwrap_or(has_local),
147            spend_ledger,
148            store,
149        }
150    }
151}
152
153/// Unified startup validation for long-lived modes.
154/// Pings all configured remote afpay_rpc nodes (deduplicated) and validates provider mappings.
155pub async fn startup_provider_validation_errors(config: &RuntimeConfig) -> Vec<Output> {
156    let mut errors = Vec::new();
157
158    // Validate that all provider values reference known afpay_rpc names
159    for (network, rpc_name) in &config.providers {
160        if !config.afpay_rpc.contains_key(rpc_name) {
161            errors.push(Output::Error {
162                id: None,
163                error_code: "invalid_config".to_string(),
164                error: format!(
165                    "providers.{network} references unknown afpay_rpc node '{rpc_name}'"
166                ),
167                hint: Some(format!(
168                    "add [afpay_rpc.{rpc_name}] with endpoint and endpoint_secret to config.toml"
169                )),
170                retryable: false,
171                trace: Trace::from_duration(0),
172            });
173        }
174    }
175    if !errors.is_empty() {
176        return errors;
177    }
178
179    // Ping each unique afpay_rpc endpoint once
180    let mut pinged: std::collections::HashSet<String> = std::collections::HashSet::new();
181    for (rpc_name, rpc_cfg) in &config.afpay_rpc {
182        if !pinged.insert(rpc_cfg.endpoint.clone()) {
183            continue;
184        }
185        // Find any network that maps to this rpc_name (for the RemoteProvider constructor)
186        let network = config
187            .providers
188            .iter()
189            .find(|(_, name)| *name == rpc_name)
190            .and_then(|(k, _)| k.parse::<Network>().ok())
191            .unwrap_or(Network::Cashu);
192        let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
193        let provider = RemoteProvider::new(&rpc_cfg.endpoint, secret, network);
194        if let Err(err) = provider.ping().await {
195            errors.push(Output::Error {
196                id: None,
197                error_code: "provider_unreachable".to_string(),
198                error: format!("afpay_rpc.{rpc_name} ({}): {err}", rpc_cfg.endpoint),
199                hint: Some("check endpoint address and that the daemon is running".to_string()),
200                retryable: true,
201                trace: Trace::from_duration(0),
202            });
203        }
204    }
205    errors
206}
207
208/// Get a reference to the storage backend, or return NotImplemented.
209fn require_store(app: &App) -> Result<&StorageBackend, PayError> {
210    app.store
211        .as_deref()
212        .ok_or_else(|| PayError::NotImplemented("no storage backend available".to_string()))
213}
214
/// Acquires the data-directory lock for a write operation.
///
/// The acquisition runs on tokio's blocking pool via `spawn_blocking`. The returned
/// guard is handed to the caller, who keeps it for the duration of the operation.
///
/// Returns `PayError::InternalError` both when the blocking task cannot be joined
/// and when `lock::acquire` itself reports an error.
#[cfg(feature = "redb")]
async fn acquire_write_lock(app: &App) -> Result<lock::DataLock, PayError> {
    // Copy the path out so the config read guard is released before blocking.
    let data_dir = {
        let cfg = app.config.read().await;
        cfg.data_dir.clone()
    };

    let joined = tokio::task::spawn_blocking(move || lock::acquire(&data_dir, None)).await;
    match joined {
        Ok(Ok(guard)) => Ok(guard),
        Ok(Err(reason)) => Err(PayError::InternalError(reason)),
        Err(join_err) => Err(PayError::InternalError(format!("lock task: {join_err}"))),
    }
}
226
227fn needs_write_lock(input: &Input) -> bool {
228    matches!(
229        input,
230        Input::WalletCreate { .. }
231            | Input::LnWalletCreate { .. }
232            | Input::WalletClose { .. }
233            | Input::Receive { .. }
234            | Input::ReceiveClaim { .. }
235            | Input::CashuSend { .. }
236            | Input::CashuReceive { .. }
237            | Input::Send { .. }
238            | Input::Restore { .. }
239            | Input::LimitAdd { .. }
240            | Input::LimitRemove { .. }
241            | Input::LimitSet { .. }
242            | Input::HistoryUpdate { .. }
243            | Input::WalletConfigSet { .. }
244            | Input::WalletConfigTokenAdd { .. }
245            | Input::WalletConfigTokenRemove { .. }
246    )
247}
248
/// Awaits `$call` against each provider in turn and yields the first decisive result.
///
/// A provider answering `PayError::NotImplemented` is skipped. The first `Ok` or any
/// other `Err` ends the loop and becomes the value of the macro. If every provider is
/// skipped (or there are none), the value is `NotImplemented("network not enabled")`.
/// Must be used inside an async context because `$call` is awaited.
macro_rules! try_provider {
    ($providers:expr, |$p:ident| $call:expr) => {{
        let mut decided: Option<Result<_, PayError>> = None;
        for candidate in $providers.values() {
            let $p = candidate.as_ref();
            match $call.await {
                // This provider does not handle the call; move on to the next one.
                Err(PayError::NotImplemented(_)) => continue,
                // Success or a real failure: either way the search is over.
                outcome => {
                    decided = Some(outcome);
                    break;
                }
            }
        }
        decided.unwrap_or_else(|| Err(PayError::NotImplemented("network not enabled".to_string())))
    }};
}
273
/// Awaits `$call` against every provider and concatenates the returned vectors.
///
/// Providers answering `PayError::NotImplemented` contribute nothing. Any other error
/// stops the loop and becomes the macro's value, discarding what was gathered so far.
/// Must be used inside an async context because `$call` is awaited.
macro_rules! collect_all {
    ($providers:expr, |$p:ident| $call:expr) => {{
        let mut gathered = Vec::new();
        let mut status: Result<(), PayError> = Ok(());
        for source in $providers.values() {
            let $p = source.as_ref();
            match $call.await {
                Ok(mut batch) => gathered.append(&mut batch),
                Err(PayError::NotImplemented(_)) => continue,
                Err(failure) => {
                    status = Err(failure);
                    break;
                }
            }
        }
        status.map(|()| gathered)
    }};
}
296
297/// Resolve wallet labels to wallet IDs in-place.
298/// If a wallet field does not start with "w_", treat it as a label and look it up.
299fn resolve_wallet_labels(input: &mut Input, store: &dyn PayStore) -> Result<(), PayError> {
300    fn resolve(store: &dyn PayStore, w: &mut String) -> Result<(), PayError> {
301        if !w.starts_with("w_") {
302            *w = store.resolve_wallet_id(w)?;
303        }
304        Ok(())
305    }
306    fn resolve_opt(store: &dyn PayStore, w: &mut Option<String>) -> Result<(), PayError> {
307        if let Some(val) = w.as_mut() {
308            if !val.starts_with("w_") {
309                *val = store.resolve_wallet_id(val)?;
310            }
311        }
312        Ok(())
313    }
314    match input {
315        Input::WalletClose { wallet, .. } => resolve(store, wallet),
316        Input::Balance { wallet, .. } => resolve_opt(store, wallet),
317        Input::Receive { wallet, .. } => resolve(store, wallet),
318        Input::ReceiveClaim { wallet, .. } => resolve(store, wallet),
319        Input::CashuSend { wallet, .. } => resolve_opt(store, wallet),
320        Input::CashuReceive { wallet, .. } => resolve_opt(store, wallet),
321        Input::Send { wallet, .. } => resolve_opt(store, wallet),
322        Input::Restore { wallet, .. } => resolve(store, wallet),
323        Input::WalletShowSeed { wallet, .. } => resolve(store, wallet),
324        Input::HistoryList { wallet, .. } | Input::HistoryUpdate { wallet, .. } => {
325            resolve_opt(store, wallet)
326        }
327        Input::WalletConfigShow { wallet, .. } => resolve(store, wallet),
328        Input::WalletConfigSet { wallet, .. } => resolve(store, wallet),
329        Input::WalletConfigTokenAdd { wallet, .. } => resolve(store, wallet),
330        Input::WalletConfigTokenRemove { wallet, .. } => resolve(store, wallet),
331        Input::LimitAdd { limit, .. } => resolve_opt(store, &mut limit.wallet),
332        Input::LimitSet { limits, .. } => {
333            for limit in limits.iter_mut() {
334                resolve_opt(store, &mut limit.wallet)?;
335            }
336            Ok(())
337        }
338        _ => Ok(()),
339    }
340}
341
342pub async fn dispatch(app: &App, input: Input) {
343    // Acquire per-operation file lock for redb write operations.
344    // Postgres handles its own concurrency; no file lock needed.
345    #[cfg(feature = "redb")]
346    let _lock = if app.has_local_providers
347        && needs_write_lock(&input)
348        && matches!(app.store.as_deref(), Some(StorageBackend::Redb(..)) | None)
349    {
350        match acquire_write_lock(app).await {
351            Ok(guard) => Some(guard),
352            Err(e) => {
353                let id = extract_id(&input);
354                emit_error(&app.writer, id, &e, Instant::now()).await;
355                return;
356            }
357        }
358    } else {
359        None
360    };
361
362    // Resolve wallet labels → wallet IDs before dispatch
363    let mut input = input;
364    if let Some(store) = &app.store {
365        if let Err(e) = resolve_wallet_labels(&mut input, store.as_ref()) {
366            let id = extract_id(&input);
367            emit_error(&app.writer, id, &e, Instant::now()).await;
368            return;
369        }
370    }
371
372    match input {
373        Input::WalletCreate {
374            id,
375            network,
376            label,
377            mint_url,
378            rpc_endpoints,
379            chain_id,
380            mnemonic_secret,
381            btc_esplora_url,
382            btc_network,
383            btc_address_type,
384            btc_backend,
385            btc_core_url,
386            btc_core_auth_secret,
387            btc_electrum_url,
388        } => {
389            let start = Instant::now();
390            let mut log_args = serde_json::json!({
391                "operation": "wallet_create",
392                "network": network.to_string(),
393                "label": label.as_deref().unwrap_or("default"),
394            });
395            if let Some(object) = log_args.as_object_mut() {
396                if !rpc_endpoints.is_empty() {
397                    object.insert(
398                        "rpc_endpoints".to_string(),
399                        serde_json::json!(rpc_endpoints),
400                    );
401                }
402                if let Some(cid) = chain_id {
403                    object.insert("chain_id".to_string(), serde_json::json!(cid));
404                }
405                if let Some(url) = mint_url.as_deref() {
406                    object.insert("mint_url".to_string(), serde_json::json!(url));
407                }
408                object.insert(
409                    "use_recovery_mnemonic".to_string(),
410                    serde_json::json!(mnemonic_secret.is_some()),
411                );
412            }
413            emit_log(app, "wallet", Some(id.clone()), log_args).await;
414            let request = WalletCreateRequest {
415                label: label.unwrap_or_else(|| "default".to_string()),
416                mint_url,
417                rpc_endpoints,
418                chain_id,
419                mnemonic_secret,
420                btc_esplora_url,
421                btc_network,
422                btc_address_type,
423                btc_backend,
424                btc_core_url,
425                btc_core_auth_secret,
426                btc_electrum_url,
427            };
428            match get_provider(&app.providers, network) {
429                Some(p) => match p.create_wallet(&request).await {
430                    Ok(info) => {
431                        // Sync wallet metadata to store (for non-redb backends).
432                        #[cfg(feature = "redb")]
433                        if let Some(store) = app.store.as_ref() {
434                            if let Ok(meta) = wallet::load_wallet_metadata(
435                                &app.config.read().await.data_dir,
436                                &info.id,
437                            ) {
438                                let _ = store.save_wallet_metadata(&meta);
439                            }
440                        }
441                        let _ = app
442                            .writer
443                            .send(Output::WalletCreated {
444                                id,
445                                wallet: info.id,
446                                network: info.network,
447                                address: info.address,
448                                mnemonic: None,
449                                trace: trace_from(start),
450                            })
451                            .await;
452                    }
453                    Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
454                },
455                None => {
456                    emit_error(
457                        &app.writer,
458                        Some(id),
459                        &PayError::NotImplemented(format!("no provider for {network}")),
460                        start,
461                    )
462                    .await;
463                }
464            }
465        }
466
467        Input::LnWalletCreate { id, request } => {
468            let start = Instant::now();
469            let mut log_args =
470                serde_json::to_value(&request).unwrap_or_else(|_| serde_json::json!({}));
471            if let Some(object) = log_args.as_object_mut() {
472                object.insert(
473                    "operation".to_string(),
474                    serde_json::json!("ln_wallet_create"),
475                );
476                object.insert("network".to_string(), serde_json::json!("ln"));
477            }
478            emit_log(app, "wallet", Some(id.clone()), log_args).await;
479
480            match get_provider(&app.providers, Network::Ln) {
481                Some(p) => match p.create_ln_wallet(request).await {
482                    Ok(info) => {
483                        #[cfg(feature = "redb")]
484                        if let Some(store) = app.store.as_ref() {
485                            if let Ok(meta) = wallet::load_wallet_metadata(
486                                &app.config.read().await.data_dir,
487                                &info.id,
488                            ) {
489                                let _ = store.save_wallet_metadata(&meta);
490                            }
491                        }
492                        let _ = app
493                            .writer
494                            .send(Output::WalletCreated {
495                                id,
496                                wallet: info.id,
497                                network: info.network,
498                                address: info.address,
499                                mnemonic: None,
500                                trace: trace_from(start),
501                            })
502                            .await;
503                    }
504                    Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
505                },
506                None => {
507                    emit_error(
508                        &app.writer,
509                        Some(id),
510                        &PayError::NotImplemented("no provider for ln".to_string()),
511                        start,
512                    )
513                    .await;
514                }
515            }
516        }
517
518        Input::WalletClose {
519            id,
520            wallet,
521            dangerously_skip_balance_check_and_may_lose_money,
522        } => {
523            let start = Instant::now();
524            emit_log(
525                app,
526                "wallet",
527                Some(id.clone()),
528                serde_json::json!({
529                    "operation": "wallet_close",
530                    "wallet": &wallet,
531                    "dangerously_skip_balance_check_and_may_lose_money": dangerously_skip_balance_check_and_may_lose_money,
532                }),
533            )
534            .await;
535            let close_result = if dangerously_skip_balance_check_and_may_lose_money {
536                match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
537                    Ok(_) => require_store(app).and_then(|s| s.delete_wallet_metadata(&wallet)).map(|_| ()),
538                    Err(PayError::WalletNotFound(_)) => Err(PayError::WalletNotFound(format!(
539                        "wallet {wallet} not found locally; dangerous skip balance check only supports local wallets"
540                    ))),
541                    Err(error) => Err(error),
542                }
543            } else {
544                match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
545                    Ok(meta) => match get_provider(&app.providers, meta.network) {
546                        Some(provider) => provider.close_wallet(&wallet).await,
547                        None => Err(PayError::NotImplemented(format!(
548                            "no provider for {}",
549                            meta.network
550                        ))),
551                    },
552                    Err(PayError::WalletNotFound(_)) => {
553                        // Fallback for remote-only deployments where wallets may not be stored locally.
554                        try_provider!(&app.providers, |p| p.close_wallet(&wallet))
555                    }
556                    Err(error) => Err(error),
557                }
558            };
559
560            match close_result {
561                Ok(()) => {
562                    let _ = app
563                        .writer
564                        .send(Output::WalletClosed {
565                            id,
566                            wallet,
567                            trace: trace_from(start),
568                        })
569                        .await;
570                }
571                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
572            }
573        }
574
575        Input::WalletList { id, network } => {
576            let start = Instant::now();
577            emit_log(
578                app,
579                "wallet",
580                Some(id.clone()),
581                serde_json::json!({
582                    "operation": "wallet_list",
583                    "network": network.map(|c| c.to_string()).unwrap_or_else(|| "all".to_string()),
584                }),
585            )
586            .await;
587            if let Some(network) = network {
588                match get_provider(&app.providers, network) {
589                    Some(p) => match p.list_wallets().await {
590                        Ok(wallets) => {
591                            let _ = app
592                                .writer
593                                .send(Output::WalletList {
594                                    id,
595                                    wallets,
596                                    trace: trace_from(start),
597                                })
598                                .await;
599                        }
600                        Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
601                    },
602                    None => {
603                        emit_error(
604                            &app.writer,
605                            Some(id),
606                            &PayError::NotImplemented(format!("no provider for {network}")),
607                            start,
608                        )
609                        .await;
610                    }
611                }
612            } else {
613                let wallets = collect_all!(&app.providers, |p| p.list_wallets());
614                match wallets {
615                    Ok(all) => {
616                        let _ = app
617                            .writer
618                            .send(Output::WalletList {
619                                id,
620                                wallets: all,
621                                trace: trace_from(start),
622                            })
623                            .await;
624                    }
625                    Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
626                }
627            }
628        }
629
630        Input::Balance {
631            id,
632            wallet,
633            network,
634            check,
635        } => {
636            let start = Instant::now();
637            emit_log(
638                app,
639                "wallet",
640                Some(id.clone()),
641                serde_json::json!({
642                    "operation": "balance", "wallet": wallet.as_deref().unwrap_or("all"), "check": check,
643                }),
644            )
645            .await;
646            if let Some(wallet_id) = wallet {
647                let meta_opt = require_store(app)
648                    .and_then(|s| s.load_wallet_metadata(&wallet_id))
649                    .ok();
650                let result = if let Some(ref meta) = meta_opt {
651                    match get_provider(&app.providers, meta.network) {
652                        Some(provider) => {
653                            if check {
654                                match provider.check_balance(&wallet_id).await {
655                                    Err(PayError::NotImplemented(_)) => {
656                                        provider.balance(&wallet_id).await
657                                    }
658                                    other => other,
659                                }
660                            } else {
661                                provider.balance(&wallet_id).await
662                            }
663                        }
664                        None => Err(PayError::NotImplemented(format!(
665                            "no provider for {}",
666                            meta.network
667                        ))),
668                    }
669                } else {
670                    // Remote-only fallback: wallet metadata may not exist locally.
671                    if check {
672                        try_provider!(&app.providers, |p| async {
673                            match p.check_balance(&wallet_id).await {
674                                Err(PayError::NotImplemented(_)) => p.balance(&wallet_id).await,
675                                other => other,
676                            }
677                        })
678                    } else {
679                        try_provider!(&app.providers, |p| p.balance(&wallet_id))
680                    }
681                };
682                match result {
683                    Ok(balance) => {
684                        let summary = if let Some(meta) = meta_opt {
685                            wallet_summary_from_meta(&meta, &wallet_id)
686                        } else {
687                            resolve_wallet_summary(app, &wallet_id).await
688                        };
689                        let _ = app
690                            .writer
691                            .send(Output::WalletBalances {
692                                id,
693                                wallets: vec![WalletBalanceItem {
694                                    wallet: summary,
695                                    balance: Some(balance),
696                                    error: None,
697                                }],
698                                trace: trace_from(start),
699                            })
700                            .await;
701                    }
702                    Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
703                }
704            } else {
705                match collect_all!(&app.providers, |p| p.balance_all()) {
706                    Ok(wallets) => {
707                        let filtered = if let Some(network) = network {
708                            wallets
709                                .into_iter()
710                                .filter(|w| w.wallet.network == network)
711                                .collect()
712                        } else {
713                            wallets
714                        };
715                        let _ = app
716                            .writer
717                            .send(Output::WalletBalances {
718                                id,
719                                wallets: filtered,
720                                trace: trace_from(start),
721                            })
722                            .await;
723                    }
724                    Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
725                }
726            }
727        }
728
729        Input::Receive {
730            id,
731            wallet,
732            network,
733            amount,
734            onchain_memo,
735            wait_until_paid,
736            wait_timeout_s,
737            wait_poll_interval_ms,
738            wait_sync_limit,
739            write_qr_svg_file: _,
740            min_confirmations,
741        } => {
742            let start = Instant::now();
743            let wait_requested = wait_until_paid
744                || wait_timeout_s.is_some()
745                || wait_poll_interval_ms.is_some()
746                || wait_sync_limit.is_some();
747            emit_log(
748                app,
749                "wallet",
750                Some(id.clone()),
751                serde_json::json!({
752                    "operation": "receive",
753                    "wallet": &wallet,
754                    "network": network.map(|c| c.to_string()).unwrap_or_else(|| "auto".to_string()),
755                    "amount": amount.as_ref().map(|a| a.value),
756                    "onchain_memo": onchain_memo.as_deref().unwrap_or(""),
757                    "wait_until_paid": wait_requested,
758                    "wait_timeout_s": wait_timeout_s,
759                    "wait_poll_interval_ms": wait_poll_interval_ms,
760                    "wait_sync_limit": wait_sync_limit,
761                }),
762            )
763            .await;
764
765            let (target_network, wallet_for_call) = if wallet.trim().is_empty() {
766                let wallets = match require_store(app).and_then(|s| s.list_wallet_metadata(network))
767                {
768                    Ok(v) => v,
769                    Err(e) => {
770                        emit_error(&app.writer, Some(id), &e, start).await;
771                        return;
772                    }
773                };
774                match wallets.len() {
775                    0 => {
776                        let msg = match network {
777                            Some(network) => format!("no {network} wallet found"),
778                            None => "no wallet found".to_string(),
779                        };
780                        emit_error(&app.writer, Some(id), &PayError::WalletNotFound(msg), start)
781                            .await;
782                        return;
783                    }
784                    1 => (wallets[0].network, wallets[0].id.clone()),
785                    _ => {
786                        let msg = match network {
787                            Some(network) => {
788                                format!("multiple {network} wallets found; pass --wallet")
789                            }
790                            None => "multiple wallets found; pass --wallet".to_string(),
791                        };
792                        emit_error(&app.writer, Some(id), &PayError::InvalidAmount(msg), start)
793                            .await;
794                        return;
795                    }
796                }
797            } else {
798                let meta = match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
799                    Ok(m) => m,
800                    Err(e) => {
801                        emit_error(&app.writer, Some(id), &e, start).await;
802                        return;
803                    }
804                };
805                if let Some(expected) = network {
806                    if meta.network != expected {
807                        emit_error(
808                            &app.writer,
809                            Some(id),
810                            &PayError::InvalidAmount(format!(
811                                "wallet {wallet} is {}, not {expected}",
812                                meta.network
813                            )),
814                            start,
815                        )
816                        .await;
817                        return;
818                    }
819                }
820                (meta.network, wallet.clone())
821            };
822
823            let Some(provider) = get_provider(&app.providers, target_network) else {
824                emit_error(
825                    &app.writer,
826                    Some(id),
827                    &PayError::NotImplemented(format!("no provider for {target_network}")),
828                    start,
829                )
830                .await;
831                return;
832            };
833
834            match provider
835                .receive_info(&wallet_for_call, amount.clone())
836                .await
837            {
838                Ok(receive_info) => {
839                    let quote_id = receive_info.quote_id.clone();
840                    let _ = app
841                        .writer
842                        .send(Output::ReceiveInfo {
843                            id: id.clone(),
844                            wallet: wallet_for_call.clone(),
845                            receive_info,
846                            trace: trace_from(start),
847                        })
848                        .await;
849
850                    if !wait_requested {
851                        return;
852                    }
853
854                    let timeout_secs = wait_timeout_s.unwrap_or(DEFAULT_WAIT_TIMEOUT_SECS);
855                    if timeout_secs == 0 {
856                        emit_error(
857                            &app.writer,
858                            Some(id),
859                            &PayError::InvalidAmount("wait_timeout_s must be >= 1".to_string()),
860                            start,
861                        )
862                        .await;
863                        return;
864                    }
865                    let poll_interval_ms =
866                        wait_poll_interval_ms.unwrap_or(DEFAULT_WAIT_POLL_INTERVAL_MS);
867                    if poll_interval_ms == 0 {
868                        emit_error(
869                            &app.writer,
870                            Some(id),
871                            &PayError::InvalidAmount(
872                                "wait_poll_interval_ms must be >= 1".to_string(),
873                            ),
874                            start,
875                        )
876                        .await;
877                        return;
878                    }
879                    let sync_limit = wait_sync_limit
880                        .unwrap_or(DEFAULT_WAIT_SYNC_LIMIT)
881                        .clamp(1, 5000);
882
883                    if target_network == Network::Sol {
884                        let memo_to_watch = onchain_memo
885                            .as_deref()
886                            .map(str::trim)
887                            .filter(|text| !text.is_empty())
888                            .map(str::to_owned);
889                        let amount_to_watch = amount.as_ref().map(|a| a.value);
890
891                        if memo_to_watch.is_none() && amount_to_watch.is_none() {
892                            emit_error_hint(
893                                &app.writer,
894                                Some(id),
895                                &PayError::InvalidAmount(
896                                    "sol receive --wait requires a match condition".to_string(),
897                                ),
898                                start,
899                                Some("pass --onchain-memo or --amount"),
900                            )
901                            .await;
902                            return;
903                        }
904
905                        let deadline = Instant::now() + Duration::from_secs(timeout_secs);
906                        loop {
907                            match provider.history_list(&wallet_for_call, 200, 0).await {
908                                Ok(items) => {
909                                    let matched = items.into_iter().find(|item| {
910                                        if item.direction != Direction::Receive {
911                                            return false;
912                                        }
913                                        if let Some(ref m) = memo_to_watch {
914                                            item.onchain_memo.as_deref() == Some(m.as_str())
915                                        } else if let Some(expected) = amount_to_watch {
916                                            item.amount.value == expected
917                                        } else {
918                                            false
919                                        }
920                                    });
921                                    if let Some(item) = matched {
922                                        // Check confirmation depth if requested
923                                        if let Some(min_conf) = min_confirmations {
924                                            match provider
925                                                .history_status(&item.transaction_id)
926                                                .await
927                                            {
928                                                Ok(status_info) => {
929                                                    let confs = status_info
930                                                        .confirmations
931                                                        .unwrap_or_else(|| {
932                                                            if status_info.status
933                                                                == TxStatus::Confirmed
934                                                            {
935                                                                min_conf
936                                                            } else {
937                                                                0
938                                                            }
939                                                        });
940                                                    if confs < min_conf {
941                                                        // Not enough confirmations yet, keep polling
942                                                        if Instant::now() >= deadline {
943                                                            let criteria = if let Some(ref m) =
944                                                                memo_to_watch
945                                                            {
946                                                                format!("memo '{m}'")
947                                                            } else {
948                                                                format!(
949                                                                    "amount {}",
950                                                                    amount_to_watch.unwrap_or(0)
951                                                                )
952                                                            };
953                                                            emit_error(
954                                                                &app.writer,
955                                                                Some(id),
956                                                                &PayError::NetworkError(format!(
957                                                                    "wait timeout after {timeout_secs}s: sol transaction {tx} matching {criteria} has {confs}/{min_conf} confirmations",
958                                                                    tx = item.transaction_id,
959                                                                )),
960                                                                start,
961                                                            )
962                                                            .await;
963                                                            break;
964                                                        }
965                                                        sleep(Duration::from_millis(
966                                                            poll_interval_ms,
967                                                        ))
968                                                        .await;
969                                                        continue;
970                                                    }
971                                                    // Enough confirmations — emit with confirmation count
972                                                    let transaction_id =
973                                                        item.transaction_id.clone();
974                                                    let _ = app
975                                                        .writer
976                                                        .send(Output::HistoryStatus {
977                                                            id,
978                                                            transaction_id,
979                                                            status: item.status,
980                                                            confirmations: Some(confs),
981                                                            preimage: item.preimage.clone(),
982                                                            item: Some(item),
983                                                            trace: trace_from(start),
984                                                        })
985                                                        .await;
986                                                    break;
987                                                }
988                                                Err(e) if e.retryable() => {
989                                                    sleep(Duration::from_millis(poll_interval_ms))
990                                                        .await;
991                                                    continue;
992                                                }
993                                                Err(e) => {
994                                                    emit_error(&app.writer, Some(id), &e, start)
995                                                        .await;
996                                                    break;
997                                                }
998                                            }
999                                        } else {
1000                                            let transaction_id = item.transaction_id.clone();
1001                                            let _ = app
1002                                                .writer
1003                                                .send(Output::HistoryStatus {
1004                                                    id,
1005                                                    transaction_id,
1006                                                    status: item.status,
1007                                                    confirmations: None,
1008                                                    preimage: item.preimage.clone(),
1009                                                    item: Some(item),
1010                                                    trace: trace_from(start),
1011                                                })
1012                                                .await;
1013                                            break;
1014                                        }
1015                                    }
1016                                    if Instant::now() >= deadline {
1017                                        let criteria = if let Some(ref m) = memo_to_watch {
1018                                            format!("memo '{m}'")
1019                                        } else {
1020                                            format!("amount {}", amount_to_watch.unwrap_or(0))
1021                                        };
1022                                        emit_error(
1023                                            &app.writer,
1024                                            Some(id),
1025                                            &PayError::NetworkError(format!(
1026                                                "wait timeout after {timeout_secs}s: no incoming sol transaction matching {criteria}"
1027                                            )),
1028                                            start,
1029                                        )
1030                                        .await;
1031                                        break;
1032                                    }
1033                                    sleep(Duration::from_millis(poll_interval_ms)).await;
1034                                }
1035                                Err(e) if e.retryable() => {
1036                                    if Instant::now() >= deadline {
1037                                        let criteria = if let Some(ref m) = memo_to_watch {
1038                                            format!("memo '{m}'")
1039                                        } else {
1040                                            format!("amount {}", amount_to_watch.unwrap_or(0))
1041                                        };
1042                                        emit_error(
1043                                            &app.writer,
1044                                            Some(id),
1045                                            &PayError::NetworkError(format!(
1046                                                "wait timeout after {timeout_secs}s: no incoming sol transaction matching {criteria}"
1047                                            )),
1048                                            start,
1049                                        )
1050                                        .await;
1051                                        break;
1052                                    }
1053                                    sleep(Duration::from_millis(poll_interval_ms)).await;
1054                                }
1055                                Err(e) => {
1056                                    emit_error(&app.writer, Some(id), &e, start).await;
1057                                    break;
1058                                }
1059                            }
1060                        }
1061                        return;
1062                    }
1063
1064                    // EVM: poll balance deltas, then resolve matched on-chain tx hash from history.
1065                    if target_network == Network::Evm {
1066                        let memo_to_watch = onchain_memo
1067                            .as_deref()
1068                            .map(str::trim)
1069                            .filter(|text| !text.is_empty())
1070                            .map(str::to_owned);
1071                        let amount_to_watch = amount.as_ref().map(|a| a.value);
1072                        let token_to_watch = amount.as_ref().map(|a| a.token.to_ascii_lowercase());
1073
1074                        if amount_to_watch.is_none() {
1075                            emit_error_hint(
1076                                &app.writer,
1077                                Some(id),
1078                                &PayError::InvalidAmount(
1079                                    "evm receive --wait requires --amount".to_string(),
1080                                ),
1081                                start,
1082                                Some("pass --amount"),
1083                            )
1084                            .await;
1085                            return;
1086                        }
1087                        let wait_criteria = if let Some(ref memo) = memo_to_watch {
1088                            format!("amount {} and memo '{memo}'", amount_to_watch.unwrap_or(0))
1089                        } else {
1090                            format!("amount {}", amount_to_watch.unwrap_or(0))
1091                        };
1092
1093                        // Snapshot known receives so we only match newly arrived transactions.
1094                        let mut known_receive_ids: HashSet<String> =
1095                            match provider.history_list(&wallet_for_call, 1000, 0).await {
1096                                Ok(items) => items
1097                                    .into_iter()
1098                                    .filter(|item| item.direction == Direction::Receive)
1099                                    .map(|item| item.transaction_id)
1100                                    .collect(),
1101                                Err(_) => HashSet::new(),
1102                            };
1103
1104                        let initial_balance = match provider.balance(&wallet_for_call).await {
1105                            Ok(b) => b,
1106                            Err(e) => {
1107                                emit_error(&app.writer, Some(id), &e, start).await;
1108                                return;
1109                            }
1110                        };
1111
1112                        let deadline = Instant::now() + Duration::from_secs(timeout_secs);
1113                        'evm_wait: loop {
1114                            sleep(Duration::from_millis(poll_interval_ms)).await;
1115                            if Instant::now() >= deadline {
1116                                emit_error(
1117                                    &app.writer,
1118                                    Some(id),
1119                                    &PayError::NetworkError(format!(
1120                                        "wait timeout after {timeout_secs}s: no incoming evm deposit matching {wait_criteria}"
1121                                    )),
1122                                    start,
1123                                )
1124                                .await;
1125                                break;
1126                            }
1127
1128                            let current = match provider.balance(&wallet_for_call).await {
1129                                Ok(current) => current,
1130                                Err(e) if e.retryable() => continue,
1131                                Err(e) => {
1132                                    emit_error(&app.writer, Some(id), &e, start).await;
1133                                    break;
1134                                }
1135                            };
1136
1137                            let native_increase =
1138                                current.confirmed.saturating_sub(initial_balance.confirmed);
1139                            let token_increase =
1140                                current.additional.iter().find_map(|(key, &cur)| {
1141                                    let init =
1142                                        initial_balance.additional.get(key).copied().unwrap_or(0);
1143                                    (cur > init).then_some((key.clone(), cur - init))
1144                                });
1145                            if native_increase == 0 && token_increase.is_none() {
1146                                continue;
1147                            }
1148
1149                            let observed_value = token_increase
1150                                .as_ref()
1151                                .map(|(_, delta)| *delta)
1152                                .unwrap_or(native_increase);
1153                            if let Some(expected) = amount_to_watch {
1154                                if observed_value != expected {
1155                                    continue;
1156                                }
1157                            }
1158
1159                            match provider.history_sync(&wallet_for_call, sync_limit).await {
1160                                Ok(_)
1161                                | Err(PayError::NotImplemented(_))
1162                                | Err(PayError::WalletNotFound(_)) => {}
1163                                Err(e) if e.retryable() => continue,
1164                                Err(e) => {
1165                                    emit_error(&app.writer, Some(id), &e, start).await;
1166                                    break;
1167                                }
1168                            }
1169
1170                            let recent = match provider
1171                                .history_list(&wallet_for_call, sync_limit, 0)
1172                                .await
1173                            {
1174                                Ok(items) => items,
1175                                Err(e) if e.retryable() => continue,
1176                                Err(e) => {
1177                                    emit_error(&app.writer, Some(id), &e, start).await;
1178                                    break;
1179                                }
1180                            };
1181
1182                            let mut matched: Option<HistoryRecord> = None;
1183                            let mut memo_lookup_error: Option<PayError> = None;
1184                            for item in recent.into_iter() {
1185                                if item.direction != Direction::Receive {
1186                                    continue;
1187                                }
1188                                if known_receive_ids.contains(&item.transaction_id) {
1189                                    continue;
1190                                }
1191                                if let Some(expected) = amount_to_watch {
1192                                    if item.amount.value != expected {
1193                                        continue;
1194                                    }
1195                                }
1196                                if let Some(expected_token) = token_to_watch.as_deref() {
1197                                    if !evm_receive_token_matches(
1198                                        expected_token,
1199                                        &item.amount.token,
1200                                    ) {
1201                                        continue;
1202                                    }
1203                                }
1204                                if let Some(expected_memo) = memo_to_watch.as_deref() {
1205                                    let mut memo_matches =
1206                                        item.onchain_memo.as_deref() == Some(expected_memo);
1207                                    if !memo_matches {
1208                                        match provider
1209                                            .history_onchain_memo(
1210                                                &wallet_for_call,
1211                                                &item.transaction_id,
1212                                            )
1213                                            .await
1214                                        {
1215                                            Ok(Some(chain_memo)) => {
1216                                                memo_matches = chain_memo == expected_memo;
1217                                            }
1218                                            Ok(None)
1219                                            | Err(PayError::NotImplemented(_))
1220                                            | Err(PayError::WalletNotFound(_)) => {}
1221                                            Err(e) if e.retryable() => continue 'evm_wait,
1222                                            Err(e) => {
1223                                                memo_lookup_error = Some(e);
1224                                                break;
1225                                            }
1226                                        }
1227                                    }
1228                                    if !memo_matches {
1229                                        continue;
1230                                    }
1231                                }
1232                                matched = Some(item);
1233                                break;
1234                            }
1235                            if let Some(e) = memo_lookup_error {
1236                                emit_error(&app.writer, Some(id), &e, start).await;
1237                                break;
1238                            }
1239                            let Some(item) = matched else {
1240                                continue;
1241                            };
1242
1243                            known_receive_ids.insert(item.transaction_id.clone());
1244                            if let Some(min_conf) = min_confirmations {
1245                                loop {
1246                                    match provider.history_status(&item.transaction_id).await {
1247                                        Ok(status_info) => {
1248                                            let confs =
1249                                                status_info.confirmations.unwrap_or_else(|| {
1250                                                    if status_info.status == TxStatus::Confirmed {
1251                                                        min_conf
1252                                                    } else {
1253                                                        0
1254                                                    }
1255                                                });
1256                                            if confs >= min_conf {
1257                                                let _ = app
1258                                                    .writer
1259                                                    .send(Output::HistoryStatus {
1260                                                        id,
1261                                                        transaction_id: status_info.transaction_id,
1262                                                        status: status_info.status,
1263                                                        confirmations: Some(confs),
1264                                                        preimage: status_info.preimage,
1265                                                        item: status_info.item.or(Some(item)),
1266                                                        trace: trace_from(start),
1267                                                    })
1268                                                    .await;
1269                                                break 'evm_wait;
1270                                            }
1271                                            if Instant::now() >= deadline {
1272                                                emit_error(
1273                                                    &app.writer,
1274                                                    Some(id),
1275                                                    &PayError::NetworkError(format!(
1276                                                        "wait timeout after {timeout_secs}s: evm transaction {tx} matching {wait_criteria} has {confs}/{min_conf} confirmations",
1277                                                        tx = item.transaction_id
1278                                                    )),
1279                                                    start,
1280                                                )
1281                                                .await;
1282                                                break 'evm_wait;
1283                                            }
1284                                            sleep(Duration::from_millis(poll_interval_ms)).await;
1285                                        }
1286                                        Err(e) if e.retryable() => {
1287                                            if Instant::now() >= deadline {
1288                                                emit_error(&app.writer, Some(id), &e, start).await;
1289                                                break 'evm_wait;
1290                                            }
1291                                            sleep(Duration::from_millis(poll_interval_ms)).await;
1292                                        }
1293                                        Err(e) => {
1294                                            emit_error(&app.writer, Some(id), &e, start).await;
1295                                            break 'evm_wait;
1296                                        }
1297                                    }
1298                                }
1299                            } else {
1300                                match provider.history_status(&item.transaction_id).await {
1301                                    Ok(status_info) => {
1302                                        let _ = app
1303                                            .writer
1304                                            .send(Output::HistoryStatus {
1305                                                id,
1306                                                transaction_id: status_info.transaction_id,
1307                                                status: status_info.status,
1308                                                confirmations: status_info.confirmations,
1309                                                preimage: status_info.preimage,
1310                                                item: status_info.item.or(Some(item)),
1311                                                trace: trace_from(start),
1312                                            })
1313                                            .await;
1314                                        break;
1315                                    }
1316                                    Err(e) if e.retryable() => continue,
1317                                    Err(e) => {
1318                                        emit_error(&app.writer, Some(id), &e, start).await;
1319                                        break;
1320                                    }
1321                                }
1322                            }
1323                        }
1324                        return;
1325                    }
1326
1327                    // BTC: poll balance deltas, then resolve matched on-chain txid from history.
1328                    if target_network == Network::Btc {
1329                        let amount_to_watch = amount.as_ref().map(|a| a.value).filter(|v| *v > 0);
1330                        let mut known_receive_ids: HashSet<String> =
1331                            match provider.history_list(&wallet_for_call, 1000, 0).await {
1332                                Ok(items) => items
1333                                    .into_iter()
1334                                    .filter(|item| item.direction == Direction::Receive)
1335                                    .map(|item| item.transaction_id)
1336                                    .collect(),
1337                                Err(_) => HashSet::new(),
1338                            };
1339                        let initial_balance = match provider.balance(&wallet_for_call).await {
1340                            Ok(b) => b,
1341                            Err(e) => {
1342                                emit_error(&app.writer, Some(id), &e, start).await;
1343                                return;
1344                            }
1345                        };
1346
1347                        let deadline = Instant::now() + Duration::from_secs(timeout_secs);
1348                        loop {
1349                            sleep(Duration::from_millis(poll_interval_ms)).await;
1350                            if Instant::now() >= deadline {
1351                                let criteria = if let Some(expected) = amount_to_watch {
1352                                    format!("amount {expected}")
1353                                } else {
1354                                    "any incoming amount".to_string()
1355                                };
1356                                emit_error(
1357                                    &app.writer,
1358                                    Some(id),
1359                                    &PayError::NetworkError(format!(
1360                                        "wait timeout after {timeout_secs}s: no incoming btc transaction matching {criteria}"
1361                                    )),
1362                                    start,
1363                                )
1364                                .await;
1365                                break;
1366                            }
1367
1368                            let current = match provider.balance(&wallet_for_call).await {
1369                                Ok(current) => current,
1370                                Err(e) if e.retryable() => continue,
1371                                Err(e) => {
1372                                    emit_error(&app.writer, Some(id), &e, start).await;
1373                                    break;
1374                                }
1375                            };
1376                            let confirmed_delta =
1377                                current.confirmed.saturating_sub(initial_balance.confirmed);
1378                            let pending_delta =
1379                                current.pending.saturating_sub(initial_balance.pending);
1380                            let observed_delta = confirmed_delta.saturating_add(pending_delta);
1381                            if observed_delta == 0 {
1382                                continue;
1383                            }
1384                            if let Some(expected) = amount_to_watch {
1385                                if observed_delta != expected {
1386                                    continue;
1387                                }
1388                            }
1389
1390                            match provider.history_sync(&wallet_for_call, sync_limit).await {
1391                                Ok(_)
1392                                | Err(PayError::NotImplemented(_))
1393                                | Err(PayError::WalletNotFound(_)) => {}
1394                                Err(e) if e.retryable() => continue,
1395                                Err(e) => {
1396                                    emit_error(&app.writer, Some(id), &e, start).await;
1397                                    break;
1398                                }
1399                            }
1400
1401                            let recent = match provider
1402                                .history_list(&wallet_for_call, sync_limit, 0)
1403                                .await
1404                            {
1405                                Ok(items) => items,
1406                                Err(e) if e.retryable() => continue,
1407                                Err(e) => {
1408                                    emit_error(&app.writer, Some(id), &e, start).await;
1409                                    break;
1410                                }
1411                            };
1412
1413                            let matched = recent.into_iter().find(|item| {
1414                                if item.direction != Direction::Receive {
1415                                    return false;
1416                                }
1417                                if known_receive_ids.contains(&item.transaction_id) {
1418                                    return false;
1419                                }
1420                                if let Some(expected) = amount_to_watch {
1421                                    if item.amount.value != expected {
1422                                        return false;
1423                                    }
1424                                }
1425                                true
1426                            });
1427
1428                            let Some(item) = matched else {
1429                                continue;
1430                            };
1431
1432                            known_receive_ids.insert(item.transaction_id.clone());
1433                            match provider.history_status(&item.transaction_id).await {
1434                                Ok(status_info) => {
1435                                    let _ = app
1436                                        .writer
1437                                        .send(Output::HistoryStatus {
1438                                            id,
1439                                            transaction_id: status_info.transaction_id,
1440                                            status: status_info.status,
1441                                            confirmations: status_info.confirmations,
1442                                            preimage: status_info.preimage,
1443                                            item: status_info.item.or(Some(item)),
1444                                            trace: trace_from(start),
1445                                        })
1446                                        .await;
1447                                    break;
1448                                }
1449                                Err(e) if e.retryable() => continue,
1450                                Err(e) => {
1451                                    emit_error(&app.writer, Some(id), &e, start).await;
1452                                    break;
1453                                }
1454                            }
1455                        }
1456                        return;
1457                    }
1458
1459                    let Some(quote_id) = quote_id else {
1460                        emit_error(
1461                            &app.writer,
1462                            Some(id),
1463                            &PayError::InternalError(
1464                                "deposit response missing quote_id/payment_hash".to_string(),
1465                            ),
1466                            start,
1467                        )
1468                        .await;
1469                        return;
1470                    };
1471
1472                    let deadline = Instant::now() + Duration::from_secs(timeout_secs);
1473                    loop {
1474                        match provider.receive_claim(&wallet_for_call, &quote_id).await {
1475                            Ok(claimed) => {
1476                                let _ = app
1477                                    .writer
1478                                    .send(Output::ReceiveClaimed {
1479                                        id,
1480                                        wallet: wallet_for_call.clone(),
1481                                        amount: Amount {
1482                                            value: claimed,
1483                                            token: "sats".to_string(),
1484                                        },
1485                                        trace: trace_from(start),
1486                                    })
1487                                    .await;
1488                                break;
1489                            }
1490                            Err(e) if e.retryable() => {
1491                                if Instant::now() >= deadline {
1492                                    emit_error(
1493                                        &app.writer,
1494                                        Some(id),
1495                                        &PayError::NetworkError(format!(
1496                                            "wait-until-paid timeout after {timeout_secs}s"
1497                                        )),
1498                                        start,
1499                                    )
1500                                    .await;
1501                                    break;
1502                                }
1503                                sleep(Duration::from_millis(poll_interval_ms)).await;
1504                            }
1505                            Err(e) => {
1506                                emit_error(&app.writer, Some(id), &e, start).await;
1507                                break;
1508                            }
1509                        }
1510                    }
1511                }
1512                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
1513            }
1514        }
1515
1516        Input::ReceiveClaim {
1517            id,
1518            wallet,
1519            quote_id,
1520        } => {
1521            let start = Instant::now();
1522            emit_log(
1523                app,
1524                "wallet",
1525                Some(id.clone()),
1526                serde_json::json!({
1527                    "operation": "receive_claim", "wallet": &wallet, "quote_id": &quote_id,
1528                }),
1529            )
1530            .await;
1531            let meta = match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
1532                Ok(m) => m,
1533                Err(e) => {
1534                    emit_error(&app.writer, Some(id), &e, start).await;
1535                    return;
1536                }
1537            };
1538            let Some(provider) = get_provider(&app.providers, meta.network) else {
1539                emit_error(
1540                    &app.writer,
1541                    Some(id),
1542                    &PayError::NotImplemented(format!("no provider for {}", meta.network)),
1543                    start,
1544                )
1545                .await;
1546                return;
1547            };
1548
1549            match provider.receive_claim(&wallet, &quote_id).await {
1550                Ok(claimed) => {
1551                    let _ = app
1552                        .writer
1553                        .send(Output::ReceiveClaimed {
1554                            id,
1555                            wallet,
1556                            amount: Amount {
1557                                value: claimed,
1558                                token: "sats".to_string(),
1559                            },
1560                            trace: trace_from(start),
1561                        })
1562                        .await;
1563                }
1564                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
1565            }
1566        }
1567
1568        Input::CashuSend {
1569            id,
1570            wallet,
1571            amount,
1572            onchain_memo,
1573            local_memo,
1574            mints,
1575        } => {
1576            let start = Instant::now();
1577            emit_log(
1578                app,
1579                "pay",
1580                Some(id.clone()),
1581                serde_json::json!({
1582                    "operation": "cashu_send", "wallet": wallet.as_deref().unwrap_or("auto"),
1583                    "amount": amount.value, "onchain_memo": onchain_memo.as_deref().unwrap_or(""),
1584                    "mints": mints.as_deref().unwrap_or(&[]),
1585                }),
1586            )
1587            .await;
1588
1589            let wallet_str = wallet.unwrap_or_default();
1590            let mints_ref = mints.as_deref();
1591            let Some(provider) = get_provider(&app.providers, Network::Cashu) else {
1592                emit_error(
1593                    &app.writer,
1594                    Some(id),
1595                    &PayError::NotImplemented("no provider for cashu".to_string()),
1596                    start,
1597                )
1598                .await;
1599                return;
1600            };
1601
1602            let mut reservation_id: Option<u64> = None;
1603            if app.enforce_limits {
1604                let spend_ctx = SpendContext {
1605                    network: "cashu".to_string(),
1606                    wallet: if wallet_str.is_empty() {
1607                        None
1608                    } else {
1609                        Some(wallet_str.clone())
1610                    },
1611                    amount_native: amount.value,
1612                    token: None,
1613                };
1614                match app
1615                    .spend_ledger
1616                    .reserve(&format!("cashu_send:{id}"), &spend_ctx)
1617                    .await
1618                {
1619                    Ok(rid) => reservation_id = Some(rid),
1620                    Err(e) => {
1621                        if let PayError::LimitExceeded {
1622                            rule_id,
1623                            scope,
1624                            scope_key,
1625                            spent,
1626                            max_spend,
1627                            token,
1628                            remaining_s,
1629                            origin,
1630                        } = &e
1631                        {
1632                            let _ = app
1633                                .writer
1634                                .send(Output::LimitExceeded {
1635                                    id,
1636                                    rule_id: rule_id.clone(),
1637                                    scope: *scope,
1638                                    scope_key: scope_key.clone(),
1639                                    spent: *spent,
1640                                    max_spend: *max_spend,
1641                                    token: token.clone(),
1642                                    remaining_s: *remaining_s,
1643                                    origin: origin.clone(),
1644                                    trace: trace_from(start),
1645                                })
1646                                .await;
1647                        } else {
1648                            emit_error(&app.writer, Some(id), &e, start).await;
1649                        }
1650                        return;
1651                    }
1652                }
1653            }
1654
1655            match provider
1656                .cashu_send(
1657                    &wallet_str,
1658                    amount.clone(),
1659                    onchain_memo.as_deref(),
1660                    mints_ref,
1661                )
1662                .await
1663            {
1664                Ok(r) => {
1665                    if let Some(rid) = reservation_id {
1666                        let _ = app.spend_ledger.confirm(rid).await;
1667                    }
1668                    if local_memo.is_some() {
1669                        if let Some(s) = &app.store {
1670                            let _ = s.update_transaction_record_memo(
1671                                &r.transaction_id,
1672                                local_memo.as_ref(),
1673                            );
1674                        }
1675                    }
1676                    let _ = app
1677                        .writer
1678                        .send(Output::CashuSent {
1679                            id,
1680                            wallet: r.wallet,
1681                            transaction_id: r.transaction_id,
1682                            status: r.status,
1683                            fee: r.fee,
1684                            token: r.token,
1685                            trace: trace_from(start),
1686                        })
1687                        .await;
1688                }
1689                Err(e) => {
1690                    if let Some(rid) = reservation_id {
1691                        let _ = app.spend_ledger.cancel(rid).await;
1692                    }
1693                    emit_error(&app.writer, Some(id), &e, start).await
1694                }
1695            }
1696        }
1697
1698        Input::CashuReceive { id, wallet, token } => {
1699            let start = Instant::now();
1700            let token_preview = if token.len() > 20 {
1701                format!("{}...", &token[..20])
1702            } else {
1703                token.clone()
1704            };
1705            emit_log(
1706                app,
1707                "pay",
1708                Some(id.clone()),
1709                serde_json::json!({
1710                    "operation": "cashu_receive", "wallet": wallet.as_deref().unwrap_or("auto"), "token": token_preview,
1711                }),
1712            )
1713            .await;
1714            let wallet_str = wallet.unwrap_or_default();
1715            let Some(provider) = get_provider(&app.providers, Network::Cashu) else {
1716                emit_error(
1717                    &app.writer,
1718                    Some(id),
1719                    &PayError::NotImplemented("no provider for cashu".to_string()),
1720                    start,
1721                )
1722                .await;
1723                return;
1724            };
1725            match provider.cashu_receive(&wallet_str, &token).await {
1726                Ok(r) => {
1727                    let _ = app
1728                        .writer
1729                        .send(Output::CashuReceived {
1730                            id,
1731                            wallet: r.wallet,
1732                            amount: r.amount,
1733                            trace: trace_from(start),
1734                        })
1735                        .await;
1736                }
1737                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
1738            }
1739        }
1740
1741        Input::Send {
1742            id,
1743            wallet,
1744            network,
1745            to,
1746            onchain_memo,
1747            local_memo,
1748            mints,
1749        } => {
1750            let start = Instant::now();
1751            let operation_name = "send";
1752            let to_preview = if to.len() > 20 {
1753                format!("{}...", &to[..20])
1754            } else {
1755                to.clone()
1756            };
1757            emit_log(
1758                app,
1759                "pay",
1760                Some(id.clone()),
1761                serde_json::json!({
1762                    "operation": operation_name, "wallet": wallet.as_deref().unwrap_or("auto"),
1763                    "network": network.map(|c| c.to_string()).unwrap_or_else(|| "auto".to_string()),
1764                    "to": to_preview, "onchain_memo": onchain_memo.as_deref().unwrap_or(""),
1765                }),
1766            )
1767            .await;
1768
1769            let (target_network, wallet_for_call) = if let Some(w) =
1770                wallet.filter(|w| !w.is_empty())
1771            {
1772                let meta = match require_store(app).and_then(|s| s.load_wallet_metadata(&w)) {
1773                    Ok(m) => m,
1774                    Err(e) => {
1775                        emit_error(&app.writer, Some(id), &e, start).await;
1776                        return;
1777                    }
1778                };
1779                if let Some(expected) = network {
1780                    if meta.network != expected {
1781                        emit_error(
1782                            &app.writer,
1783                            Some(id),
1784                            &PayError::InvalidAmount(format!(
1785                                "wallet {w} is {}, not {expected}",
1786                                meta.network
1787                            )),
1788                            start,
1789                        )
1790                        .await;
1791                        return;
1792                    }
1793                }
1794                (meta.network, w)
1795            } else {
1796                let wallets = match require_store(app).and_then(|s| s.list_wallet_metadata(network))
1797                {
1798                    Ok(v) => v,
1799                    Err(e) => {
1800                        emit_error(&app.writer, Some(id), &e, start).await;
1801                        return;
1802                    }
1803                };
1804
1805                // For cashu with --cashu-mint or multiple wallets: filter by mint,
1806                // then select wallet with smallest sufficient balance.
1807                let is_cashu = matches!(network, Some(Network::Cashu));
1808                let filtered: Vec<_> = if is_cashu {
1809                    if let Some(ref mint_list) = mints {
1810                        wallets
1811                            .into_iter()
1812                            .filter(|w| {
1813                                w.mint_url.as_deref().is_some_and(|u| {
1814                                    let nu = u.trim().trim_end_matches('/');
1815                                    mint_list
1816                                        .iter()
1817                                        .any(|m| m.trim().trim_end_matches('/') == nu)
1818                                })
1819                            })
1820                            .collect()
1821                    } else {
1822                        wallets
1823                    }
1824                } else {
1825                    wallets
1826                };
1827
1828                match filtered.len() {
1829                    0 => {
1830                        let msg = if mints.is_some() {
1831                            "no cashu wallet found matching --cashu-mint".to_string()
1832                        } else {
1833                            match network {
1834                                Some(network) => format!("no {network} wallet found"),
1835                                None => "no wallet found".to_string(),
1836                            }
1837                        };
1838                        emit_error(&app.writer, Some(id), &PayError::WalletNotFound(msg), start)
1839                            .await;
1840                        return;
1841                    }
1842                    1 => (filtered[0].network, filtered[0].id.clone()),
1843                    _ if is_cashu => {
1844                        // Multiple cashu wallets: select by smallest sufficient balance.
1845                        // Pass empty wallet to provider — it will use select_wallet_by_balance.
1846                        (Network::Cashu, String::new())
1847                    }
1848                    _ => {
1849                        let msg = match network {
1850                            Some(network) => {
1851                                format!("multiple {network} wallets found; pass --wallet")
1852                            }
1853                            None => "multiple wallets found; pass --wallet".to_string(),
1854                        };
1855                        emit_error(&app.writer, Some(id), &PayError::InvalidAmount(msg), start)
1856                            .await;
1857                        return;
1858                    }
1859                }
1860            };
1861
1862            let Some(provider) = get_provider(&app.providers, target_network) else {
1863                emit_error(
1864                    &app.writer,
1865                    Some(id),
1866                    &PayError::NotImplemented(format!("no provider for {target_network}")),
1867                    start,
1868                )
1869                .await;
1870                return;
1871            };
1872
1873            let mut reservation_id: Option<u64> = None;
1874            if app.enforce_limits {
1875                let quote = match provider
1876                    .send_quote(&wallet_for_call, &to, mints.as_deref())
1877                    .await
1878                {
1879                    Ok(q) => q,
1880                    Err(e) => {
1881                        emit_error(&app.writer, Some(id), &e, start).await;
1882                        return;
1883                    }
1884                };
1885                let spend_amount = quote.amount_native + quote.fee_estimate_native;
1886                let provider_key = require_store(app)
1887                    .and_then(|s| s.load_wallet_metadata(&quote.wallet))
1888                    .ok()
1889                    .map(|meta| wallet_provider_key(&meta))
1890                    .unwrap_or_else(|| target_network.to_string());
1891                let spend_ctx = SpendContext {
1892                    network: provider_key,
1893                    wallet: Some(quote.wallet.clone()),
1894                    amount_native: spend_amount,
1895                    token: extract_token_from_target(&to),
1896                };
1897                match app
1898                    .spend_ledger
1899                    .reserve(&format!("send:{id}"), &spend_ctx)
1900                    .await
1901                {
1902                    Ok(rid) => reservation_id = Some(rid),
1903                    Err(e) => {
1904                        if let PayError::LimitExceeded {
1905                            rule_id,
1906                            scope,
1907                            scope_key,
1908                            spent,
1909                            max_spend,
1910                            token,
1911                            remaining_s,
1912                            origin,
1913                        } = &e
1914                        {
1915                            let _ = app
1916                                .writer
1917                                .send(Output::LimitExceeded {
1918                                    id,
1919                                    rule_id: rule_id.clone(),
1920                                    scope: *scope,
1921                                    scope_key: scope_key.clone(),
1922                                    spent: *spent,
1923                                    max_spend: *max_spend,
1924                                    token: token.clone(),
1925                                    remaining_s: *remaining_s,
1926                                    origin: origin.clone(),
1927                                    trace: trace_from(start),
1928                                })
1929                                .await;
1930                        } else {
1931                            emit_error(&app.writer, Some(id), &e, start).await;
1932                        }
1933                        return;
1934                    }
1935                }
1936            }
1937            match provider
1938                .send(
1939                    &wallet_for_call,
1940                    &to,
1941                    onchain_memo.as_deref(),
1942                    mints.as_deref(),
1943                )
1944                .await
1945            {
1946                Ok(r) => {
1947                    if let Some(rid) = reservation_id {
1948                        let _ = app.spend_ledger.confirm(rid).await;
1949                    }
1950                    if local_memo.is_some() {
1951                        if let Some(s) = &app.store {
1952                            let _ = s.update_transaction_record_memo(
1953                                &r.transaction_id,
1954                                local_memo.as_ref(),
1955                            );
1956                        }
1957                    }
1958                    let _ = app
1959                        .writer
1960                        .send(Output::Sent {
1961                            id,
1962                            wallet: r.wallet,
1963                            transaction_id: r.transaction_id,
1964                            amount: r.amount,
1965                            fee: r.fee,
1966                            preimage: r.preimage,
1967                            trace: trace_from(start),
1968                        })
1969                        .await;
1970                }
1971                Err(e) => {
1972                    if let Some(rid) = reservation_id {
1973                        let _ = app.spend_ledger.cancel(rid).await;
1974                    }
1975                    emit_error(&app.writer, Some(id), &e, start).await
1976                }
1977            }
1978        }
1979
1980        Input::Restore { id, wallet } => {
1981            let start = Instant::now();
1982            emit_log(
1983                app,
1984                "wallet",
1985                Some(id.clone()),
1986                serde_json::json!({
1987                    "operation": "restore", "wallet": &wallet,
1988                }),
1989            )
1990            .await;
1991            match try_provider!(&app.providers, |p| p.restore(&wallet)) {
1992                Ok(r) => {
1993                    let _ = app
1994                        .writer
1995                        .send(Output::Restored {
1996                            id,
1997                            wallet: r.wallet,
1998                            unspent: r.unspent,
1999                            spent: r.spent,
2000                            pending: r.pending,
2001                            unit: r.unit,
2002                            trace: trace_from(start),
2003                        })
2004                        .await;
2005                }
2006                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2007            }
2008        }
2009
2010        Input::WalletShowSeed { id, wallet } => {
2011            let start = Instant::now();
2012            emit_log(
2013                app,
2014                "wallet",
2015                Some(id.clone()),
2016                serde_json::json!({
2017                    "operation": "wallet_show_seed", "wallet": &wallet,
2018                }),
2019            )
2020            .await;
2021            match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2022                Ok(meta) => match meta.network {
2023                    Network::Cashu => match meta.seed_secret {
2024                        Some(mnemonic) => {
2025                            let _ = app
2026                                .writer
2027                                .send(Output::WalletSeed {
2028                                    id,
2029                                    wallet,
2030                                    mnemonic_secret: mnemonic,
2031                                    trace: trace_from(start),
2032                                })
2033                                .await;
2034                        }
2035                        None => {
2036                            emit_error(
2037                                &app.writer,
2038                                Some(id),
2039                                &PayError::InternalError("wallet has no seed".to_string()),
2040                                start,
2041                            )
2042                            .await;
2043                        }
2044                    },
2045                    Network::Sol => match meta.seed_secret {
2046                        Some(secret) => {
2047                            if looks_like_bip39_mnemonic(&secret) {
2048                                let _ = app
2049                                    .writer
2050                                    .send(Output::WalletSeed {
2051                                        id,
2052                                        wallet,
2053                                        mnemonic_secret: secret,
2054                                        trace: trace_from(start),
2055                                    })
2056                                    .await;
2057                            } else {
2058                                emit_error(
2059                                        &app.writer,
2060                                        Some(id),
2061                                        &PayError::InvalidAmount(
2062                                            "this sol wallet was created before mnemonic support; create a new sol wallet to get 12-word backup".to_string(),
2063                                        ),
2064                                        start,
2065                                    )
2066                                    .await;
2067                            }
2068                        }
2069                        None => {
2070                            emit_error(
2071                                &app.writer,
2072                                Some(id),
2073                                &PayError::InternalError("wallet has no seed".to_string()),
2074                                start,
2075                            )
2076                            .await;
2077                        }
2078                    },
2079                    Network::Ln => {
2080                        emit_error(
2081                                &app.writer,
2082                                Some(id),
2083                                &PayError::InvalidAmount(
2084                                    "ln wallets do not have mnemonic words; they store backend credentials (nwc-uri/password/admin-key)".to_string(),
2085                                ),
2086                                start,
2087                            )
2088                            .await;
2089                    }
2090                    Network::Evm | Network::Btc => match meta.seed_secret {
2091                        Some(secret) => {
2092                            if looks_like_bip39_mnemonic(&secret) {
2093                                let _ = app
2094                                    .writer
2095                                    .send(Output::WalletSeed {
2096                                        id,
2097                                        wallet,
2098                                        mnemonic_secret: secret,
2099                                        trace: trace_from(start),
2100                                    })
2101                                    .await;
2102                            } else {
2103                                emit_error(
2104                                        &app.writer,
2105                                        Some(id),
2106                                        &PayError::InvalidAmount(
2107                                            "this wallet was created before mnemonic support; create a new wallet to get 12-word backup".to_string(),
2108                                        ),
2109                                        start,
2110                                    )
2111                                    .await;
2112                            }
2113                        }
2114                        None => {
2115                            emit_error(
2116                                &app.writer,
2117                                Some(id),
2118                                &PayError::InternalError("wallet has no seed".to_string()),
2119                                start,
2120                            )
2121                            .await;
2122                        }
2123                    },
2124                },
2125                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2126            }
2127        }
2128
2129        Input::HistoryList {
2130            id,
2131            wallet,
2132            network,
2133            onchain_memo,
2134            limit,
2135            offset,
2136            since_epoch_s,
2137            until_epoch_s,
2138        } => {
2139            let start = Instant::now();
2140            let lim = limit.unwrap_or(20);
2141            let off = offset.unwrap_or(0);
2142            let memo_filter = onchain_memo
2143                .as_deref()
2144                .map(str::trim)
2145                .filter(|value| !value.is_empty())
2146                .map(str::to_owned);
2147            let store = match require_store(app) {
2148                Ok(store) => store,
2149                Err(e) => {
2150                    emit_error(&app.writer, Some(id), &e, start).await;
2151                    return;
2152                }
2153            };
2154
2155            let mut all_txs = Vec::new();
2156            if let Some(wallet_id) = wallet {
2157                let meta = match store.load_wallet_metadata(&wallet_id) {
2158                    Ok(meta) => meta,
2159                    Err(e) => {
2160                        emit_error(&app.writer, Some(id), &e, start).await;
2161                        return;
2162                    }
2163                };
2164                if let Some(expected_network) = network {
2165                    if meta.network != expected_network {
2166                        let _ = app
2167                            .writer
2168                            .send(Output::History {
2169                                id,
2170                                items: Vec::new(),
2171                                trace: trace_from(start),
2172                            })
2173                            .await;
2174                        return;
2175                    }
2176                }
2177                match store.load_wallet_transaction_records(&wallet_id) {
2178                    Ok(mut records) => all_txs.append(&mut records),
2179                    Err(e) => {
2180                        emit_error(&app.writer, Some(id), &e, start).await;
2181                        return;
2182                    }
2183                }
2184            } else {
2185                let wallets = match store.list_wallet_metadata(network) {
2186                    Ok(wallets) => wallets,
2187                    Err(e) => {
2188                        emit_error(&app.writer, Some(id), &e, start).await;
2189                        return;
2190                    }
2191                };
2192                for wallet_meta in wallets {
2193                    match store.load_wallet_transaction_records(&wallet_meta.id) {
2194                        Ok(mut records) => all_txs.append(&mut records),
2195                        Err(e) => {
2196                            emit_error(&app.writer, Some(id.clone()), &e, start).await;
2197                            return;
2198                        }
2199                    }
2200                }
2201            }
2202
2203            if let Some(expected_network) = network {
2204                all_txs.retain(|item| item.network == expected_network);
2205            }
2206            if let Some(since) = since_epoch_s {
2207                all_txs.retain(|item| item.created_at_epoch_s >= since);
2208            }
2209            if let Some(until) = until_epoch_s {
2210                all_txs.retain(|item| item.created_at_epoch_s < until);
2211            }
2212            if let Some(filter) = memo_filter.as_deref() {
2213                all_txs.retain(|item| item.onchain_memo.as_deref() == Some(filter));
2214            }
2215            all_txs.sort_by(|a, b| b.created_at_epoch_s.cmp(&a.created_at_epoch_s));
2216            let start_idx = all_txs.len().min(off);
2217            let end_idx = all_txs.len().min(off.saturating_add(lim));
2218            let items = all_txs[start_idx..end_idx].to_vec();
2219            let _ = app
2220                .writer
2221                .send(Output::History {
2222                    id,
2223                    items,
2224                    trace: trace_from(start),
2225                })
2226                .await;
2227        }
2228
2229        Input::HistoryStatus { id, transaction_id } => {
2230            let start = Instant::now();
2231            let mut routed: Option<Result<HistoryStatusInfo, PayError>> = None;
2232            for provider in app.providers.values() {
2233                match provider.history_status(&transaction_id).await {
2234                    Ok(info) => {
2235                        routed = Some(Ok(info));
2236                        break;
2237                    }
2238                    Err(PayError::NotImplemented(_)) | Err(PayError::WalletNotFound(_)) => {}
2239                    Err(err) => {
2240                        routed = Some(Err(err));
2241                        break;
2242                    }
2243                }
2244            }
2245            match routed.unwrap_or_else(|| {
2246                Err(PayError::WalletNotFound(format!(
2247                    "transaction {transaction_id} not found"
2248                )))
2249            }) {
2250                Ok(info) => {
2251                    let _ = app
2252                        .writer
2253                        .send(Output::HistoryStatus {
2254                            id,
2255                            transaction_id: info.transaction_id,
2256                            status: info.status,
2257                            confirmations: info.confirmations,
2258                            preimage: info.preimage,
2259                            item: info.item,
2260                            trace: trace_from(start),
2261                        })
2262                        .await;
2263                }
2264                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2265            }
2266        }
2267
2268        Input::HistoryUpdate {
2269            id,
2270            wallet,
2271            network,
2272            limit,
2273        } => {
2274            let start = Instant::now();
2275            if let Err(e) = require_store(app) {
2276                emit_error(&app.writer, Some(id), &e, start).await;
2277                return;
2278            }
2279
2280            let sync_limit = limit.unwrap_or(200).clamp(1, 5000);
2281            let mut totals = HistorySyncStats::default();
2282            let mut wallets_synced = 0usize;
2283
2284            if let Some(wallet_id) = wallet {
2285                let sync_result = if let Some(expected_network) = network {
2286                    match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet_id)) {
2287                        Ok(meta) if meta.network != expected_network => {
2288                            Err(PayError::InvalidAmount(format!(
2289                                "wallet {wallet_id} belongs to {}, not {expected_network}",
2290                                meta.network
2291                            )))
2292                        }
2293                        Ok(_) => match get_provider(&app.providers, expected_network) {
2294                            Some(provider) => provider.history_sync(&wallet_id, sync_limit).await,
2295                            None => Err(PayError::NotImplemented(format!(
2296                                "network {expected_network} not enabled"
2297                            ))),
2298                        },
2299                        Err(e) => Err(e),
2300                    }
2301                } else {
2302                    match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet_id)) {
2303                        Ok(meta) => match get_provider(&app.providers, meta.network) {
2304                            Some(provider) => provider.history_sync(&wallet_id, sync_limit).await,
2305                            None => Err(PayError::NotImplemented(format!(
2306                                "network {} not enabled",
2307                                meta.network
2308                            ))),
2309                        },
2310                        Err(e) => Err(e),
2311                    }
2312                };
2313
2314                match sync_result {
2315                    Ok(stats) => {
2316                        wallets_synced = 1;
2317                        totals.records_scanned =
2318                            totals.records_scanned.saturating_add(stats.records_scanned);
2319                        totals.records_added =
2320                            totals.records_added.saturating_add(stats.records_added);
2321                        totals.records_updated =
2322                            totals.records_updated.saturating_add(stats.records_updated);
2323                    }
2324                    Err(e) => {
2325                        emit_error(&app.writer, Some(id), &e, start).await;
2326                        return;
2327                    }
2328                }
2329            } else {
2330                let target_networks: Vec<Network> = if let Some(single) = network {
2331                    vec![single]
2332                } else {
2333                    vec![
2334                        Network::Cashu,
2335                        Network::Ln,
2336                        Network::Sol,
2337                        Network::Evm,
2338                        Network::Btc,
2339                    ]
2340                };
2341
2342                let wallets = match require_store(app).and_then(|s| s.list_wallet_metadata(None)) {
2343                    Ok(wallets) => wallets,
2344                    Err(e) => {
2345                        emit_error(&app.writer, Some(id), &e, start).await;
2346                        return;
2347                    }
2348                };
2349
2350                for network_key in target_networks {
2351                    let Some(provider) = get_provider(&app.providers, network_key) else {
2352                        if network.is_some() {
2353                            emit_error(
2354                                &app.writer,
2355                                Some(id),
2356                                &PayError::NotImplemented(format!(
2357                                    "network {network_key} not enabled"
2358                                )),
2359                                start,
2360                            )
2361                            .await;
2362                            return;
2363                        }
2364                        continue;
2365                    };
2366                    for wallet_meta in &wallets {
2367                        if wallet_meta.network != network_key {
2368                            continue;
2369                        }
2370                        match provider.history_sync(&wallet_meta.id, sync_limit).await {
2371                            Ok(stats) => {
2372                                wallets_synced = wallets_synced.saturating_add(1);
2373                                totals.records_scanned =
2374                                    totals.records_scanned.saturating_add(stats.records_scanned);
2375                                totals.records_added =
2376                                    totals.records_added.saturating_add(stats.records_added);
2377                                totals.records_updated =
2378                                    totals.records_updated.saturating_add(stats.records_updated);
2379                            }
2380                            Err(PayError::NotImplemented(_)) | Err(PayError::WalletNotFound(_)) => {
2381                            }
2382                            Err(e) => {
2383                                emit_error(&app.writer, Some(id), &e, start).await;
2384                                return;
2385                            }
2386                        }
2387                    }
2388                }
2389            }
2390
2391            let _ = app
2392                .writer
2393                .send(Output::HistoryUpdated {
2394                    id,
2395                    wallets_synced,
2396                    records_scanned: totals.records_scanned,
2397                    records_added: totals.records_added,
2398                    records_updated: totals.records_updated,
2399                    trace: trace_from(start),
2400                })
2401                .await;
2402        }
2403
2404        Input::LimitAdd { id, mut limit } => {
2405            let start = Instant::now();
2406            if !app.enforce_limits {
2407                emit_error(
2408                    &app.writer,
2409                    Some(id),
2410                    &PayError::NotImplemented(
2411                        "limit_add is unavailable when limits are not enforced locally; configure limits on the RPC daemon"
2412                            .to_string(),
2413                    ),
2414                    start,
2415                )
2416                .await;
2417                return;
2418            }
2419
2420            // Auto-fill provider for wallet-scope rules that don't have one
2421            if limit.scope == SpendScope::Wallet && limit.network.is_none() {
2422                if let Some(wallet_id) = limit.wallet.as_deref() {
2423                    match require_store(app).and_then(|s| s.load_wallet_metadata(wallet_id)) {
2424                        Ok(meta) => {
2425                            limit.network = Some(meta.network.to_string());
2426                        }
2427                        Err(e) => {
2428                            emit_error(&app.writer, Some(id), &e, start).await;
2429                            return;
2430                        }
2431                    }
2432                }
2433            }
2434
2435            match app.spend_ledger.add_limit(&mut limit).await {
2436                Ok(rule_id) => {
2437                    let _ = app
2438                        .writer
2439                        .send(Output::LimitAdded {
2440                            id,
2441                            rule_id,
2442                            trace: trace_from(start),
2443                        })
2444                        .await;
2445                }
2446                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2447            }
2448        }
2449
2450        Input::LimitRemove { id, rule_id } => {
2451            let start = Instant::now();
2452            if !app.enforce_limits {
2453                emit_error(
2454                    &app.writer,
2455                    Some(id),
2456                    &PayError::NotImplemented(
2457                        "limit_remove is unavailable when limits are not enforced locally; configure limits on the RPC daemon"
2458                            .to_string(),
2459                    ),
2460                    start,
2461                )
2462                .await;
2463                return;
2464            }
2465
2466            match app.spend_ledger.remove_limit(&rule_id).await {
2467                Ok(()) => {
2468                    let _ = app
2469                        .writer
2470                        .send(Output::LimitRemoved {
2471                            id,
2472                            rule_id,
2473                            trace: trace_from(start),
2474                        })
2475                        .await;
2476                }
2477                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2478            }
2479        }
2480
2481        Input::LimitList { id } => {
2482            let start = Instant::now();
2483            let local_limits = if app.enforce_limits {
2484                match app.spend_ledger.get_status().await {
2485                    Ok(status) => status,
2486                    Err(e) => {
2487                        emit_error(&app.writer, Some(id), &e, start).await;
2488                        return;
2489                    }
2490                }
2491            } else {
2492                vec![]
2493            };
2494
2495            // Query downstream afpay_rpc nodes
2496            let config = app.config.read().await.clone();
2497            let downstream = query_downstream_limits(&config).await;
2498
2499            let _ = app
2500                .writer
2501                .send(Output::LimitStatus {
2502                    id,
2503                    limits: local_limits,
2504                    downstream,
2505                    trace: trace_from(start),
2506                })
2507                .await;
2508        }
2509
2510        Input::LimitSet { id, mut limits } => {
2511            let start = Instant::now();
2512            if !app.enforce_limits {
2513                emit_error(
2514                    &app.writer,
2515                    Some(id),
2516                    &PayError::NotImplemented(
2517                        "limit_set is unavailable when limits are not enforced locally; configure limits on the RPC daemon"
2518                            .to_string(),
2519                    ),
2520                    start,
2521                )
2522                .await;
2523                return;
2524            }
2525
2526            // Auto-fill provider for wallet-scope rules that don't have one
2527            for rule in &mut limits {
2528                if rule.scope == SpendScope::Wallet && rule.network.is_none() {
2529                    if let Some(wallet_id) = rule.wallet.as_deref() {
2530                        match require_store(app).and_then(|s| s.load_wallet_metadata(wallet_id)) {
2531                            Ok(meta) => {
2532                                rule.network = Some(meta.network.to_string());
2533                            }
2534                            Err(e) => {
2535                                emit_error(&app.writer, Some(id), &e, start).await;
2536                                return;
2537                            }
2538                        }
2539                    }
2540                }
2541            }
2542
2543            match app.spend_ledger.set_limits(&limits).await {
2544                Ok(()) => match app.spend_ledger.get_status().await {
2545                    Ok(status) => {
2546                        let _ = app
2547                            .writer
2548                            .send(Output::LimitStatus {
2549                                id,
2550                                limits: status,
2551                                downstream: vec![],
2552                                trace: trace_from(start),
2553                            })
2554                            .await;
2555                    }
2556                    Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2557                },
2558                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2559            }
2560        }
2561
2562        Input::WalletConfigShow { id, wallet } => {
2563            let start = Instant::now();
2564            match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2565                Ok(meta) => {
2566                    let resolved_wallet = meta.id.clone();
2567                    let _ = app
2568                        .writer
2569                        .send(Output::WalletConfig {
2570                            id,
2571                            wallet: resolved_wallet,
2572                            config: meta,
2573                            trace: trace_from(start),
2574                        })
2575                        .await;
2576                }
2577                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2578            }
2579        }
2580
2581        Input::WalletConfigSet {
2582            id,
2583            wallet,
2584            label,
2585            rpc_endpoints,
2586            chain_id,
2587        } => {
2588            let start = Instant::now();
2589            match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2590                Ok(mut meta) => {
2591                    let resolved_wallet = meta.id.clone();
2592                    let mut changed = false;
2593
2594                    if let Some(new_label) = label {
2595                        let trimmed = new_label.trim();
2596                        meta.label = if trimmed.is_empty() {
2597                            None
2598                        } else {
2599                            Some(trimmed.to_string())
2600                        };
2601                        changed = true;
2602                    }
2603
2604                    if !rpc_endpoints.is_empty() {
2605                        match meta.network {
2606                            Network::Sol => {
2607                                meta.sol_rpc_endpoints = Some(rpc_endpoints);
2608                                changed = true;
2609                            }
2610                            Network::Evm => {
2611                                meta.evm_rpc_endpoints = Some(rpc_endpoints);
2612                                changed = true;
2613                            }
2614                            _ => {
2615                                emit_error(
2616                                    &app.writer,
2617                                    Some(id),
2618                                    &PayError::InvalidAmount(format!(
2619                                        "rpc-endpoint not supported for {} wallets",
2620                                        meta.network
2621                                    )),
2622                                    start,
2623                                )
2624                                .await;
2625                                return;
2626                            }
2627                        }
2628                    }
2629
2630                    if let Some(cid) = chain_id {
2631                        if meta.network != Network::Evm {
2632                            emit_error(
2633                                &app.writer,
2634                                Some(id),
2635                                &PayError::InvalidAmount(
2636                                    "chain-id is only supported for evm wallets".to_string(),
2637                                ),
2638                                start,
2639                            )
2640                            .await;
2641                            return;
2642                        }
2643                        meta.evm_chain_id = Some(cid);
2644                        changed = true;
2645                    }
2646
2647                    if !changed {
2648                        emit_error(
2649                            &app.writer,
2650                            Some(id),
2651                            &PayError::InvalidAmount(
2652                                "no configuration changes specified".to_string(),
2653                            ),
2654                            start,
2655                        )
2656                        .await;
2657                        return;
2658                    }
2659
2660                    match require_store(app).and_then(|s| s.save_wallet_metadata(&meta)) {
2661                        Ok(()) => {
2662                            let _ = app
2663                                .writer
2664                                .send(Output::WalletConfigUpdated {
2665                                    id,
2666                                    wallet: resolved_wallet,
2667                                    trace: trace_from(start),
2668                                })
2669                                .await;
2670                        }
2671                        Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2672                    }
2673                }
2674                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2675            }
2676        }
2677
2678        Input::WalletConfigTokenAdd {
2679            id,
2680            wallet,
2681            symbol,
2682            address,
2683            decimals,
2684        } => {
2685            let start = Instant::now();
2686            match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2687                Ok(mut meta) => {
2688                    let resolved_wallet = meta.id.clone();
2689                    if !matches!(meta.network, Network::Evm | Network::Sol) {
2690                        emit_error(
2691                            &app.writer,
2692                            Some(id),
2693                            &PayError::InvalidAmount(format!(
2694                                "custom tokens not supported for {} wallets",
2695                                meta.network
2696                            )),
2697                            start,
2698                        )
2699                        .await;
2700                        return;
2701                    }
2702
2703                    let lower_symbol = symbol.to_ascii_lowercase();
2704                    let tokens = meta.custom_tokens.get_or_insert_with(Vec::new);
2705                    if tokens
2706                        .iter()
2707                        .any(|t| t.symbol.to_ascii_lowercase() == lower_symbol)
2708                    {
2709                        emit_error(
2710                            &app.writer,
2711                            Some(id),
2712                            &PayError::InvalidAmount(format!(
2713                                "custom token '{lower_symbol}' already registered"
2714                            )),
2715                            start,
2716                        )
2717                        .await;
2718                        return;
2719                    }
2720
2721                    tokens.push(wallet::CustomToken {
2722                        symbol: lower_symbol.clone(),
2723                        address: address.clone(),
2724                        decimals,
2725                    });
2726
2727                    match require_store(app).and_then(|s| s.save_wallet_metadata(&meta)) {
2728                        Ok(()) => {
2729                            let _ = app
2730                                .writer
2731                                .send(Output::WalletConfigTokenAdded {
2732                                    id,
2733                                    wallet: resolved_wallet,
2734                                    symbol: lower_symbol,
2735                                    address,
2736                                    decimals,
2737                                    trace: trace_from(start),
2738                                })
2739                                .await;
2740                        }
2741                        Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2742                    }
2743                }
2744                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2745            }
2746        }
2747
2748        Input::WalletConfigTokenRemove { id, wallet, symbol } => {
2749            let start = Instant::now();
2750            match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2751                Ok(mut meta) => {
2752                    let resolved_wallet = meta.id.clone();
2753                    let lower_symbol = symbol.to_ascii_lowercase();
2754                    let tokens = meta.custom_tokens.get_or_insert_with(Vec::new);
2755                    let before_len = tokens.len();
2756                    tokens.retain(|t| t.symbol.to_ascii_lowercase() != lower_symbol);
2757                    if tokens.len() == before_len {
2758                        emit_error(
2759                            &app.writer,
2760                            Some(id),
2761                            &PayError::InvalidAmount(format!(
2762                                "custom token '{lower_symbol}' not found"
2763                            )),
2764                            start,
2765                        )
2766                        .await;
2767                        return;
2768                    }
2769                    if tokens.is_empty() {
2770                        meta.custom_tokens = None;
2771                    }
2772
2773                    match require_store(app).and_then(|s| s.save_wallet_metadata(&meta)) {
2774                        Ok(()) => {
2775                            let _ = app
2776                                .writer
2777                                .send(Output::WalletConfigTokenRemoved {
2778                                    id,
2779                                    wallet: resolved_wallet,
2780                                    symbol: lower_symbol,
2781                                    trace: trace_from(start),
2782                                })
2783                                .await;
2784                        }
2785                        Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2786                    }
2787                }
2788                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2789            }
2790        }
2791
2792        Input::Config(patch) => {
2793            let start = Instant::now();
2794            let ConfigPatch {
2795                data_dir,
2796                limits,
2797                log,
2798                exchange_rate,
2799                afpay_rpc,
2800                providers,
2801            } = patch;
2802
2803            let mut unsupported = Vec::new();
2804            if data_dir.is_some() {
2805                unsupported.push("data_dir");
2806            }
2807            if afpay_rpc.is_some() {
2808                unsupported.push("afpay_rpc");
2809            }
2810            if providers.is_some() {
2811                unsupported.push("providers");
2812            }
2813            if exchange_rate.is_some() {
2814                unsupported.push("exchange_rate");
2815            }
2816            if !unsupported.is_empty() {
2817                let err = PayError::NotImplemented(format!(
2818                    "runtime config only supports 'log' and 'limits'; unsupported fields: {}",
2819                    unsupported.join(", ")
2820                ));
2821                emit_error(&app.writer, None, &err, start).await;
2822                return;
2823            }
2824
2825            if let Some(ref v) = limits {
2826                if !app.enforce_limits {
2827                    let err = PayError::NotImplemented(
2828                        "config.limits is unavailable when limits are not enforced locally; configure limits on the RPC daemon"
2829                            .to_string(),
2830                    );
2831                    emit_error(&app.writer, None, &err, start).await;
2832                    return;
2833                }
2834                if let Err(e) = app.spend_ledger.set_limits(v).await {
2835                    emit_error(&app.writer, None, &e, start).await;
2836                    return;
2837                }
2838            }
2839
2840            let mut cfg = app.config.write().await;
2841            if let Some(v) = limits {
2842                cfg.limits = v;
2843            }
2844            if let Some(v) = log {
2845                cfg.log = agent_first_data::cli_parse_log_filters(&v);
2846            }
2847            let _ = app.writer.send(Output::Config(cfg.clone())).await;
2848        }
2849
2850        Input::Version => {
2851            let _ = app
2852                .writer
2853                .send(Output::Version {
2854                    version: crate::config::VERSION.to_string(),
2855                    trace: PongTrace {
2856                        uptime_s: app.start_time.elapsed().as_secs(),
2857                        requests_total: app.requests_total.load(Ordering::Relaxed),
2858                        in_flight: app.in_flight.lock().await.len(),
2859                    },
2860                })
2861                .await;
2862        }
2863
2864        Input::Close => {
2865            // Handled in main loop
2866        }
2867    }
2868
2869    emit_migration_log(app).await;
2870}
2871
2872// ═══════════════════════════════════════════
2873// Helpers
2874// ═══════════════════════════════════════════
2875
2876fn get_provider(
2877    providers: &HashMap<Network, Box<dyn PayProvider>>,
2878    network: Network,
2879) -> Option<&dyn PayProvider> {
2880    providers.get(&network).map(|p| p.as_ref())
2881}
2882
/// Heuristic check for a BIP-39 style mnemonic.
///
/// Only the number of whitespace-separated words is inspected: exactly 12 or
/// exactly 24 words count as a mnemonic. The words themselves are not
/// validated against any word list or checksum.
fn looks_like_bip39_mnemonic(secret: &str) -> bool {
    matches!(secret.split_whitespace().count(), 12 | 24)
}
2887
/// Decides whether an observed token label satisfies the expected one.
///
/// Both labels are trimmed and ASCII-lowercased before comparison.
/// - When the expected token is `native`, the observed label must be one of
///   `native`, `gwei` or `wei`.
/// - Otherwise the observed label must equal the expected one, either
///   directly or after removing a trailing `_base_units` suffix.
fn evm_receive_token_matches(expected: &str, observed: &str) -> bool {
    let want = expected.trim().to_ascii_lowercase();
    let seen = observed.trim().to_ascii_lowercase();

    if want == "native" {
        return matches!(seen.as_str(), "native" | "gwei" | "wei");
    }

    seen == want || seen.strip_suffix("_base_units") == Some(want.as_str())
}
2902
2903async fn emit_error(
2904    writer: &mpsc::Sender<Output>,
2905    id: Option<String>,
2906    err: &PayError,
2907    start: Instant,
2908) {
2909    emit_error_hint(writer, id, err, start, None).await;
2910}
2911
2912/// Like [`emit_error`] but with an optional hint override.
2913/// When `hint_override` is `Some`, it takes precedence over `PayError::hint()`.
2914async fn emit_error_hint(
2915    writer: &mpsc::Sender<Output>,
2916    id: Option<String>,
2917    err: &PayError,
2918    start: Instant,
2919    hint_override: Option<&str>,
2920) {
2921    let _ = writer
2922        .send(Output::Error {
2923            id,
2924            error_code: err.error_code().to_string(),
2925            error: err.to_string(),
2926            hint: hint_override.map(|h| h.to_string()).or_else(|| err.hint()),
2927            retryable: err.retryable(),
2928            trace: trace_from(start),
2929        })
2930        .await;
2931}
2932
2933fn extract_id(input: &Input) -> Option<String> {
2934    match input {
2935        Input::WalletCreate { id, .. }
2936        | Input::LnWalletCreate { id, .. }
2937        | Input::WalletClose { id, .. }
2938        | Input::WalletList { id, .. }
2939        | Input::Balance { id, .. }
2940        | Input::Receive { id, .. }
2941        | Input::ReceiveClaim { id, .. }
2942        | Input::CashuSend { id, .. }
2943        | Input::CashuReceive { id, .. }
2944        | Input::Send { id, .. }
2945        | Input::Restore { id, .. }
2946        | Input::WalletShowSeed { id, .. }
2947        | Input::HistoryList { id, .. }
2948        | Input::HistoryStatus { id, .. }
2949        | Input::HistoryUpdate { id, .. }
2950        | Input::LimitAdd { id, .. }
2951        | Input::LimitRemove { id, .. }
2952        | Input::LimitList { id, .. }
2953        | Input::LimitSet { id, .. }
2954        | Input::WalletConfigShow { id, .. }
2955        | Input::WalletConfigSet { id, .. }
2956        | Input::WalletConfigTokenAdd { id, .. }
2957        | Input::WalletConfigTokenRemove { id, .. } => Some(id.clone()),
2958        Input::Config(_) | Input::Version | Input::Close => None,
2959    }
2960}
2961
2962fn trace_from(start: Instant) -> Trace {
2963    Trace::from_duration(start.elapsed().as_millis() as u64)
2964}
2965
2966/// Query limits from each unique downstream afpay_rpc node.
2967async fn query_downstream_limits(config: &RuntimeConfig) -> Vec<DownstreamLimitNode> {
2968    let mut result = Vec::new();
2969    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
2970    for (name, rpc_cfg) in &config.afpay_rpc {
2971        if !seen.insert(rpc_cfg.endpoint.clone()) {
2972            continue;
2973        }
2974        let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
2975        let limit_input = Input::LimitList {
2976            id: format!("downstream_{name}"),
2977        };
2978        let outputs =
2979            crate::provider::remote::rpc_call(&rpc_cfg.endpoint, secret, &limit_input).await;
2980        let mut node = DownstreamLimitNode {
2981            name: name.clone(),
2982            endpoint: rpc_cfg.endpoint.clone(),
2983            limits: vec![],
2984            error: None,
2985            downstream: vec![],
2986        };
2987        for value in &outputs {
2988            if value.get("code").and_then(|v| v.as_str()) == Some("error") {
2989                node.error = value
2990                    .get("error")
2991                    .and_then(|v| v.as_str())
2992                    .map(|s| s.to_string());
2993            }
2994            if value.get("code").and_then(|v| v.as_str()) == Some("limit_status") {
2995                if let Some(limits) = value.get("limits") {
2996                    node.limits = serde_json::from_value(limits.clone()).unwrap_or_default();
2997                }
2998                if let Some(ds) = value.get("downstream") {
2999                    node.downstream = serde_json::from_value(ds.clone()).unwrap_or_default();
3000                }
3001            }
3002        }
3003        result.push(node);
3004    }
3005    result
3006}
3007
/// Extract `token=<value>` from a transfer target URI query string.
///
/// The query is the text between the first `?` and the next `?` (or the end
/// of the string). It is split on `&`, and the first `token=` pair with a
/// non-empty value wins. Returns `None` when there is no query or no such
/// pair. The value is returned verbatim, with no percent-decoding.
fn extract_token_from_target(to: &str) -> Option<String> {
    let query = to.split('?').nth(1)?;
    query
        .split('&')
        .filter_map(|pair| pair.strip_prefix("token="))
        .find(|value| !value.is_empty())
        .map(str::to_owned)
}
3020
3021fn wallet_provider_key(meta: &wallet::WalletMetadata) -> String {
3022    match meta.network {
3023        Network::Ln => meta
3024            .backend
3025            .as_deref()
3026            .map(|b| format!("ln-{}", b.to_ascii_lowercase()))
3027            .unwrap_or_else(|| "ln".to_string()),
3028        _ => meta.network.to_string(),
3029    }
3030}
3031
3032fn wallet_summary_from_meta(meta: &wallet::WalletMetadata, wallet_id: &str) -> WalletSummary {
3033    let (address, backend) = match meta.network {
3034        Network::Cashu => (meta.mint_url.clone().unwrap_or_default(), None),
3035        Network::Ln => {
3036            let b = meta
3037                .backend
3038                .clone()
3039                .unwrap_or_else(|| "unknown".to_string());
3040            (format!("ln:{b}"), Some(b))
3041        }
3042        _ => (wallet_id.to_string(), None),
3043    };
3044    WalletSummary {
3045        id: meta.id.clone(),
3046        network: meta.network,
3047        label: meta.label.clone(),
3048        address,
3049        backend,
3050        mint_url: meta.mint_url.clone(),
3051        rpc_endpoints: meta
3052            .sol_rpc_endpoints
3053            .clone()
3054            .or(meta.evm_rpc_endpoints.clone()),
3055        chain_id: meta.evm_chain_id,
3056        created_at_epoch_s: meta.created_at_epoch_s,
3057    }
3058}
3059
3060async fn resolve_wallet_summary(app: &App, wallet_id: &str) -> WalletSummary {
3061    if let Ok(meta) = require_store(app).and_then(|s| s.load_wallet_metadata(wallet_id)) {
3062        return wallet_summary_from_meta(&meta, wallet_id);
3063    }
3064    if let Ok(wallets) = collect_all!(&app.providers, |p| p.list_wallets()) {
3065        if let Some(summary) = wallets.into_iter().find(|w| w.id == wallet_id) {
3066            return summary;
3067        }
3068    }
3069    WalletSummary {
3070        id: wallet_id.to_string(),
3071        network: Network::Ln,
3072        label: None,
3073        address: String::new(),
3074        backend: None,
3075        mint_url: None,
3076        rpc_endpoints: None,
3077        chain_id: None,
3078        created_at_epoch_s: 0,
3079    }
3080}
3081
/// Reports whether `event` passes the configured log filters.
///
/// A filter matches when it is `*`, `all`, or a prefix of the event name.
/// Matching is ASCII case-insensitive on both the event and the filter, so a
/// filter written as `Wallet` or `ALL` behaves like its lower-case form.
/// An empty filter list disables logging. An empty filter string is a prefix
/// of every event name and therefore matches everything.
fn log_enabled(log: &[String], event: &str) -> bool {
    let event_bytes = event.as_bytes();
    log.iter().any(|filter| {
        filter == "*"
            || filter.eq_ignore_ascii_case("all")
            // Byte-wise prefix test: ASCII case folding never alters non-ASCII
            // bytes, so comparing raw bytes is safe for UTF-8 input.
            || (event_bytes.len() >= filter.len()
                && event_bytes[..filter.len()].eq_ignore_ascii_case(filter.as_bytes()))
    })
}
3090
3091async fn emit_migration_log(app: &App) {
3092    let entries = app
3093        .store
3094        .as_ref()
3095        .map(|s| s.drain_migration_log())
3096        .unwrap_or_default();
3097    if entries.is_empty() {
3098        return;
3099    }
3100    for entry in entries {
3101        emit_log(
3102            app,
3103            "schema_migration",
3104            None,
3105            serde_json::json!({
3106                "database": entry.database,
3107                "from_version": entry.from_version,
3108                "to_version": entry.to_version,
3109            }),
3110        )
3111        .await;
3112    }
3113}
3114
3115async fn emit_log(app: &App, event: &str, request_id: Option<String>, args: serde_json::Value) {
3116    let log = app.config.read().await.log.clone();
3117    if !log_enabled(&log, event) {
3118        return;
3119    }
3120    let _ = app
3121        .writer
3122        .send(Output::Log {
3123            event: event.to_string(),
3124            request_id,
3125            version: None,
3126            argv: None,
3127            config: None,
3128            args: Some(args),
3129            env: None,
3130            trace: Trace::from_duration(0),
3131        })
3132        .await;
3133}