Skip to main content

agent_first_pay/
handler.rs

1#[cfg(any(
2    feature = "btc-esplora",
3    feature = "btc-core",
4    feature = "btc-electrum"
5))]
6use crate::provider::btc::BtcProvider;
7#[cfg(feature = "cashu")]
8use crate::provider::cashu::CashuProvider;
9#[cfg(feature = "evm")]
10use crate::provider::evm::EvmProvider;
11#[cfg(any(feature = "ln-nwc", feature = "ln-phoenixd", feature = "ln-lnbits"))]
12use crate::provider::ln::LnProvider;
13use crate::provider::remote::RemoteProvider;
14#[cfg(feature = "sol")]
15use crate::provider::sol::SolProvider;
16use crate::provider::{HistorySyncStats, PayError, PayProvider, StubProvider};
17use crate::spend::{SpendContext, SpendLedger};
18#[cfg(feature = "redb")]
19use crate::store::lock;
20use crate::store::wallet;
21use crate::store::{PayStore, StorageBackend};
22use crate::types::*;
23use std::collections::HashMap;
24use std::sync::atomic::{AtomicU64, Ordering};
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tokio::sync::{mpsc, Mutex, RwLock};
28use tokio::task::JoinHandle;
29use tokio::time::sleep;
30
/// Default wait timeout, in seconds (five minutes).
const DEFAULT_WAIT_TIMEOUT_SECS: u64 = 5 * 60;
/// Default wait polling interval, in milliseconds (one second).
const DEFAULT_WAIT_POLL_INTERVAL_MS: u64 = 1_000;
33
34pub struct App {
35    pub config: RwLock<RuntimeConfig>,
36    pub providers: HashMap<Network, Box<dyn PayProvider>>,
37    pub writer: mpsc::Sender<Output>,
38    pub in_flight: Mutex<HashMap<String, JoinHandle<()>>>,
39    pub requests_total: AtomicU64,
40    pub start_time: Instant,
41    /// True if any provider uses local data (needs data-dir lock for writes).
42    pub has_local_providers: bool,
43    /// Whether this node enforces spend limits.
44    /// RPC mode: always true. CLI/pipe with all remote: false. CLI/pipe with any local: true.
45    pub enforce_limits: bool,
46    pub spend_ledger: SpendLedger,
47    /// Storage backend for wallet metadata and transaction history.
48    /// None when running in frontend-only mode (no local DB, only remote RPC).
49    pub store: Option<Arc<StorageBackend>>,
50}
51
52impl App {
53    /// Create a new App. If `enforce_limits_override` is Some, use that value;
54    /// otherwise auto-detect: enforce if any provider writes locally.
55    pub fn new(
56        config: RuntimeConfig,
57        writer: mpsc::Sender<Output>,
58        enforce_limits_override: Option<bool>,
59        store: Option<StorageBackend>,
60    ) -> Self {
61        let store = store.map(Arc::new);
62        let mut providers: HashMap<Network, Box<dyn PayProvider>> = HashMap::new();
63
64        for network in &[
65            Network::Ln,
66            Network::Sol,
67            Network::Evm,
68            Network::Cashu,
69            Network::Btc,
70        ] {
71            let key = network.to_string();
72            if let Some(rpc_name) = config.providers.get(&key) {
73                // Look up the afpay_rpc node by name
74                if let Some(rpc_cfg) = config.afpay_rpc.get(rpc_name) {
75                    let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
76                    providers.insert(
77                        *network,
78                        Box::new(RemoteProvider::new(&rpc_cfg.endpoint, secret, *network)),
79                    );
80                } else {
81                    // Unknown afpay_rpc name — insert stub so errors surface at runtime
82                    providers.insert(*network, Box::new(StubProvider::new(*network)));
83                }
84            } else {
85                #[allow(unreachable_patterns)]
86                match network {
87                    #[cfg(feature = "cashu")]
88                    Network::Cashu => {
89                        if let Some(s) = &store {
90                            let pg_url = config
91                                .postgres_url_secret
92                                .clone()
93                                .filter(|_| config.storage_backend.as_deref() == Some("postgres"));
94                            providers.insert(
95                                *network,
96                                Box::new(CashuProvider::new(&config.data_dir, pg_url, s.clone())),
97                            );
98                        } else {
99                            providers.insert(*network, Box::new(StubProvider::new(*network)));
100                        }
101                    }
102                    #[cfg(any(feature = "ln-nwc", feature = "ln-phoenixd", feature = "ln-lnbits"))]
103                    Network::Ln => {
104                        providers.insert(*network, Box::new(LnProvider::new(&config.data_dir)));
105                    }
106                    #[cfg(feature = "sol")]
107                    Network::Sol => {
108                        providers.insert(*network, Box::new(SolProvider::new(&config.data_dir)));
109                    }
110                    #[cfg(feature = "evm")]
111                    Network::Evm => {
112                        providers.insert(*network, Box::new(EvmProvider::new(&config.data_dir)));
113                    }
114                    #[cfg(any(
115                        feature = "btc-esplora",
116                        feature = "btc-core",
117                        feature = "btc-electrum"
118                    ))]
119                    Network::Btc => {
120                        providers.insert(*network, Box::new(BtcProvider::new(&config.data_dir)));
121                    }
122                    _ => {
123                        providers.insert(*network, Box::new(StubProvider::new(*network)));
124                    }
125                }
126            }
127        }
128
129        let has_local = providers.values().any(|p| p.writes_locally());
130        let spend_ledger = match store.as_deref() {
131            #[cfg(feature = "postgres")]
132            Some(StorageBackend::Postgres(pg)) => {
133                SpendLedger::new_postgres(pg.pool().clone(), config.exchange_rate.clone())
134            }
135            _ => SpendLedger::new(&config.data_dir, config.exchange_rate.clone()),
136        };
137        Self {
138            config: RwLock::new(config),
139            providers,
140            writer,
141            in_flight: Mutex::new(HashMap::new()),
142            requests_total: AtomicU64::new(0),
143            start_time: Instant::now(),
144            has_local_providers: has_local,
145            enforce_limits: enforce_limits_override.unwrap_or(has_local),
146            spend_ledger,
147            store,
148        }
149    }
150}
151
152/// Unified startup validation for long-lived modes.
153/// Pings all configured remote afpay_rpc nodes (deduplicated) and validates provider mappings.
154pub async fn startup_provider_validation_errors(config: &RuntimeConfig) -> Vec<Output> {
155    let mut errors = Vec::new();
156
157    // Validate that all provider values reference known afpay_rpc names
158    for (network, rpc_name) in &config.providers {
159        if !config.afpay_rpc.contains_key(rpc_name) {
160            errors.push(Output::Error {
161                id: None,
162                error_code: "invalid_config".to_string(),
163                error: format!(
164                    "providers.{network} references unknown afpay_rpc node '{rpc_name}'"
165                ),
166                hint: Some(format!(
167                    "add [afpay_rpc.{rpc_name}] with endpoint and endpoint_secret to config.toml"
168                )),
169                retryable: false,
170                trace: Trace::from_duration(0),
171            });
172        }
173    }
174    if !errors.is_empty() {
175        return errors;
176    }
177
178    // Ping each unique afpay_rpc endpoint once
179    let mut pinged: std::collections::HashSet<String> = std::collections::HashSet::new();
180    for (rpc_name, rpc_cfg) in &config.afpay_rpc {
181        if !pinged.insert(rpc_cfg.endpoint.clone()) {
182            continue;
183        }
184        // Find any network that maps to this rpc_name (for the RemoteProvider constructor)
185        let network = config
186            .providers
187            .iter()
188            .find(|(_, name)| *name == rpc_name)
189            .and_then(|(k, _)| k.parse::<Network>().ok())
190            .unwrap_or(Network::Cashu);
191        let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
192        let provider = RemoteProvider::new(&rpc_cfg.endpoint, secret, network);
193        if let Err(err) = provider.ping().await {
194            errors.push(Output::Error {
195                id: None,
196                error_code: "provider_unreachable".to_string(),
197                error: format!("afpay_rpc.{rpc_name} ({}): {err}", rpc_cfg.endpoint),
198                hint: Some("check endpoint address and that the daemon is running".to_string()),
199                retryable: true,
200                trace: Trace::from_duration(0),
201            });
202        }
203    }
204    errors
205}
206
207/// Get a reference to the storage backend, or return NotImplemented.
208fn require_store(app: &App) -> Result<&StorageBackend, PayError> {
209    app.store
210        .as_deref()
211        .ok_or_else(|| PayError::NotImplemented("no storage backend available".to_string()))
212}
213
/// Take the data-directory lock ahead of a write operation.
///
/// `lock::acquire` is run on the blocking thread pool. Both a failed
/// blocking task and a failed acquisition are reported as
/// `PayError::InternalError`. On success the lock guard is returned to the
/// caller.
#[cfg(feature = "redb")]
async fn acquire_write_lock(app: &App) -> Result<lock::DataLock, PayError> {
    // Clone the path out of the config; the read guard is a temporary that is
    // released at the end of this statement, before the blocking task runs.
    let data_dir = app.config.read().await.data_dir.clone();
    let joined = tokio::task::spawn_blocking(move || lock::acquire(&data_dir, None)).await;
    match joined {
        Ok(acquired) => acquired.map_err(PayError::InternalError),
        Err(join_err) => Err(PayError::InternalError(format!("lock task: {join_err}"))),
    }
}
225
226fn needs_write_lock(input: &Input) -> bool {
227    matches!(
228        input,
229        Input::WalletCreate { .. }
230            | Input::LnWalletCreate { .. }
231            | Input::WalletClose { .. }
232            | Input::Receive { .. }
233            | Input::ReceiveClaim { .. }
234            | Input::CashuSend { .. }
235            | Input::CashuReceive { .. }
236            | Input::Send { .. }
237            | Input::Restore { .. }
238            | Input::LimitAdd { .. }
239            | Input::LimitRemove { .. }
240            | Input::LimitSet { .. }
241            | Input::HistoryUpdate { .. }
242            | Input::WalletConfigSet { .. }
243            | Input::WalletConfigTokenAdd { .. }
244            | Input::WalletConfigTokenRemove { .. }
245    )
246}
247
/// Run `$call` against each provider in turn and yield the first definitive outcome.
///
/// Providers are visited in the order produced by `$providers.values()`.
/// A `PayError::NotImplemented` result moves on to the next provider; any
/// other result, whether `Ok` or a different `Err`, ends the scan and becomes
/// the value of the macro. If every provider answers `NotImplemented` (or
/// there are no providers), the result is
/// `Err(PayError::NotImplemented("network not enabled"))`.
///
/// Must be invoked in an async context, since `$call` is awaited.
macro_rules! try_provider {
    ($providers:expr, |$p:ident| $call:expr) => {{
        // Seeded with the fallback; overwritten by the first definitive outcome.
        let mut outcome: Result<_, PayError> =
            Err(PayError::NotImplemented("network not enabled".to_string()));
        for candidate in $providers.values() {
            let $p = candidate.as_ref();
            match $call.await {
                Err(PayError::NotImplemented(_)) => {}
                settled => {
                    outcome = settled;
                    break;
                }
            }
        }
        outcome
    }};
}
272
/// Run `$call` against every provider and concatenate the returned items.
///
/// Providers answering `PayError::NotImplemented` are skipped. The first
/// other error stops the scan and is returned as `Err`, discarding anything
/// gathered so far. Otherwise the result is `Ok` with all items, in the order
/// the providers were visited.
///
/// Must be invoked in an async context, since `$call` is awaited.
macro_rules! collect_all {
    ($providers:expr, |$p:ident| $call:expr) => {{
        let mut gathered = Vec::new();
        let mut failure: Option<PayError> = None;
        for candidate in $providers.values() {
            let $p = candidate.as_ref();
            match $call.await {
                Ok(batch) => gathered.extend(batch),
                Err(PayError::NotImplemented(_)) => continue,
                Err(fatal) => {
                    failure = Some(fatal);
                    break;
                }
            }
        }
        match failure {
            None => Ok(gathered),
            Some(fatal) => Err(fatal),
        }
    }};
}
295
296/// Resolve wallet labels to wallet IDs in-place.
297/// If a wallet field does not start with "w_", treat it as a label and look it up.
298fn resolve_wallet_labels(input: &mut Input, store: &dyn PayStore) -> Result<(), PayError> {
299    fn resolve(store: &dyn PayStore, w: &mut String) -> Result<(), PayError> {
300        if !w.starts_with("w_") {
301            *w = store.resolve_wallet_id(w)?;
302        }
303        Ok(())
304    }
305    fn resolve_opt(store: &dyn PayStore, w: &mut Option<String>) -> Result<(), PayError> {
306        if let Some(val) = w.as_mut() {
307            if !val.starts_with("w_") {
308                *val = store.resolve_wallet_id(val)?;
309            }
310        }
311        Ok(())
312    }
313    match input {
314        Input::WalletClose { wallet, .. } => resolve(store, wallet),
315        Input::Balance { wallet, .. } => resolve_opt(store, wallet),
316        Input::Receive { wallet, .. } => resolve(store, wallet),
317        Input::ReceiveClaim { wallet, .. } => resolve(store, wallet),
318        Input::CashuSend { wallet, .. } => resolve_opt(store, wallet),
319        Input::CashuReceive { wallet, .. } => resolve_opt(store, wallet),
320        Input::Send { wallet, .. } => resolve_opt(store, wallet),
321        Input::Restore { wallet, .. } => resolve(store, wallet),
322        Input::WalletShowSeed { wallet, .. } => resolve(store, wallet),
323        Input::HistoryList { wallet, .. } | Input::HistoryUpdate { wallet, .. } => {
324            resolve_opt(store, wallet)
325        }
326        Input::WalletConfigShow { wallet, .. } => resolve(store, wallet),
327        Input::WalletConfigSet { wallet, .. } => resolve(store, wallet),
328        Input::WalletConfigTokenAdd { wallet, .. } => resolve(store, wallet),
329        Input::WalletConfigTokenRemove { wallet, .. } => resolve(store, wallet),
330        Input::LimitAdd { limit, .. } => resolve_opt(store, &mut limit.wallet),
331        Input::LimitSet { limits, .. } => {
332            for limit in limits.iter_mut() {
333                resolve_opt(store, &mut limit.wallet)?;
334            }
335            Ok(())
336        }
337        _ => Ok(()),
338    }
339}
340
341pub async fn dispatch(app: &App, input: Input) {
342    // Acquire per-operation file lock for redb write operations.
343    // Postgres handles its own concurrency; no file lock needed.
344    #[cfg(feature = "redb")]
345    let _lock = if app.has_local_providers
346        && needs_write_lock(&input)
347        && matches!(app.store.as_deref(), Some(StorageBackend::Redb(..)) | None)
348    {
349        match acquire_write_lock(app).await {
350            Ok(guard) => Some(guard),
351            Err(e) => {
352                let id = extract_id(&input);
353                emit_error(&app.writer, id, &e, Instant::now()).await;
354                return;
355            }
356        }
357    } else {
358        None
359    };
360
361    // Resolve wallet labels → wallet IDs before dispatch
362    let mut input = input;
363    if let Some(store) = &app.store {
364        if let Err(e) = resolve_wallet_labels(&mut input, store.as_ref()) {
365            let id = extract_id(&input);
366            emit_error(&app.writer, id, &e, Instant::now()).await;
367            return;
368        }
369    }
370
371    match input {
372        Input::WalletCreate {
373            id,
374            network,
375            label,
376            mint_url,
377            rpc_endpoints,
378            chain_id,
379            mnemonic_secret,
380            btc_esplora_url,
381            btc_network,
382            btc_address_type,
383            btc_backend,
384            btc_core_url,
385            btc_core_auth_secret,
386            btc_electrum_url,
387        } => {
388            let start = Instant::now();
389            let mut log_args = serde_json::json!({
390                "operation": "wallet_create",
391                "network": network.to_string(),
392                "label": label.as_deref().unwrap_or("default"),
393            });
394            if let Some(object) = log_args.as_object_mut() {
395                if !rpc_endpoints.is_empty() {
396                    object.insert(
397                        "rpc_endpoints".to_string(),
398                        serde_json::json!(rpc_endpoints),
399                    );
400                }
401                if let Some(cid) = chain_id {
402                    object.insert("chain_id".to_string(), serde_json::json!(cid));
403                }
404                if let Some(url) = mint_url.as_deref() {
405                    object.insert("mint_url".to_string(), serde_json::json!(url));
406                }
407                object.insert(
408                    "use_recovery_mnemonic".to_string(),
409                    serde_json::json!(mnemonic_secret.is_some()),
410                );
411            }
412            emit_log(app, "wallet", Some(id.clone()), log_args).await;
413            let request = WalletCreateRequest {
414                label: label.unwrap_or_else(|| "default".to_string()),
415                mint_url,
416                rpc_endpoints,
417                chain_id,
418                mnemonic_secret,
419                btc_esplora_url,
420                btc_network,
421                btc_address_type,
422                btc_backend,
423                btc_core_url,
424                btc_core_auth_secret,
425                btc_electrum_url,
426            };
427            match get_provider(&app.providers, network) {
428                Some(p) => match p.create_wallet(&request).await {
429                    Ok(info) => {
430                        // Sync wallet metadata to store (for non-redb backends).
431                        #[cfg(feature = "redb")]
432                        if let Some(store) = app.store.as_ref() {
433                            if let Ok(meta) = wallet::load_wallet_metadata(
434                                &app.config.read().await.data_dir,
435                                &info.id,
436                            ) {
437                                let _ = store.save_wallet_metadata(&meta);
438                            }
439                        }
440                        let _ = app
441                            .writer
442                            .send(Output::WalletCreated {
443                                id,
444                                wallet: info.id,
445                                network: info.network,
446                                address: info.address,
447                                mnemonic: None,
448                                trace: trace_from(start),
449                            })
450                            .await;
451                    }
452                    Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
453                },
454                None => {
455                    emit_error(
456                        &app.writer,
457                        Some(id),
458                        &PayError::NotImplemented(format!("no provider for {network}")),
459                        start,
460                    )
461                    .await;
462                }
463            }
464        }
465
466        Input::LnWalletCreate { id, request } => {
467            let start = Instant::now();
468            let mut log_args =
469                serde_json::to_value(&request).unwrap_or_else(|_| serde_json::json!({}));
470            if let Some(object) = log_args.as_object_mut() {
471                object.insert(
472                    "operation".to_string(),
473                    serde_json::json!("ln_wallet_create"),
474                );
475                object.insert("network".to_string(), serde_json::json!("ln"));
476            }
477            emit_log(app, "wallet", Some(id.clone()), log_args).await;
478
479            match get_provider(&app.providers, Network::Ln) {
480                Some(p) => match p.create_ln_wallet(request).await {
481                    Ok(info) => {
482                        #[cfg(feature = "redb")]
483                        if let Some(store) = app.store.as_ref() {
484                            if let Ok(meta) = wallet::load_wallet_metadata(
485                                &app.config.read().await.data_dir,
486                                &info.id,
487                            ) {
488                                let _ = store.save_wallet_metadata(&meta);
489                            }
490                        }
491                        let _ = app
492                            .writer
493                            .send(Output::WalletCreated {
494                                id,
495                                wallet: info.id,
496                                network: info.network,
497                                address: info.address,
498                                mnemonic: None,
499                                trace: trace_from(start),
500                            })
501                            .await;
502                    }
503                    Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
504                },
505                None => {
506                    emit_error(
507                        &app.writer,
508                        Some(id),
509                        &PayError::NotImplemented("no provider for ln".to_string()),
510                        start,
511                    )
512                    .await;
513                }
514            }
515        }
516
517        Input::WalletClose {
518            id,
519            wallet,
520            dangerously_skip_balance_check_and_may_lose_money,
521        } => {
522            let start = Instant::now();
523            emit_log(
524                app,
525                "wallet",
526                Some(id.clone()),
527                serde_json::json!({
528                    "operation": "wallet_close",
529                    "wallet": &wallet,
530                    "dangerously_skip_balance_check_and_may_lose_money": dangerously_skip_balance_check_and_may_lose_money,
531                }),
532            )
533            .await;
534            let close_result = if dangerously_skip_balance_check_and_may_lose_money {
535                match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
536                    Ok(_) => require_store(app).and_then(|s| s.delete_wallet_metadata(&wallet)).map(|_| ()),
537                    Err(PayError::WalletNotFound(_)) => Err(PayError::WalletNotFound(format!(
538                        "wallet {wallet} not found locally; dangerous skip balance check only supports local wallets"
539                    ))),
540                    Err(error) => Err(error),
541                }
542            } else {
543                match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
544                    Ok(meta) => match get_provider(&app.providers, meta.network) {
545                        Some(provider) => provider.close_wallet(&wallet).await,
546                        None => Err(PayError::NotImplemented(format!(
547                            "no provider for {}",
548                            meta.network
549                        ))),
550                    },
551                    Err(PayError::WalletNotFound(_)) => {
552                        // Fallback for remote-only deployments where wallets may not be stored locally.
553                        try_provider!(&app.providers, |p| p.close_wallet(&wallet))
554                    }
555                    Err(error) => Err(error),
556                }
557            };
558
559            match close_result {
560                Ok(()) => {
561                    let _ = app
562                        .writer
563                        .send(Output::WalletClosed {
564                            id,
565                            wallet,
566                            trace: trace_from(start),
567                        })
568                        .await;
569                }
570                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
571            }
572        }
573
574        Input::WalletList { id, network } => {
575            let start = Instant::now();
576            emit_log(
577                app,
578                "wallet",
579                Some(id.clone()),
580                serde_json::json!({
581                    "operation": "wallet_list",
582                    "network": network.map(|c| c.to_string()).unwrap_or_else(|| "all".to_string()),
583                }),
584            )
585            .await;
586            if let Some(network) = network {
587                match get_provider(&app.providers, network) {
588                    Some(p) => match p.list_wallets().await {
589                        Ok(wallets) => {
590                            let _ = app
591                                .writer
592                                .send(Output::WalletList {
593                                    id,
594                                    wallets,
595                                    trace: trace_from(start),
596                                })
597                                .await;
598                        }
599                        Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
600                    },
601                    None => {
602                        emit_error(
603                            &app.writer,
604                            Some(id),
605                            &PayError::NotImplemented(format!("no provider for {network}")),
606                            start,
607                        )
608                        .await;
609                    }
610                }
611            } else {
612                let wallets = collect_all!(&app.providers, |p| p.list_wallets());
613                match wallets {
614                    Ok(all) => {
615                        let _ = app
616                            .writer
617                            .send(Output::WalletList {
618                                id,
619                                wallets: all,
620                                trace: trace_from(start),
621                            })
622                            .await;
623                    }
624                    Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
625                }
626            }
627        }
628
629        Input::Balance {
630            id,
631            wallet,
632            network,
633            check,
634        } => {
635            let start = Instant::now();
636            emit_log(
637                app,
638                "wallet",
639                Some(id.clone()),
640                serde_json::json!({
641                    "operation": "balance", "wallet": wallet.as_deref().unwrap_or("all"), "check": check,
642                }),
643            )
644            .await;
645            if let Some(wallet_id) = wallet {
646                let meta_opt = require_store(app)
647                    .and_then(|s| s.load_wallet_metadata(&wallet_id))
648                    .ok();
649                let result = if let Some(ref meta) = meta_opt {
650                    match get_provider(&app.providers, meta.network) {
651                        Some(provider) => {
652                            if check {
653                                match provider.check_balance(&wallet_id).await {
654                                    Err(PayError::NotImplemented(_)) => {
655                                        provider.balance(&wallet_id).await
656                                    }
657                                    other => other,
658                                }
659                            } else {
660                                provider.balance(&wallet_id).await
661                            }
662                        }
663                        None => Err(PayError::NotImplemented(format!(
664                            "no provider for {}",
665                            meta.network
666                        ))),
667                    }
668                } else {
669                    // Remote-only fallback: wallet metadata may not exist locally.
670                    if check {
671                        try_provider!(&app.providers, |p| async {
672                            match p.check_balance(&wallet_id).await {
673                                Err(PayError::NotImplemented(_)) => p.balance(&wallet_id).await,
674                                other => other,
675                            }
676                        })
677                    } else {
678                        try_provider!(&app.providers, |p| p.balance(&wallet_id))
679                    }
680                };
681                match result {
682                    Ok(balance) => {
683                        let summary = if let Some(meta) = meta_opt {
684                            wallet_summary_from_meta(&meta, &wallet_id)
685                        } else {
686                            resolve_wallet_summary(app, &wallet_id).await
687                        };
688                        let _ = app
689                            .writer
690                            .send(Output::WalletBalances {
691                                id,
692                                wallets: vec![WalletBalanceItem {
693                                    wallet: summary,
694                                    balance: Some(balance),
695                                    error: None,
696                                }],
697                                trace: trace_from(start),
698                            })
699                            .await;
700                    }
701                    Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
702                }
703            } else {
704                match collect_all!(&app.providers, |p| p.balance_all()) {
705                    Ok(wallets) => {
706                        let filtered = if let Some(network) = network {
707                            wallets
708                                .into_iter()
709                                .filter(|w| w.wallet.network == network)
710                                .collect()
711                        } else {
712                            wallets
713                        };
714                        let _ = app
715                            .writer
716                            .send(Output::WalletBalances {
717                                id,
718                                wallets: filtered,
719                                trace: trace_from(start),
720                            })
721                            .await;
722                    }
723                    Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
724                }
725            }
726        }
727
728        Input::Receive {
729            id,
730            wallet,
731            network,
732            amount,
733            onchain_memo,
734            wait_until_paid,
735            wait_timeout_s,
736            wait_poll_interval_ms,
737            write_qr_svg_file: _,
738            min_confirmations,
739        } => {
740            let start = Instant::now();
741            let wait_requested =
742                wait_until_paid || wait_timeout_s.is_some() || wait_poll_interval_ms.is_some();
743            emit_log(
744                app,
745                "wallet",
746                Some(id.clone()),
747                serde_json::json!({
748                    "operation": "receive",
749                    "wallet": &wallet,
750                    "network": network.map(|c| c.to_string()).unwrap_or_else(|| "auto".to_string()),
751                    "amount": amount.as_ref().map(|a| a.value),
752                    "onchain_memo": onchain_memo.as_deref().unwrap_or(""),
753                    "wait_until_paid": wait_requested,
754                    "wait_timeout_s": wait_timeout_s,
755                    "wait_poll_interval_ms": wait_poll_interval_ms,
756                }),
757            )
758            .await;
759
760            let (target_network, wallet_for_call) = if wallet.trim().is_empty() {
761                let wallets = match require_store(app).and_then(|s| s.list_wallet_metadata(network))
762                {
763                    Ok(v) => v,
764                    Err(e) => {
765                        emit_error(&app.writer, Some(id), &e, start).await;
766                        return;
767                    }
768                };
769                match wallets.len() {
770                    0 => {
771                        let msg = match network {
772                            Some(network) => format!("no {network} wallet found"),
773                            None => "no wallet found".to_string(),
774                        };
775                        emit_error(&app.writer, Some(id), &PayError::WalletNotFound(msg), start)
776                            .await;
777                        return;
778                    }
779                    1 => (wallets[0].network, wallets[0].id.clone()),
780                    _ => {
781                        let msg = match network {
782                            Some(network) => {
783                                format!("multiple {network} wallets found; pass --wallet")
784                            }
785                            None => "multiple wallets found; pass --wallet".to_string(),
786                        };
787                        emit_error(&app.writer, Some(id), &PayError::InvalidAmount(msg), start)
788                            .await;
789                        return;
790                    }
791                }
792            } else {
793                let meta = match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
794                    Ok(m) => m,
795                    Err(e) => {
796                        emit_error(&app.writer, Some(id), &e, start).await;
797                        return;
798                    }
799                };
800                if let Some(expected) = network {
801                    if meta.network != expected {
802                        emit_error(
803                            &app.writer,
804                            Some(id),
805                            &PayError::InvalidAmount(format!(
806                                "wallet {wallet} is {}, not {expected}",
807                                meta.network
808                            )),
809                            start,
810                        )
811                        .await;
812                        return;
813                    }
814                }
815                (meta.network, wallet.clone())
816            };
817
818            let Some(provider) = get_provider(&app.providers, target_network) else {
819                emit_error(
820                    &app.writer,
821                    Some(id),
822                    &PayError::NotImplemented(format!("no provider for {target_network}")),
823                    start,
824                )
825                .await;
826                return;
827            };
828
829            match provider
830                .receive_info(&wallet_for_call, amount.clone())
831                .await
832            {
833                Ok(receive_info) => {
834                    let quote_id = receive_info.quote_id.clone();
835                    let _ = app
836                        .writer
837                        .send(Output::ReceiveInfo {
838                            id: id.clone(),
839                            wallet: wallet_for_call.clone(),
840                            receive_info,
841                            trace: trace_from(start),
842                        })
843                        .await;
844
845                    if !wait_requested {
846                        return;
847                    }
848
849                    let timeout_secs = wait_timeout_s.unwrap_or(DEFAULT_WAIT_TIMEOUT_SECS);
850                    if timeout_secs == 0 {
851                        emit_error(
852                            &app.writer,
853                            Some(id),
854                            &PayError::InvalidAmount("wait_timeout_s must be >= 1".to_string()),
855                            start,
856                        )
857                        .await;
858                        return;
859                    }
860                    let poll_interval_ms =
861                        wait_poll_interval_ms.unwrap_or(DEFAULT_WAIT_POLL_INTERVAL_MS);
862                    if poll_interval_ms == 0 {
863                        emit_error(
864                            &app.writer,
865                            Some(id),
866                            &PayError::InvalidAmount(
867                                "wait_poll_interval_ms must be >= 1".to_string(),
868                            ),
869                            start,
870                        )
871                        .await;
872                        return;
873                    }
874
875                    if target_network == Network::Sol {
876                        let memo_to_watch = onchain_memo
877                            .as_deref()
878                            .map(str::trim)
879                            .filter(|text| !text.is_empty())
880                            .map(str::to_owned);
881                        let amount_to_watch = amount.as_ref().map(|a| a.value);
882
883                        if memo_to_watch.is_none() && amount_to_watch.is_none() {
884                            emit_error_hint(
885                                &app.writer,
886                                Some(id),
887                                &PayError::InvalidAmount(
888                                    "sol receive --wait requires a match condition".to_string(),
889                                ),
890                                start,
891                                Some("pass --onchain-memo or --amount"),
892                            )
893                            .await;
894                            return;
895                        }
896
897                        let deadline = Instant::now() + Duration::from_secs(timeout_secs);
898                        loop {
899                            match provider.history_list(&wallet_for_call, 200, 0).await {
900                                Ok(items) => {
901                                    let matched = items.into_iter().find(|item| {
902                                        if item.direction != Direction::Receive {
903                                            return false;
904                                        }
905                                        if let Some(ref m) = memo_to_watch {
906                                            item.onchain_memo.as_deref() == Some(m.as_str())
907                                        } else if let Some(expected) = amount_to_watch {
908                                            item.amount.value == expected
909                                        } else {
910                                            false
911                                        }
912                                    });
913                                    if let Some(item) = matched {
914                                        // Check confirmation depth if requested
915                                        if let Some(min_conf) = min_confirmations {
916                                            match provider
917                                                .history_status(&item.transaction_id)
918                                                .await
919                                            {
920                                                Ok(status_info) => {
921                                                    let confs = status_info
922                                                        .confirmations
923                                                        .unwrap_or_else(|| {
924                                                            if status_info.status
925                                                                == TxStatus::Confirmed
926                                                            {
927                                                                min_conf
928                                                            } else {
929                                                                0
930                                                            }
931                                                        });
932                                                    if confs < min_conf {
933                                                        // Not enough confirmations yet, keep polling
934                                                        if Instant::now() >= deadline {
935                                                            let criteria = if let Some(ref m) =
936                                                                memo_to_watch
937                                                            {
938                                                                format!("memo '{m}'")
939                                                            } else {
940                                                                format!(
941                                                                    "amount {}",
942                                                                    amount_to_watch.unwrap_or(0)
943                                                                )
944                                                            };
945                                                            emit_error(
946                                                                &app.writer,
947                                                                Some(id),
948                                                                &PayError::NetworkError(format!(
949                                                                    "wait timeout after {timeout_secs}s: sol transaction {tx} matching {criteria} has {confs}/{min_conf} confirmations",
950                                                                    tx = item.transaction_id,
951                                                                )),
952                                                                start,
953                                                            )
954                                                            .await;
955                                                            break;
956                                                        }
957                                                        sleep(Duration::from_millis(
958                                                            poll_interval_ms,
959                                                        ))
960                                                        .await;
961                                                        continue;
962                                                    }
963                                                    // Enough confirmations — emit with confirmation count
964                                                    let transaction_id =
965                                                        item.transaction_id.clone();
966                                                    let _ = app
967                                                        .writer
968                                                        .send(Output::HistoryStatus {
969                                                            id,
970                                                            transaction_id,
971                                                            status: item.status,
972                                                            confirmations: Some(confs),
973                                                            preimage: item.preimage.clone(),
974                                                            item: Some(item),
975                                                            trace: trace_from(start),
976                                                        })
977                                                        .await;
978                                                    break;
979                                                }
980                                                Err(e) if e.retryable() => {
981                                                    sleep(Duration::from_millis(poll_interval_ms))
982                                                        .await;
983                                                    continue;
984                                                }
985                                                Err(e) => {
986                                                    emit_error(&app.writer, Some(id), &e, start)
987                                                        .await;
988                                                    break;
989                                                }
990                                            }
991                                        } else {
992                                            let transaction_id = item.transaction_id.clone();
993                                            let _ = app
994                                                .writer
995                                                .send(Output::HistoryStatus {
996                                                    id,
997                                                    transaction_id,
998                                                    status: item.status,
999                                                    confirmations: None,
1000                                                    preimage: item.preimage.clone(),
1001                                                    item: Some(item),
1002                                                    trace: trace_from(start),
1003                                                })
1004                                                .await;
1005                                            break;
1006                                        }
1007                                    }
1008                                    if Instant::now() >= deadline {
1009                                        let criteria = if let Some(ref m) = memo_to_watch {
1010                                            format!("memo '{m}'")
1011                                        } else {
1012                                            format!("amount {}", amount_to_watch.unwrap_or(0))
1013                                        };
1014                                        emit_error(
1015                                            &app.writer,
1016                                            Some(id),
1017                                            &PayError::NetworkError(format!(
1018                                                "wait timeout after {timeout_secs}s: no incoming sol transaction matching {criteria}"
1019                                            )),
1020                                            start,
1021                                        )
1022                                        .await;
1023                                        break;
1024                                    }
1025                                    sleep(Duration::from_millis(poll_interval_ms)).await;
1026                                }
1027                                Err(e) if e.retryable() => {
1028                                    if Instant::now() >= deadline {
1029                                        let criteria = if let Some(ref m) = memo_to_watch {
1030                                            format!("memo '{m}'")
1031                                        } else {
1032                                            format!("amount {}", amount_to_watch.unwrap_or(0))
1033                                        };
1034                                        emit_error(
1035                                            &app.writer,
1036                                            Some(id),
1037                                            &PayError::NetworkError(format!(
1038                                                "wait timeout after {timeout_secs}s: no incoming sol transaction matching {criteria}"
1039                                            )),
1040                                            start,
1041                                        )
1042                                        .await;
1043                                        break;
1044                                    }
1045                                    sleep(Duration::from_millis(poll_interval_ms)).await;
1046                                }
1047                                Err(e) => {
1048                                    emit_error(&app.writer, Some(id), &e, start).await;
1049                                    break;
1050                                }
1051                            }
1052                        }
1053                        return;
1054                    }
1055
1056                    // EVM: poll balance deltas for incoming deposits.
1057                    if target_network == Network::Evm {
1058                        if onchain_memo
1059                            .as_deref()
1060                            .map(str::trim)
1061                            .filter(|text| !text.is_empty())
1062                            .is_some()
1063                        {
1064                            emit_error_hint(
1065                                &app.writer,
1066                                Some(id),
1067                                &PayError::InvalidAmount(
1068                                    "evm receive --wait does not support --onchain-memo matching"
1069                                        .to_string(),
1070                                ),
1071                                start,
1072                                Some("pass --amount to match incoming transfers"),
1073                            )
1074                            .await;
1075                            return;
1076                        }
1077                        let amount_to_watch = amount.as_ref().map(|a| a.value);
1078
1079                        if amount_to_watch.is_none() {
1080                            emit_error_hint(
1081                                &app.writer,
1082                                Some(id),
1083                                &PayError::InvalidAmount(
1084                                    "evm receive --wait requires --amount".to_string(),
1085                                ),
1086                                start,
1087                                Some("pass --amount"),
1088                            )
1089                            .await;
1090                            return;
1091                        }
1092
1093                        // Snapshot current balance before polling
1094                        let initial_balance = match provider.balance(&wallet_for_call).await {
1095                            Ok(b) => b,
1096                            Err(e) => {
1097                                emit_error(&app.writer, Some(id), &e, start).await;
1098                                return;
1099                            }
1100                        };
1101
1102                        let deadline = Instant::now() + Duration::from_secs(timeout_secs);
1103                        loop {
1104                            sleep(Duration::from_millis(poll_interval_ms)).await;
1105                            if Instant::now() >= deadline {
1106                                let criteria = format!("amount {}", amount_to_watch.unwrap_or(0));
1107                                emit_error(
1108                                    &app.writer,
1109                                    Some(id),
1110                                    &PayError::NetworkError(format!(
1111                                        "wait timeout after {timeout_secs}s: no incoming evm deposit matching {criteria}"
1112                                    )),
1113                                    start,
1114                                )
1115                                .await;
1116                                break;
1117                            }
1118                            match provider.balance(&wallet_for_call).await {
1119                                Ok(current) => {
1120                                    // Check native balance increase
1121                                    let native_increase =
1122                                        current.confirmed.saturating_sub(initial_balance.confirmed);
1123                                    // Check token balance increases
1124                                    let mut token_increase: Option<(String, u64)> = None;
1125                                    for (key, &cur_val) in &current.additional {
1126                                        let init_val = initial_balance
1127                                            .additional
1128                                            .get(key)
1129                                            .copied()
1130                                            .unwrap_or(0);
1131                                        if cur_val > init_val {
1132                                            token_increase =
1133                                                Some((key.clone(), cur_val - init_val));
1134                                            break;
1135                                        }
1136                                    }
1137
1138                                    if native_increase > 0 || token_increase.is_some() {
1139                                        let (amount_value, amount_unit) =
1140                                            if let Some((token_key, delta)) = token_increase {
1141                                                (delta, token_key)
1142                                            } else {
1143                                                (native_increase, current.unit.clone())
1144                                            };
1145
1146                                        // Match expected amount exactly.
1147                                        if let Some(expected) = amount_to_watch {
1148                                            if amount_value != expected {
1149                                                continue;
1150                                            }
1151                                        }
1152
1153                                        // If min_confirmations requested, find the on-chain tx and wait for depth
1154                                        if let Some(min_conf) = min_confirmations {
1155                                            // Find the most recent incoming transaction via history_list
1156                                            let chain_tx_id = match provider
1157                                                .history_list(&wallet_for_call, 20, 0)
1158                                                .await
1159                                            {
1160                                                Ok(items) => items
1161                                                    .into_iter()
1162                                                    .find(|i| {
1163                                                        i.direction == Direction::Receive
1164                                                            && i.amount.value == amount_value
1165                                                    })
1166                                                    .map(|i| i.transaction_id),
1167                                                Err(_) => None,
1168                                            };
1169                                            if let Some(ref ctxid) = chain_tx_id {
1170                                                // Poll history_status until enough confirmations or timeout
1171                                                loop {
1172                                                    match provider.history_status(ctxid).await {
1173                                                        Ok(status_info) => {
1174                                                            let confs = status_info
1175                                                                .confirmations
1176                                                                .unwrap_or(0);
1177                                                            if confs >= min_conf {
1178                                                                let record = HistoryRecord {
1179                                                                    transaction_id: ctxid.clone(),
1180                                                                    wallet: wallet_for_call.clone(),
1181                                                                    network: Network::Evm,
1182                                                                    direction: Direction::Receive,
1183                                                                    amount: Amount {
1184                                                                        value: amount_value,
1185                                                                        token: amount_unit.clone(),
1186                                                                    },
1187                                                                    status: TxStatus::Confirmed,
1188                                                                    onchain_memo: None,
1189                                                                    local_memo: None,
1190                                                                    remote_addr: None,
1191                                                                    preimage: None,
1192                                                                    created_at_epoch_s:
1193                                                                        wallet::now_epoch_seconds(),
1194                                                                    confirmed_at_epoch_s: Some(
1195                                                                        wallet::now_epoch_seconds(),
1196                                                                    ),
1197                                                                    fee: None,
1198                                                                };
1199                                                                if let Some(s) = &app.store {
1200                                                                    let _ = s
1201                                                                        .append_transaction_record(
1202                                                                            &record,
1203                                                                        );
1204                                                                }
1205                                                                let _ = app
1206                                                                    .writer
1207                                                                    .send(Output::HistoryStatus {
1208                                                                        id,
1209                                                                        transaction_id: ctxid
1210                                                                            .clone(),
1211                                                                        status: TxStatus::Confirmed,
1212                                                                        confirmations: Some(confs),
1213                                                                        preimage: None,
1214                                                                        item: Some(record),
1215                                                                        trace: trace_from(start),
1216                                                                    })
1217                                                                    .await;
1218                                                                break;
1219                                                            }
1220                                                            if Instant::now() >= deadline {
1221                                                                let criteria = format!(
1222                                                                    "amount {}",
1223                                                                    amount_to_watch.unwrap_or(0)
1224                                                                );
1225                                                                emit_error(
1226                                                                    &app.writer,
1227                                                                    Some(id),
1228                                                                    &PayError::NetworkError(format!(
1229                                                                        "wait timeout after {timeout_secs}s: evm transaction {ctxid} matching {criteria} has {confs}/{min_conf} confirmations",
1230                                                                    )),
1231                                                                    start,
1232                                                                )
1233                                                                .await;
1234                                                                break;
1235                                                            }
1236                                                            sleep(Duration::from_millis(
1237                                                                poll_interval_ms,
1238                                                            ))
1239                                                            .await;
1240                                                        }
1241                                                        Err(e) if e.retryable() => {
1242                                                            if Instant::now() >= deadline {
1243                                                                emit_error(
1244                                                                    &app.writer,
1245                                                                    Some(id),
1246                                                                    &e,
1247                                                                    start,
1248                                                                )
1249                                                                .await;
1250                                                                break;
1251                                                            }
1252                                                            sleep(Duration::from_millis(
1253                                                                poll_interval_ms,
1254                                                            ))
1255                                                            .await;
1256                                                        }
1257                                                        Err(e) => {
1258                                                            emit_error(
1259                                                                &app.writer,
1260                                                                Some(id),
1261                                                                &e,
1262                                                                start,
1263                                                            )
1264                                                            .await;
1265                                                            break;
1266                                                        }
1267                                                    }
1268                                                }
1269                                            } else {
1270                                                // Could not find on-chain tx; fall back to recording without confirmations
1271                                                let tx_id = format!(
1272                                                    "evm_recv_{}",
1273                                                    wallet::now_epoch_seconds()
1274                                                );
1275                                                let record = HistoryRecord {
1276                                                    transaction_id: tx_id.clone(),
1277                                                    wallet: wallet_for_call.clone(),
1278                                                    network: Network::Evm,
1279                                                    direction: Direction::Receive,
1280                                                    amount: Amount {
1281                                                        value: amount_value,
1282                                                        token: amount_unit,
1283                                                    },
1284                                                    status: TxStatus::Confirmed,
1285                                                    onchain_memo: None,
1286                                                    local_memo: None,
1287                                                    remote_addr: None,
1288                                                    preimage: None,
1289                                                    created_at_epoch_s: wallet::now_epoch_seconds(),
1290                                                    confirmed_at_epoch_s: Some(
1291                                                        wallet::now_epoch_seconds(),
1292                                                    ),
1293                                                    fee: None,
1294                                                };
1295                                                if let Some(s) = &app.store {
1296                                                    let _ = s.append_transaction_record(&record);
1297                                                }
1298                                                let _ = app
1299                                                    .writer
1300                                                    .send(Output::HistoryStatus {
1301                                                        id,
1302                                                        transaction_id: tx_id,
1303                                                        status: TxStatus::Confirmed,
1304                                                        confirmations: None,
1305                                                        preimage: None,
1306                                                        item: Some(record),
1307                                                        trace: trace_from(start),
1308                                                    })
1309                                                    .await;
1310                                            }
1311                                        } else {
1312                                            let tx_id =
1313                                                format!("evm_recv_{}", wallet::now_epoch_seconds());
1314                                            let record = HistoryRecord {
1315                                                transaction_id: tx_id.clone(),
1316                                                wallet: wallet_for_call.clone(),
1317                                                network: Network::Evm,
1318                                                direction: Direction::Receive,
1319                                                amount: Amount {
1320                                                    value: amount_value,
1321                                                    token: amount_unit,
1322                                                },
1323                                                status: TxStatus::Confirmed,
1324                                                onchain_memo: None,
1325                                                local_memo: None,
1326                                                remote_addr: None,
1327                                                preimage: None,
1328                                                created_at_epoch_s: wallet::now_epoch_seconds(),
1329                                                confirmed_at_epoch_s: Some(
1330                                                    wallet::now_epoch_seconds(),
1331                                                ),
1332                                                fee: None,
1333                                            };
1334                                            if let Some(s) = &app.store {
1335                                                let _ = s.append_transaction_record(&record);
1336                                            }
1337                                            let _ = app
1338                                                .writer
1339                                                .send(Output::HistoryStatus {
1340                                                    id,
1341                                                    transaction_id: tx_id,
1342                                                    status: TxStatus::Confirmed,
1343                                                    confirmations: None,
1344                                                    preimage: None,
1345                                                    item: Some(record),
1346                                                    trace: trace_from(start),
1347                                                })
1348                                                .await;
1349                                        }
1350                                        break;
1351                                    }
1352                                }
1353                                Err(e) if e.retryable() => {
1354                                    // transient error, keep polling
1355                                }
1356                                Err(e) => {
1357                                    emit_error(&app.writer, Some(id), &e, start).await;
1358                                    break;
1359                                }
1360                            }
1361                        }
1362                        return;
1363                    }
1364
1365                    // BTC: poll wallet balance deltas for incoming deposits.
1366                    if target_network == Network::Btc {
1367                        let amount_to_watch = amount.as_ref().map(|a| a.value).filter(|v| *v > 0);
1368                        let initial_balance = match provider.balance(&wallet_for_call).await {
1369                            Ok(b) => b,
1370                            Err(e) => {
1371                                emit_error(&app.writer, Some(id), &e, start).await;
1372                                return;
1373                            }
1374                        };
1375
1376                        let deadline = Instant::now() + Duration::from_secs(timeout_secs);
1377                        loop {
1378                            sleep(Duration::from_millis(poll_interval_ms)).await;
1379                            if Instant::now() >= deadline {
1380                                let criteria = if let Some(expected) = amount_to_watch {
1381                                    format!("amount {expected}")
1382                                } else {
1383                                    "any incoming amount".to_string()
1384                                };
1385                                emit_error(
1386                                    &app.writer,
1387                                    Some(id),
1388                                    &PayError::NetworkError(format!(
1389                                        "wait timeout after {timeout_secs}s: no incoming btc transaction matching {criteria}"
1390                                    )),
1391                                    start,
1392                                )
1393                                .await;
1394                                break;
1395                            }
1396
1397                            match provider.balance(&wallet_for_call).await {
1398                                Ok(current) => {
1399                                    let confirmed_delta =
1400                                        current.confirmed.saturating_sub(initial_balance.confirmed);
1401                                    let pending_delta =
1402                                        current.pending.saturating_sub(initial_balance.pending);
1403                                    let observed_delta =
1404                                        confirmed_delta.saturating_add(pending_delta);
1405                                    if observed_delta == 0 {
1406                                        continue;
1407                                    }
1408                                    if let Some(expected) = amount_to_watch {
1409                                        if observed_delta != expected {
1410                                            continue;
1411                                        }
1412                                    }
1413
1414                                    let status = if confirmed_delta > 0 {
1415                                        TxStatus::Confirmed
1416                                    } else {
1417                                        TxStatus::Pending
1418                                    };
1419                                    let now = wallet::now_epoch_seconds();
1420                                    let tx_id = format!("btc_recv_{now}");
1421                                    let record = HistoryRecord {
1422                                        transaction_id: tx_id.clone(),
1423                                        wallet: wallet_for_call.clone(),
1424                                        network: Network::Btc,
1425                                        direction: Direction::Receive,
1426                                        amount: Amount {
1427                                            value: amount_to_watch.unwrap_or(observed_delta),
1428                                            token: current.unit.clone(),
1429                                        },
1430                                        status,
1431                                        onchain_memo: onchain_memo.clone(),
1432                                        local_memo: None,
1433                                        remote_addr: None,
1434                                        preimage: None,
1435                                        created_at_epoch_s: now,
1436                                        confirmed_at_epoch_s: if status == TxStatus::Confirmed {
1437                                            Some(now)
1438                                        } else {
1439                                            None
1440                                        },
1441                                        fee: None,
1442                                    };
1443                                    if let Some(s) = &app.store {
1444                                        if s.find_transaction_record_by_id(&tx_id)
1445                                            .ok()
1446                                            .flatten()
1447                                            .is_none()
1448                                        {
1449                                            let _ = s.append_transaction_record(&record);
1450                                        }
1451                                    }
1452                                    let _ = app
1453                                        .writer
1454                                        .send(Output::HistoryStatus {
1455                                            id,
1456                                            transaction_id: tx_id,
1457                                            status,
1458                                            confirmations: Some(if status == TxStatus::Confirmed {
1459                                                1
1460                                            } else {
1461                                                0
1462                                            }),
1463                                            preimage: None,
1464                                            item: Some(record),
1465                                            trace: trace_from(start),
1466                                        })
1467                                        .await;
1468                                    break;
1469                                }
1470                                Err(e) if e.retryable() => {
1471                                    // Transient network error, keep polling until timeout.
1472                                }
1473                                Err(e) => {
1474                                    emit_error(&app.writer, Some(id), &e, start).await;
1475                                    break;
1476                                }
1477                            }
1478                        }
1479                        return;
1480                    }
1481
1482                    let Some(quote_id) = quote_id else {
1483                        emit_error(
1484                            &app.writer,
1485                            Some(id),
1486                            &PayError::InternalError(
1487                                "deposit response missing quote_id/payment_hash".to_string(),
1488                            ),
1489                            start,
1490                        )
1491                        .await;
1492                        return;
1493                    };
1494
1495                    let deadline = Instant::now() + Duration::from_secs(timeout_secs);
1496                    loop {
1497                        match provider.receive_claim(&wallet_for_call, &quote_id).await {
1498                            Ok(claimed) => {
1499                                let _ = app
1500                                    .writer
1501                                    .send(Output::ReceiveClaimed {
1502                                        id,
1503                                        wallet: wallet_for_call.clone(),
1504                                        amount: Amount {
1505                                            value: claimed,
1506                                            token: "sats".to_string(),
1507                                        },
1508                                        trace: trace_from(start),
1509                                    })
1510                                    .await;
1511                                break;
1512                            }
1513                            Err(e) if e.retryable() => {
1514                                if Instant::now() >= deadline {
1515                                    emit_error(
1516                                        &app.writer,
1517                                        Some(id),
1518                                        &PayError::NetworkError(format!(
1519                                            "wait-until-paid timeout after {timeout_secs}s"
1520                                        )),
1521                                        start,
1522                                    )
1523                                    .await;
1524                                    break;
1525                                }
1526                                sleep(Duration::from_millis(poll_interval_ms)).await;
1527                            }
1528                            Err(e) => {
1529                                emit_error(&app.writer, Some(id), &e, start).await;
1530                                break;
1531                            }
1532                        }
1533                    }
1534                }
1535                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
1536            }
1537        }
1538
1539        Input::ReceiveClaim {
1540            id,
1541            wallet,
1542            quote_id,
1543        } => {
1544            let start = Instant::now();
1545            emit_log(
1546                app,
1547                "wallet",
1548                Some(id.clone()),
1549                serde_json::json!({
1550                    "operation": "receive_claim", "wallet": &wallet, "quote_id": &quote_id,
1551                }),
1552            )
1553            .await;
1554            let meta = match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
1555                Ok(m) => m,
1556                Err(e) => {
1557                    emit_error(&app.writer, Some(id), &e, start).await;
1558                    return;
1559                }
1560            };
1561            let Some(provider) = get_provider(&app.providers, meta.network) else {
1562                emit_error(
1563                    &app.writer,
1564                    Some(id),
1565                    &PayError::NotImplemented(format!("no provider for {}", meta.network)),
1566                    start,
1567                )
1568                .await;
1569                return;
1570            };
1571
1572            match provider.receive_claim(&wallet, &quote_id).await {
1573                Ok(claimed) => {
1574                    let _ = app
1575                        .writer
1576                        .send(Output::ReceiveClaimed {
1577                            id,
1578                            wallet,
1579                            amount: Amount {
1580                                value: claimed,
1581                                token: "sats".to_string(),
1582                            },
1583                            trace: trace_from(start),
1584                        })
1585                        .await;
1586                }
1587                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
1588            }
1589        }
1590
1591        Input::CashuSend {
1592            id,
1593            wallet,
1594            amount,
1595            onchain_memo,
1596            local_memo,
1597            mints,
1598        } => {
1599            let start = Instant::now();
1600            emit_log(
1601                app,
1602                "pay",
1603                Some(id.clone()),
1604                serde_json::json!({
1605                    "operation": "cashu_send", "wallet": wallet.as_deref().unwrap_or("auto"),
1606                    "amount": amount.value, "onchain_memo": onchain_memo.as_deref().unwrap_or(""),
1607                    "mints": mints.as_deref().unwrap_or(&[]),
1608                }),
1609            )
1610            .await;
1611
1612            let wallet_str = wallet.unwrap_or_default();
1613            let mints_ref = mints.as_deref();
1614            let Some(provider) = get_provider(&app.providers, Network::Cashu) else {
1615                emit_error(
1616                    &app.writer,
1617                    Some(id),
1618                    &PayError::NotImplemented("no provider for cashu".to_string()),
1619                    start,
1620                )
1621                .await;
1622                return;
1623            };
1624
1625            let mut reservation_id: Option<u64> = None;
1626            if app.enforce_limits {
1627                let spend_ctx = SpendContext {
1628                    network: "cashu".to_string(),
1629                    wallet: if wallet_str.is_empty() {
1630                        None
1631                    } else {
1632                        Some(wallet_str.clone())
1633                    },
1634                    amount_native: amount.value,
1635                    token: None,
1636                };
1637                match app
1638                    .spend_ledger
1639                    .reserve(&format!("cashu_send:{id}"), &spend_ctx)
1640                    .await
1641                {
1642                    Ok(rid) => reservation_id = Some(rid),
1643                    Err(e) => {
1644                        if let PayError::LimitExceeded {
1645                            rule_id,
1646                            scope,
1647                            scope_key,
1648                            spent,
1649                            max_spend,
1650                            token,
1651                            remaining_s,
1652                            origin,
1653                        } = &e
1654                        {
1655                            let _ = app
1656                                .writer
1657                                .send(Output::LimitExceeded {
1658                                    id,
1659                                    rule_id: rule_id.clone(),
1660                                    scope: *scope,
1661                                    scope_key: scope_key.clone(),
1662                                    spent: *spent,
1663                                    max_spend: *max_spend,
1664                                    token: token.clone(),
1665                                    remaining_s: *remaining_s,
1666                                    origin: origin.clone(),
1667                                    trace: trace_from(start),
1668                                })
1669                                .await;
1670                        } else {
1671                            emit_error(&app.writer, Some(id), &e, start).await;
1672                        }
1673                        return;
1674                    }
1675                }
1676            }
1677
1678            match provider
1679                .cashu_send(
1680                    &wallet_str,
1681                    amount.clone(),
1682                    onchain_memo.as_deref(),
1683                    mints_ref,
1684                )
1685                .await
1686            {
1687                Ok(r) => {
1688                    if let Some(rid) = reservation_id {
1689                        let _ = app.spend_ledger.confirm(rid).await;
1690                    }
1691                    if local_memo.is_some() {
1692                        if let Some(s) = &app.store {
1693                            let _ = s.update_transaction_record_memo(
1694                                &r.transaction_id,
1695                                local_memo.as_ref(),
1696                            );
1697                        }
1698                    }
1699                    let _ = app
1700                        .writer
1701                        .send(Output::CashuSent {
1702                            id,
1703                            wallet: r.wallet,
1704                            transaction_id: r.transaction_id,
1705                            status: r.status,
1706                            fee: r.fee,
1707                            token: r.token,
1708                            trace: trace_from(start),
1709                        })
1710                        .await;
1711                }
1712                Err(e) => {
1713                    if let Some(rid) = reservation_id {
1714                        let _ = app.spend_ledger.cancel(rid).await;
1715                    }
1716                    emit_error(&app.writer, Some(id), &e, start).await
1717                }
1718            }
1719        }
1720
1721        Input::CashuReceive { id, wallet, token } => {
1722            let start = Instant::now();
1723            let token_preview = if token.len() > 20 {
1724                format!("{}...", &token[..20])
1725            } else {
1726                token.clone()
1727            };
1728            emit_log(
1729                app,
1730                "pay",
1731                Some(id.clone()),
1732                serde_json::json!({
1733                    "operation": "cashu_receive", "wallet": wallet.as_deref().unwrap_or("auto"), "token": token_preview,
1734                }),
1735            )
1736            .await;
1737            let wallet_str = wallet.unwrap_or_default();
1738            let Some(provider) = get_provider(&app.providers, Network::Cashu) else {
1739                emit_error(
1740                    &app.writer,
1741                    Some(id),
1742                    &PayError::NotImplemented("no provider for cashu".to_string()),
1743                    start,
1744                )
1745                .await;
1746                return;
1747            };
1748            match provider.cashu_receive(&wallet_str, &token).await {
1749                Ok(r) => {
1750                    let _ = app
1751                        .writer
1752                        .send(Output::CashuReceived {
1753                            id,
1754                            wallet: r.wallet,
1755                            amount: r.amount,
1756                            trace: trace_from(start),
1757                        })
1758                        .await;
1759                }
1760                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
1761            }
1762        }
1763
1764        Input::Send {
1765            id,
1766            wallet,
1767            network,
1768            to,
1769            onchain_memo,
1770            local_memo,
1771            mints,
1772        } => {
1773            let start = Instant::now();
1774            let operation_name = "send";
1775            let to_preview = if to.len() > 20 {
1776                format!("{}...", &to[..20])
1777            } else {
1778                to.clone()
1779            };
1780            emit_log(
1781                app,
1782                "pay",
1783                Some(id.clone()),
1784                serde_json::json!({
1785                    "operation": operation_name, "wallet": wallet.as_deref().unwrap_or("auto"),
1786                    "network": network.map(|c| c.to_string()).unwrap_or_else(|| "auto".to_string()),
1787                    "to": to_preview, "onchain_memo": onchain_memo.as_deref().unwrap_or(""),
1788                }),
1789            )
1790            .await;
1791
1792            let (target_network, wallet_for_call) = if let Some(w) =
1793                wallet.filter(|w| !w.is_empty())
1794            {
1795                let meta = match require_store(app).and_then(|s| s.load_wallet_metadata(&w)) {
1796                    Ok(m) => m,
1797                    Err(e) => {
1798                        emit_error(&app.writer, Some(id), &e, start).await;
1799                        return;
1800                    }
1801                };
1802                if let Some(expected) = network {
1803                    if meta.network != expected {
1804                        emit_error(
1805                            &app.writer,
1806                            Some(id),
1807                            &PayError::InvalidAmount(format!(
1808                                "wallet {w} is {}, not {expected}",
1809                                meta.network
1810                            )),
1811                            start,
1812                        )
1813                        .await;
1814                        return;
1815                    }
1816                }
1817                (meta.network, w)
1818            } else {
1819                let wallets = match require_store(app).and_then(|s| s.list_wallet_metadata(network))
1820                {
1821                    Ok(v) => v,
1822                    Err(e) => {
1823                        emit_error(&app.writer, Some(id), &e, start).await;
1824                        return;
1825                    }
1826                };
1827
1828                // For cashu with --cashu-mint or multiple wallets: filter by mint,
1829                // then select wallet with smallest sufficient balance.
1830                let is_cashu = matches!(network, Some(Network::Cashu));
1831                let filtered: Vec<_> = if is_cashu {
1832                    if let Some(ref mint_list) = mints {
1833                        wallets
1834                            .into_iter()
1835                            .filter(|w| {
1836                                w.mint_url.as_deref().is_some_and(|u| {
1837                                    let nu = u.trim().trim_end_matches('/');
1838                                    mint_list
1839                                        .iter()
1840                                        .any(|m| m.trim().trim_end_matches('/') == nu)
1841                                })
1842                            })
1843                            .collect()
1844                    } else {
1845                        wallets
1846                    }
1847                } else {
1848                    wallets
1849                };
1850
1851                match filtered.len() {
1852                    0 => {
1853                        let msg = if mints.is_some() {
1854                            "no cashu wallet found matching --cashu-mint".to_string()
1855                        } else {
1856                            match network {
1857                                Some(network) => format!("no {network} wallet found"),
1858                                None => "no wallet found".to_string(),
1859                            }
1860                        };
1861                        emit_error(&app.writer, Some(id), &PayError::WalletNotFound(msg), start)
1862                            .await;
1863                        return;
1864                    }
1865                    1 => (filtered[0].network, filtered[0].id.clone()),
1866                    _ if is_cashu => {
1867                        // Multiple cashu wallets: select by smallest sufficient balance.
1868                        // Pass empty wallet to provider — it will use select_wallet_by_balance.
1869                        (Network::Cashu, String::new())
1870                    }
1871                    _ => {
1872                        let msg = match network {
1873                            Some(network) => {
1874                                format!("multiple {network} wallets found; pass --wallet")
1875                            }
1876                            None => "multiple wallets found; pass --wallet".to_string(),
1877                        };
1878                        emit_error(&app.writer, Some(id), &PayError::InvalidAmount(msg), start)
1879                            .await;
1880                        return;
1881                    }
1882                }
1883            };
1884
1885            let Some(provider) = get_provider(&app.providers, target_network) else {
1886                emit_error(
1887                    &app.writer,
1888                    Some(id),
1889                    &PayError::NotImplemented(format!("no provider for {target_network}")),
1890                    start,
1891                )
1892                .await;
1893                return;
1894            };
1895
1896            let mut reservation_id: Option<u64> = None;
1897            if app.enforce_limits {
1898                let quote = match provider
1899                    .send_quote(&wallet_for_call, &to, mints.as_deref())
1900                    .await
1901                {
1902                    Ok(q) => q,
1903                    Err(e) => {
1904                        emit_error(&app.writer, Some(id), &e, start).await;
1905                        return;
1906                    }
1907                };
1908                let spend_amount = quote.amount_native + quote.fee_estimate_native;
1909                let provider_key = require_store(app)
1910                    .and_then(|s| s.load_wallet_metadata(&quote.wallet))
1911                    .ok()
1912                    .map(|meta| wallet_provider_key(&meta))
1913                    .unwrap_or_else(|| target_network.to_string());
1914                let spend_ctx = SpendContext {
1915                    network: provider_key,
1916                    wallet: Some(quote.wallet.clone()),
1917                    amount_native: spend_amount,
1918                    token: extract_token_from_target(&to),
1919                };
1920                match app
1921                    .spend_ledger
1922                    .reserve(&format!("send:{id}"), &spend_ctx)
1923                    .await
1924                {
1925                    Ok(rid) => reservation_id = Some(rid),
1926                    Err(e) => {
1927                        if let PayError::LimitExceeded {
1928                            rule_id,
1929                            scope,
1930                            scope_key,
1931                            spent,
1932                            max_spend,
1933                            token,
1934                            remaining_s,
1935                            origin,
1936                        } = &e
1937                        {
1938                            let _ = app
1939                                .writer
1940                                .send(Output::LimitExceeded {
1941                                    id,
1942                                    rule_id: rule_id.clone(),
1943                                    scope: *scope,
1944                                    scope_key: scope_key.clone(),
1945                                    spent: *spent,
1946                                    max_spend: *max_spend,
1947                                    token: token.clone(),
1948                                    remaining_s: *remaining_s,
1949                                    origin: origin.clone(),
1950                                    trace: trace_from(start),
1951                                })
1952                                .await;
1953                        } else {
1954                            emit_error(&app.writer, Some(id), &e, start).await;
1955                        }
1956                        return;
1957                    }
1958                }
1959            }
1960            match provider
1961                .send(
1962                    &wallet_for_call,
1963                    &to,
1964                    onchain_memo.as_deref(),
1965                    mints.as_deref(),
1966                )
1967                .await
1968            {
1969                Ok(r) => {
1970                    if let Some(rid) = reservation_id {
1971                        let _ = app.spend_ledger.confirm(rid).await;
1972                    }
1973                    if local_memo.is_some() {
1974                        if let Some(s) = &app.store {
1975                            let _ = s.update_transaction_record_memo(
1976                                &r.transaction_id,
1977                                local_memo.as_ref(),
1978                            );
1979                        }
1980                    }
1981                    let _ = app
1982                        .writer
1983                        .send(Output::Sent {
1984                            id,
1985                            wallet: r.wallet,
1986                            transaction_id: r.transaction_id,
1987                            amount: r.amount,
1988                            fee: r.fee,
1989                            preimage: r.preimage,
1990                            trace: trace_from(start),
1991                        })
1992                        .await;
1993                }
1994                Err(e) => {
1995                    if let Some(rid) = reservation_id {
1996                        let _ = app.spend_ledger.cancel(rid).await;
1997                    }
1998                    emit_error(&app.writer, Some(id), &e, start).await
1999                }
2000            }
2001        }
2002
2003        Input::Restore { id, wallet } => {
2004            let start = Instant::now();
2005            emit_log(
2006                app,
2007                "wallet",
2008                Some(id.clone()),
2009                serde_json::json!({
2010                    "operation": "restore", "wallet": &wallet,
2011                }),
2012            )
2013            .await;
2014            match try_provider!(&app.providers, |p| p.restore(&wallet)) {
2015                Ok(r) => {
2016                    let _ = app
2017                        .writer
2018                        .send(Output::Restored {
2019                            id,
2020                            wallet: r.wallet,
2021                            unspent: r.unspent,
2022                            spent: r.spent,
2023                            pending: r.pending,
2024                            unit: r.unit,
2025                            trace: trace_from(start),
2026                        })
2027                        .await;
2028                }
2029                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2030            }
2031        }
2032
2033        Input::WalletShowSeed { id, wallet } => {
2034            let start = Instant::now();
2035            emit_log(
2036                app,
2037                "wallet",
2038                Some(id.clone()),
2039                serde_json::json!({
2040                    "operation": "wallet_show_seed", "wallet": &wallet,
2041                }),
2042            )
2043            .await;
2044            match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2045                Ok(meta) => match meta.network {
2046                    Network::Cashu => match meta.seed_secret {
2047                        Some(mnemonic) => {
2048                            let _ = app
2049                                .writer
2050                                .send(Output::WalletSeed {
2051                                    id,
2052                                    wallet,
2053                                    mnemonic_secret: mnemonic,
2054                                    trace: trace_from(start),
2055                                })
2056                                .await;
2057                        }
2058                        None => {
2059                            emit_error(
2060                                &app.writer,
2061                                Some(id),
2062                                &PayError::InternalError("wallet has no seed".to_string()),
2063                                start,
2064                            )
2065                            .await;
2066                        }
2067                    },
2068                    Network::Sol => match meta.seed_secret {
2069                        Some(secret) => {
2070                            if looks_like_bip39_mnemonic(&secret) {
2071                                let _ = app
2072                                    .writer
2073                                    .send(Output::WalletSeed {
2074                                        id,
2075                                        wallet,
2076                                        mnemonic_secret: secret,
2077                                        trace: trace_from(start),
2078                                    })
2079                                    .await;
2080                            } else {
2081                                emit_error(
2082                                        &app.writer,
2083                                        Some(id),
2084                                        &PayError::InvalidAmount(
2085                                            "this sol wallet was created before mnemonic support; create a new sol wallet to get 12-word backup".to_string(),
2086                                        ),
2087                                        start,
2088                                    )
2089                                    .await;
2090                            }
2091                        }
2092                        None => {
2093                            emit_error(
2094                                &app.writer,
2095                                Some(id),
2096                                &PayError::InternalError("wallet has no seed".to_string()),
2097                                start,
2098                            )
2099                            .await;
2100                        }
2101                    },
2102                    Network::Ln => {
2103                        emit_error(
2104                                &app.writer,
2105                                Some(id),
2106                                &PayError::InvalidAmount(
2107                                    "ln wallets do not have mnemonic words; they store backend credentials (nwc-uri/password/admin-key)".to_string(),
2108                                ),
2109                                start,
2110                            )
2111                            .await;
2112                    }
2113                    Network::Evm | Network::Btc => match meta.seed_secret {
2114                        Some(secret) => {
2115                            if looks_like_bip39_mnemonic(&secret) {
2116                                let _ = app
2117                                    .writer
2118                                    .send(Output::WalletSeed {
2119                                        id,
2120                                        wallet,
2121                                        mnemonic_secret: secret,
2122                                        trace: trace_from(start),
2123                                    })
2124                                    .await;
2125                            } else {
2126                                emit_error(
2127                                        &app.writer,
2128                                        Some(id),
2129                                        &PayError::InvalidAmount(
2130                                            "this wallet was created before mnemonic support; create a new wallet to get 12-word backup".to_string(),
2131                                        ),
2132                                        start,
2133                                    )
2134                                    .await;
2135                            }
2136                        }
2137                        None => {
2138                            emit_error(
2139                                &app.writer,
2140                                Some(id),
2141                                &PayError::InternalError("wallet has no seed".to_string()),
2142                                start,
2143                            )
2144                            .await;
2145                        }
2146                    },
2147                },
2148                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2149            }
2150        }
2151
2152        Input::HistoryList {
2153            id,
2154            wallet,
2155            network,
2156            onchain_memo,
2157            limit,
2158            offset,
2159            since_epoch_s,
2160            until_epoch_s,
2161        } => {
2162            let start = Instant::now();
2163            let lim = limit.unwrap_or(20);
2164            let off = offset.unwrap_or(0);
2165            let memo_filter = onchain_memo
2166                .as_deref()
2167                .map(str::trim)
2168                .filter(|value| !value.is_empty())
2169                .map(str::to_owned);
2170            let store = match require_store(app) {
2171                Ok(store) => store,
2172                Err(e) => {
2173                    emit_error(&app.writer, Some(id), &e, start).await;
2174                    return;
2175                }
2176            };
2177
2178            let mut all_txs = Vec::new();
2179            if let Some(wallet_id) = wallet {
2180                let meta = match store.load_wallet_metadata(&wallet_id) {
2181                    Ok(meta) => meta,
2182                    Err(e) => {
2183                        emit_error(&app.writer, Some(id), &e, start).await;
2184                        return;
2185                    }
2186                };
2187                if let Some(expected_network) = network {
2188                    if meta.network != expected_network {
2189                        let _ = app
2190                            .writer
2191                            .send(Output::History {
2192                                id,
2193                                items: Vec::new(),
2194                                trace: trace_from(start),
2195                            })
2196                            .await;
2197                        return;
2198                    }
2199                }
2200                match store.load_wallet_transaction_records(&wallet_id) {
2201                    Ok(mut records) => all_txs.append(&mut records),
2202                    Err(e) => {
2203                        emit_error(&app.writer, Some(id), &e, start).await;
2204                        return;
2205                    }
2206                }
2207            } else {
2208                let wallets = match store.list_wallet_metadata(network) {
2209                    Ok(wallets) => wallets,
2210                    Err(e) => {
2211                        emit_error(&app.writer, Some(id), &e, start).await;
2212                        return;
2213                    }
2214                };
2215                for wallet_meta in wallets {
2216                    match store.load_wallet_transaction_records(&wallet_meta.id) {
2217                        Ok(mut records) => all_txs.append(&mut records),
2218                        Err(e) => {
2219                            emit_error(&app.writer, Some(id.clone()), &e, start).await;
2220                            return;
2221                        }
2222                    }
2223                }
2224            }
2225
2226            if let Some(expected_network) = network {
2227                all_txs.retain(|item| item.network == expected_network);
2228            }
2229            if let Some(since) = since_epoch_s {
2230                all_txs.retain(|item| item.created_at_epoch_s >= since);
2231            }
2232            if let Some(until) = until_epoch_s {
2233                all_txs.retain(|item| item.created_at_epoch_s < until);
2234            }
2235            if let Some(filter) = memo_filter.as_deref() {
2236                all_txs.retain(|item| item.onchain_memo.as_deref() == Some(filter));
2237            }
2238            all_txs.sort_by(|a, b| b.created_at_epoch_s.cmp(&a.created_at_epoch_s));
2239            let start_idx = all_txs.len().min(off);
2240            let end_idx = all_txs.len().min(off.saturating_add(lim));
2241            let items = all_txs[start_idx..end_idx].to_vec();
2242            let _ = app
2243                .writer
2244                .send(Output::History {
2245                    id,
2246                    items,
2247                    trace: trace_from(start),
2248                })
2249                .await;
2250        }
2251
2252        Input::HistoryStatus { id, transaction_id } => {
2253            let start = Instant::now();
2254            let mut routed: Option<Result<HistoryStatusInfo, PayError>> = None;
2255            for provider in app.providers.values() {
2256                match provider.history_status(&transaction_id).await {
2257                    Ok(info) => {
2258                        routed = Some(Ok(info));
2259                        break;
2260                    }
2261                    Err(PayError::NotImplemented(_)) | Err(PayError::WalletNotFound(_)) => {}
2262                    Err(err) => {
2263                        routed = Some(Err(err));
2264                        break;
2265                    }
2266                }
2267            }
2268            match routed.unwrap_or_else(|| {
2269                Err(PayError::WalletNotFound(format!(
2270                    "transaction {transaction_id} not found"
2271                )))
2272            }) {
2273                Ok(info) => {
2274                    let _ = app
2275                        .writer
2276                        .send(Output::HistoryStatus {
2277                            id,
2278                            transaction_id: info.transaction_id,
2279                            status: info.status,
2280                            confirmations: info.confirmations,
2281                            preimage: info.preimage,
2282                            item: info.item,
2283                            trace: trace_from(start),
2284                        })
2285                        .await;
2286                }
2287                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2288            }
2289        }
2290
2291        Input::HistoryUpdate {
2292            id,
2293            wallet,
2294            network,
2295            limit,
2296        } => {
2297            let start = Instant::now();
2298            if let Err(e) = require_store(app) {
2299                emit_error(&app.writer, Some(id), &e, start).await;
2300                return;
2301            }
2302
2303            let sync_limit = limit.unwrap_or(200).clamp(1, 5000);
2304            let mut totals = HistorySyncStats::default();
2305            let mut wallets_synced = 0usize;
2306
2307            if let Some(wallet_id) = wallet {
2308                let sync_result = if let Some(expected_network) = network {
2309                    match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet_id)) {
2310                        Ok(meta) if meta.network != expected_network => {
2311                            Err(PayError::InvalidAmount(format!(
2312                                "wallet {wallet_id} belongs to {}, not {expected_network}",
2313                                meta.network
2314                            )))
2315                        }
2316                        Ok(_) => match get_provider(&app.providers, expected_network) {
2317                            Some(provider) => provider.history_sync(&wallet_id, sync_limit).await,
2318                            None => Err(PayError::NotImplemented(format!(
2319                                "network {expected_network} not enabled"
2320                            ))),
2321                        },
2322                        Err(e) => Err(e),
2323                    }
2324                } else {
2325                    match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet_id)) {
2326                        Ok(meta) => match get_provider(&app.providers, meta.network) {
2327                            Some(provider) => provider.history_sync(&wallet_id, sync_limit).await,
2328                            None => Err(PayError::NotImplemented(format!(
2329                                "network {} not enabled",
2330                                meta.network
2331                            ))),
2332                        },
2333                        Err(e) => Err(e),
2334                    }
2335                };
2336
2337                match sync_result {
2338                    Ok(stats) => {
2339                        wallets_synced = 1;
2340                        totals.records_scanned =
2341                            totals.records_scanned.saturating_add(stats.records_scanned);
2342                        totals.records_added =
2343                            totals.records_added.saturating_add(stats.records_added);
2344                        totals.records_updated =
2345                            totals.records_updated.saturating_add(stats.records_updated);
2346                    }
2347                    Err(e) => {
2348                        emit_error(&app.writer, Some(id), &e, start).await;
2349                        return;
2350                    }
2351                }
2352            } else {
2353                let target_networks: Vec<Network> = if let Some(single) = network {
2354                    vec![single]
2355                } else {
2356                    vec![
2357                        Network::Cashu,
2358                        Network::Ln,
2359                        Network::Sol,
2360                        Network::Evm,
2361                        Network::Btc,
2362                    ]
2363                };
2364
2365                let wallets = match require_store(app).and_then(|s| s.list_wallet_metadata(None)) {
2366                    Ok(wallets) => wallets,
2367                    Err(e) => {
2368                        emit_error(&app.writer, Some(id), &e, start).await;
2369                        return;
2370                    }
2371                };
2372
2373                for network_key in target_networks {
2374                    let Some(provider) = get_provider(&app.providers, network_key) else {
2375                        if network.is_some() {
2376                            emit_error(
2377                                &app.writer,
2378                                Some(id),
2379                                &PayError::NotImplemented(format!(
2380                                    "network {network_key} not enabled"
2381                                )),
2382                                start,
2383                            )
2384                            .await;
2385                            return;
2386                        }
2387                        continue;
2388                    };
2389                    for wallet_meta in &wallets {
2390                        if wallet_meta.network != network_key {
2391                            continue;
2392                        }
2393                        match provider.history_sync(&wallet_meta.id, sync_limit).await {
2394                            Ok(stats) => {
2395                                wallets_synced = wallets_synced.saturating_add(1);
2396                                totals.records_scanned =
2397                                    totals.records_scanned.saturating_add(stats.records_scanned);
2398                                totals.records_added =
2399                                    totals.records_added.saturating_add(stats.records_added);
2400                                totals.records_updated =
2401                                    totals.records_updated.saturating_add(stats.records_updated);
2402                            }
2403                            Err(PayError::NotImplemented(_)) | Err(PayError::WalletNotFound(_)) => {
2404                            }
2405                            Err(e) => {
2406                                emit_error(&app.writer, Some(id), &e, start).await;
2407                                return;
2408                            }
2409                        }
2410                    }
2411                }
2412            }
2413
2414            let _ = app
2415                .writer
2416                .send(Output::HistoryUpdated {
2417                    id,
2418                    wallets_synced,
2419                    records_scanned: totals.records_scanned,
2420                    records_added: totals.records_added,
2421                    records_updated: totals.records_updated,
2422                    trace: trace_from(start),
2423                })
2424                .await;
2425        }
2426
2427        Input::LimitAdd { id, mut limit } => {
2428            let start = Instant::now();
2429            if !app.enforce_limits {
2430                emit_error(
2431                    &app.writer,
2432                    Some(id),
2433                    &PayError::NotImplemented(
2434                        "limit_add is unavailable when limits are not enforced locally; configure limits on the RPC daemon"
2435                            .to_string(),
2436                    ),
2437                    start,
2438                )
2439                .await;
2440                return;
2441            }
2442
2443            // Auto-fill provider for wallet-scope rules that don't have one
2444            if limit.scope == SpendScope::Wallet && limit.network.is_none() {
2445                if let Some(wallet_id) = limit.wallet.as_deref() {
2446                    match require_store(app).and_then(|s| s.load_wallet_metadata(wallet_id)) {
2447                        Ok(meta) => {
2448                            limit.network = Some(meta.network.to_string());
2449                        }
2450                        Err(e) => {
2451                            emit_error(&app.writer, Some(id), &e, start).await;
2452                            return;
2453                        }
2454                    }
2455                }
2456            }
2457
2458            match app.spend_ledger.add_limit(&mut limit).await {
2459                Ok(rule_id) => {
2460                    let _ = app
2461                        .writer
2462                        .send(Output::LimitAdded {
2463                            id,
2464                            rule_id,
2465                            trace: trace_from(start),
2466                        })
2467                        .await;
2468                }
2469                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2470            }
2471        }
2472
2473        Input::LimitRemove { id, rule_id } => {
2474            let start = Instant::now();
2475            if !app.enforce_limits {
2476                emit_error(
2477                    &app.writer,
2478                    Some(id),
2479                    &PayError::NotImplemented(
2480                        "limit_remove is unavailable when limits are not enforced locally; configure limits on the RPC daemon"
2481                            .to_string(),
2482                    ),
2483                    start,
2484                )
2485                .await;
2486                return;
2487            }
2488
2489            match app.spend_ledger.remove_limit(&rule_id).await {
2490                Ok(()) => {
2491                    let _ = app
2492                        .writer
2493                        .send(Output::LimitRemoved {
2494                            id,
2495                            rule_id,
2496                            trace: trace_from(start),
2497                        })
2498                        .await;
2499                }
2500                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2501            }
2502        }
2503
2504        Input::LimitList { id } => {
2505            let start = Instant::now();
2506            let local_limits = if app.enforce_limits {
2507                match app.spend_ledger.get_status().await {
2508                    Ok(status) => status,
2509                    Err(e) => {
2510                        emit_error(&app.writer, Some(id), &e, start).await;
2511                        return;
2512                    }
2513                }
2514            } else {
2515                vec![]
2516            };
2517
2518            // Query downstream afpay_rpc nodes
2519            let config = app.config.read().await.clone();
2520            let downstream = query_downstream_limits(&config).await;
2521
2522            let _ = app
2523                .writer
2524                .send(Output::LimitStatus {
2525                    id,
2526                    limits: local_limits,
2527                    downstream,
2528                    trace: trace_from(start),
2529                })
2530                .await;
2531        }
2532
2533        Input::LimitSet { id, mut limits } => {
2534            let start = Instant::now();
2535            if !app.enforce_limits {
2536                emit_error(
2537                    &app.writer,
2538                    Some(id),
2539                    &PayError::NotImplemented(
2540                        "limit_set is unavailable when limits are not enforced locally; configure limits on the RPC daemon"
2541                            .to_string(),
2542                    ),
2543                    start,
2544                )
2545                .await;
2546                return;
2547            }
2548
2549            // Auto-fill provider for wallet-scope rules that don't have one
2550            for rule in &mut limits {
2551                if rule.scope == SpendScope::Wallet && rule.network.is_none() {
2552                    if let Some(wallet_id) = rule.wallet.as_deref() {
2553                        match require_store(app).and_then(|s| s.load_wallet_metadata(wallet_id)) {
2554                            Ok(meta) => {
2555                                rule.network = Some(meta.network.to_string());
2556                            }
2557                            Err(e) => {
2558                                emit_error(&app.writer, Some(id), &e, start).await;
2559                                return;
2560                            }
2561                        }
2562                    }
2563                }
2564            }
2565
2566            match app.spend_ledger.set_limits(&limits).await {
2567                Ok(()) => match app.spend_ledger.get_status().await {
2568                    Ok(status) => {
2569                        let _ = app
2570                            .writer
2571                            .send(Output::LimitStatus {
2572                                id,
2573                                limits: status,
2574                                downstream: vec![],
2575                                trace: trace_from(start),
2576                            })
2577                            .await;
2578                    }
2579                    Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2580                },
2581                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2582            }
2583        }
2584
2585        Input::WalletConfigShow { id, wallet } => {
2586            let start = Instant::now();
2587            match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2588                Ok(meta) => {
2589                    let resolved_wallet = meta.id.clone();
2590                    let _ = app
2591                        .writer
2592                        .send(Output::WalletConfig {
2593                            id,
2594                            wallet: resolved_wallet,
2595                            config: meta,
2596                            trace: trace_from(start),
2597                        })
2598                        .await;
2599                }
2600                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2601            }
2602        }
2603
2604        Input::WalletConfigSet {
2605            id,
2606            wallet,
2607            label,
2608            rpc_endpoints,
2609            chain_id,
2610        } => {
2611            let start = Instant::now();
2612            match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2613                Ok(mut meta) => {
2614                    let resolved_wallet = meta.id.clone();
2615                    let mut changed = false;
2616
2617                    if let Some(new_label) = label {
2618                        let trimmed = new_label.trim();
2619                        meta.label = if trimmed.is_empty() {
2620                            None
2621                        } else {
2622                            Some(trimmed.to_string())
2623                        };
2624                        changed = true;
2625                    }
2626
2627                    if !rpc_endpoints.is_empty() {
2628                        match meta.network {
2629                            Network::Sol => {
2630                                meta.sol_rpc_endpoints = Some(rpc_endpoints);
2631                                changed = true;
2632                            }
2633                            Network::Evm => {
2634                                meta.evm_rpc_endpoints = Some(rpc_endpoints);
2635                                changed = true;
2636                            }
2637                            _ => {
2638                                emit_error(
2639                                    &app.writer,
2640                                    Some(id),
2641                                    &PayError::InvalidAmount(format!(
2642                                        "rpc-endpoint not supported for {} wallets",
2643                                        meta.network
2644                                    )),
2645                                    start,
2646                                )
2647                                .await;
2648                                return;
2649                            }
2650                        }
2651                    }
2652
2653                    if let Some(cid) = chain_id {
2654                        if meta.network != Network::Evm {
2655                            emit_error(
2656                                &app.writer,
2657                                Some(id),
2658                                &PayError::InvalidAmount(
2659                                    "chain-id is only supported for evm wallets".to_string(),
2660                                ),
2661                                start,
2662                            )
2663                            .await;
2664                            return;
2665                        }
2666                        meta.evm_chain_id = Some(cid);
2667                        changed = true;
2668                    }
2669
2670                    if !changed {
2671                        emit_error(
2672                            &app.writer,
2673                            Some(id),
2674                            &PayError::InvalidAmount(
2675                                "no configuration changes specified".to_string(),
2676                            ),
2677                            start,
2678                        )
2679                        .await;
2680                        return;
2681                    }
2682
2683                    match require_store(app).and_then(|s| s.save_wallet_metadata(&meta)) {
2684                        Ok(()) => {
2685                            let _ = app
2686                                .writer
2687                                .send(Output::WalletConfigUpdated {
2688                                    id,
2689                                    wallet: resolved_wallet,
2690                                    trace: trace_from(start),
2691                                })
2692                                .await;
2693                        }
2694                        Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2695                    }
2696                }
2697                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2698            }
2699        }
2700
2701        Input::WalletConfigTokenAdd {
2702            id,
2703            wallet,
2704            symbol,
2705            address,
2706            decimals,
2707        } => {
2708            let start = Instant::now();
2709            match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2710                Ok(mut meta) => {
2711                    let resolved_wallet = meta.id.clone();
2712                    if !matches!(meta.network, Network::Evm | Network::Sol) {
2713                        emit_error(
2714                            &app.writer,
2715                            Some(id),
2716                            &PayError::InvalidAmount(format!(
2717                                "custom tokens not supported for {} wallets",
2718                                meta.network
2719                            )),
2720                            start,
2721                        )
2722                        .await;
2723                        return;
2724                    }
2725
2726                    let lower_symbol = symbol.to_ascii_lowercase();
2727                    let tokens = meta.custom_tokens.get_or_insert_with(Vec::new);
2728                    if tokens
2729                        .iter()
2730                        .any(|t| t.symbol.to_ascii_lowercase() == lower_symbol)
2731                    {
2732                        emit_error(
2733                            &app.writer,
2734                            Some(id),
2735                            &PayError::InvalidAmount(format!(
2736                                "custom token '{lower_symbol}' already registered"
2737                            )),
2738                            start,
2739                        )
2740                        .await;
2741                        return;
2742                    }
2743
2744                    tokens.push(wallet::CustomToken {
2745                        symbol: lower_symbol.clone(),
2746                        address: address.clone(),
2747                        decimals,
2748                    });
2749
2750                    match require_store(app).and_then(|s| s.save_wallet_metadata(&meta)) {
2751                        Ok(()) => {
2752                            let _ = app
2753                                .writer
2754                                .send(Output::WalletConfigTokenAdded {
2755                                    id,
2756                                    wallet: resolved_wallet,
2757                                    symbol: lower_symbol,
2758                                    address,
2759                                    decimals,
2760                                    trace: trace_from(start),
2761                                })
2762                                .await;
2763                        }
2764                        Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2765                    }
2766                }
2767                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2768            }
2769        }
2770
2771        Input::WalletConfigTokenRemove { id, wallet, symbol } => {
2772            let start = Instant::now();
2773            match require_store(app).and_then(|s| s.load_wallet_metadata(&wallet)) {
2774                Ok(mut meta) => {
2775                    let resolved_wallet = meta.id.clone();
2776                    let lower_symbol = symbol.to_ascii_lowercase();
2777                    let tokens = meta.custom_tokens.get_or_insert_with(Vec::new);
2778                    let before_len = tokens.len();
2779                    tokens.retain(|t| t.symbol.to_ascii_lowercase() != lower_symbol);
2780                    if tokens.len() == before_len {
2781                        emit_error(
2782                            &app.writer,
2783                            Some(id),
2784                            &PayError::InvalidAmount(format!(
2785                                "custom token '{lower_symbol}' not found"
2786                            )),
2787                            start,
2788                        )
2789                        .await;
2790                        return;
2791                    }
2792                    if tokens.is_empty() {
2793                        meta.custom_tokens = None;
2794                    }
2795
2796                    match require_store(app).and_then(|s| s.save_wallet_metadata(&meta)) {
2797                        Ok(()) => {
2798                            let _ = app
2799                                .writer
2800                                .send(Output::WalletConfigTokenRemoved {
2801                                    id,
2802                                    wallet: resolved_wallet,
2803                                    symbol: lower_symbol,
2804                                    trace: trace_from(start),
2805                                })
2806                                .await;
2807                        }
2808                        Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2809                    }
2810                }
2811                Err(e) => emit_error(&app.writer, Some(id), &e, start).await,
2812            }
2813        }
2814
2815        Input::Config(patch) => {
2816            let start = Instant::now();
2817            let ConfigPatch {
2818                data_dir,
2819                limits,
2820                log,
2821                exchange_rate,
2822                afpay_rpc,
2823                providers,
2824            } = patch;
2825
2826            let mut unsupported = Vec::new();
2827            if data_dir.is_some() {
2828                unsupported.push("data_dir");
2829            }
2830            if afpay_rpc.is_some() {
2831                unsupported.push("afpay_rpc");
2832            }
2833            if providers.is_some() {
2834                unsupported.push("providers");
2835            }
2836            if exchange_rate.is_some() {
2837                unsupported.push("exchange_rate");
2838            }
2839            if !unsupported.is_empty() {
2840                let err = PayError::NotImplemented(format!(
2841                    "runtime config only supports 'log' and 'limits'; unsupported fields: {}",
2842                    unsupported.join(", ")
2843                ));
2844                emit_error(&app.writer, None, &err, start).await;
2845                return;
2846            }
2847
2848            if let Some(ref v) = limits {
2849                if !app.enforce_limits {
2850                    let err = PayError::NotImplemented(
2851                        "config.limits is unavailable when limits are not enforced locally; configure limits on the RPC daemon"
2852                            .to_string(),
2853                    );
2854                    emit_error(&app.writer, None, &err, start).await;
2855                    return;
2856                }
2857                if let Err(e) = app.spend_ledger.set_limits(v).await {
2858                    emit_error(&app.writer, None, &e, start).await;
2859                    return;
2860                }
2861            }
2862
2863            let mut cfg = app.config.write().await;
2864            if let Some(v) = limits {
2865                cfg.limits = v;
2866            }
2867            if let Some(v) = log {
2868                cfg.log = agent_first_data::cli_parse_log_filters(&v);
2869            }
2870            let _ = app.writer.send(Output::Config(cfg.clone())).await;
2871        }
2872
2873        Input::Version => {
2874            let _ = app
2875                .writer
2876                .send(Output::Version {
2877                    version: crate::config::VERSION.to_string(),
2878                    trace: PongTrace {
2879                        uptime_s: app.start_time.elapsed().as_secs(),
2880                        requests_total: app.requests_total.load(Ordering::Relaxed),
2881                        in_flight: app.in_flight.lock().await.len(),
2882                    },
2883                })
2884                .await;
2885        }
2886
2887        Input::Close => {
2888            // Handled in main loop
2889        }
2890    }
2891
2892    emit_migration_log(app).await;
2893}
2894
2895// ═══════════════════════════════════════════
2896// Helpers
2897// ═══════════════════════════════════════════
2898
2899fn get_provider(
2900    providers: &HashMap<Network, Box<dyn PayProvider>>,
2901    network: Network,
2902) -> Option<&dyn PayProvider> {
2903    providers.get(&network).map(|p| p.as_ref())
2904}
2905
/// Heuristic check for a BIP39-style mnemonic.
///
/// Returns `true` when `secret` consists of exactly 12 or 24
/// whitespace-separated words. The words themselves are not validated.
fn looks_like_bip39_mnemonic(secret: &str) -> bool {
    matches!(secret.split_whitespace().count(), 12 | 24)
}
2910
2911async fn emit_error(
2912    writer: &mpsc::Sender<Output>,
2913    id: Option<String>,
2914    err: &PayError,
2915    start: Instant,
2916) {
2917    emit_error_hint(writer, id, err, start, None).await;
2918}
2919
2920/// Like [`emit_error`] but with an optional hint override.
2921/// When `hint_override` is `Some`, it takes precedence over `PayError::hint()`.
2922async fn emit_error_hint(
2923    writer: &mpsc::Sender<Output>,
2924    id: Option<String>,
2925    err: &PayError,
2926    start: Instant,
2927    hint_override: Option<&str>,
2928) {
2929    let _ = writer
2930        .send(Output::Error {
2931            id,
2932            error_code: err.error_code().to_string(),
2933            error: err.to_string(),
2934            hint: hint_override.map(|h| h.to_string()).or_else(|| err.hint()),
2935            retryable: err.retryable(),
2936            trace: trace_from(start),
2937        })
2938        .await;
2939}
2940
2941fn extract_id(input: &Input) -> Option<String> {
2942    match input {
2943        Input::WalletCreate { id, .. }
2944        | Input::LnWalletCreate { id, .. }
2945        | Input::WalletClose { id, .. }
2946        | Input::WalletList { id, .. }
2947        | Input::Balance { id, .. }
2948        | Input::Receive { id, .. }
2949        | Input::ReceiveClaim { id, .. }
2950        | Input::CashuSend { id, .. }
2951        | Input::CashuReceive { id, .. }
2952        | Input::Send { id, .. }
2953        | Input::Restore { id, .. }
2954        | Input::WalletShowSeed { id, .. }
2955        | Input::HistoryList { id, .. }
2956        | Input::HistoryStatus { id, .. }
2957        | Input::HistoryUpdate { id, .. }
2958        | Input::LimitAdd { id, .. }
2959        | Input::LimitRemove { id, .. }
2960        | Input::LimitList { id, .. }
2961        | Input::LimitSet { id, .. }
2962        | Input::WalletConfigShow { id, .. }
2963        | Input::WalletConfigSet { id, .. }
2964        | Input::WalletConfigTokenAdd { id, .. }
2965        | Input::WalletConfigTokenRemove { id, .. } => Some(id.clone()),
2966        Input::Config(_) | Input::Version | Input::Close => None,
2967    }
2968}
2969
2970fn trace_from(start: Instant) -> Trace {
2971    Trace::from_duration(start.elapsed().as_millis() as u64)
2972}
2973
2974/// Query limits from each unique downstream afpay_rpc node.
2975async fn query_downstream_limits(config: &RuntimeConfig) -> Vec<DownstreamLimitNode> {
2976    let mut result = Vec::new();
2977    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
2978    for (name, rpc_cfg) in &config.afpay_rpc {
2979        if !seen.insert(rpc_cfg.endpoint.clone()) {
2980            continue;
2981        }
2982        let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
2983        let limit_input = Input::LimitList {
2984            id: format!("downstream_{name}"),
2985        };
2986        let outputs =
2987            crate::provider::remote::rpc_call(&rpc_cfg.endpoint, secret, &limit_input).await;
2988        let mut node = DownstreamLimitNode {
2989            name: name.clone(),
2990            endpoint: rpc_cfg.endpoint.clone(),
2991            limits: vec![],
2992            error: None,
2993            downstream: vec![],
2994        };
2995        for value in &outputs {
2996            if value.get("code").and_then(|v| v.as_str()) == Some("error") {
2997                node.error = value
2998                    .get("error")
2999                    .and_then(|v| v.as_str())
3000                    .map(|s| s.to_string());
3001            }
3002            if value.get("code").and_then(|v| v.as_str()) == Some("limit_status") {
3003                if let Some(limits) = value.get("limits") {
3004                    node.limits = serde_json::from_value(limits.clone()).unwrap_or_default();
3005                }
3006                if let Some(ds) = value.get("downstream") {
3007                    node.downstream = serde_json::from_value(ds.clone()).unwrap_or_default();
3008                }
3009            }
3010        }
3011        result.push(node);
3012    }
3013    result
3014}
3015
/// Extract `token=<value>` from a transfer target URI query string.
///
/// The query is everything after the first `?` and before any `#fragment`,
/// so a `?` inside a parameter value does not cut the query short and a
/// trailing fragment never leaks into the token. Pairs are separated by `&`;
/// the first `token=` pair with a non-empty value wins. The value is returned
/// as written (no percent-decoding).
///
/// Returns `None` when the target has no query or no non-empty `token` pair.
fn extract_token_from_target(to: &str) -> Option<String> {
    // The fragment starts at the first '#'; anything after it is not query.
    let without_fragment = to.split_once('#').map_or(to, |(head, _)| head);
    let (_, query) = without_fragment.split_once('?')?;
    query
        .split('&')
        .filter_map(|pair| pair.strip_prefix("token="))
        .find(|value| !value.is_empty())
        .map(str::to_string)
}
3028
3029fn wallet_provider_key(meta: &wallet::WalletMetadata) -> String {
3030    match meta.network {
3031        Network::Ln => meta
3032            .backend
3033            .as_deref()
3034            .map(|b| format!("ln-{}", b.to_ascii_lowercase()))
3035            .unwrap_or_else(|| "ln".to_string()),
3036        _ => meta.network.to_string(),
3037    }
3038}
3039
3040fn wallet_summary_from_meta(meta: &wallet::WalletMetadata, wallet_id: &str) -> WalletSummary {
3041    let (address, backend) = match meta.network {
3042        Network::Cashu => (meta.mint_url.clone().unwrap_or_default(), None),
3043        Network::Ln => {
3044            let b = meta
3045                .backend
3046                .clone()
3047                .unwrap_or_else(|| "unknown".to_string());
3048            (format!("ln:{b}"), Some(b))
3049        }
3050        _ => (wallet_id.to_string(), None),
3051    };
3052    WalletSummary {
3053        id: meta.id.clone(),
3054        network: meta.network,
3055        label: meta.label.clone(),
3056        address,
3057        backend,
3058        mint_url: meta.mint_url.clone(),
3059        rpc_endpoints: meta
3060            .sol_rpc_endpoints
3061            .clone()
3062            .or(meta.evm_rpc_endpoints.clone()),
3063        chain_id: meta.evm_chain_id,
3064        created_at_epoch_s: meta.created_at_epoch_s,
3065    }
3066}
3067
3068async fn resolve_wallet_summary(app: &App, wallet_id: &str) -> WalletSummary {
3069    if let Ok(meta) = require_store(app).and_then(|s| s.load_wallet_metadata(wallet_id)) {
3070        return wallet_summary_from_meta(&meta, wallet_id);
3071    }
3072    if let Ok(wallets) = collect_all!(&app.providers, |p| p.list_wallets()) {
3073        if let Some(summary) = wallets.into_iter().find(|w| w.id == wallet_id) {
3074            return summary;
3075        }
3076    }
3077    WalletSummary {
3078        id: wallet_id.to_string(),
3079        network: Network::Ln,
3080        label: None,
3081        address: String::new(),
3082        backend: None,
3083        mint_url: None,
3084        rpc_endpoints: None,
3085        chain_id: None,
3086        created_at_epoch_s: 0,
3087    }
3088}
3089
/// Decide whether `event` passes the configured log filters.
///
/// An empty filter list disables logging. Otherwise the event is enabled when
/// any filter is `*` or `all`, or when the lowercased event name starts with
/// the filter text. Filters are compared as given (they are not lowercased).
fn log_enabled(log: &[String], event: &str) -> bool {
    let event_lc = event.to_ascii_lowercase();
    for filter in log {
        let filter = filter.as_str();
        if matches!(filter, "*" | "all") || event_lc.starts_with(filter) {
            return true;
        }
    }
    false
}
3098
3099async fn emit_migration_log(app: &App) {
3100    let entries = app
3101        .store
3102        .as_ref()
3103        .map(|s| s.drain_migration_log())
3104        .unwrap_or_default();
3105    if entries.is_empty() {
3106        return;
3107    }
3108    for entry in entries {
3109        emit_log(
3110            app,
3111            "schema_migration",
3112            None,
3113            serde_json::json!({
3114                "database": entry.database,
3115                "from_version": entry.from_version,
3116                "to_version": entry.to_version,
3117            }),
3118        )
3119        .await;
3120    }
3121}
3122
3123async fn emit_log(app: &App, event: &str, request_id: Option<String>, args: serde_json::Value) {
3124    let log = app.config.read().await.log.clone();
3125    if !log_enabled(&log, event) {
3126        return;
3127    }
3128    let _ = app
3129        .writer
3130        .send(Output::Log {
3131            event: event.to_string(),
3132            request_id,
3133            version: None,
3134            argv: None,
3135            config: None,
3136            args: Some(args),
3137            env: None,
3138            trace: Trace::from_duration(0),
3139        })
3140        .await;
3141}