Skip to main content

agent_first_pay/handler/
mod.rs

1#[macro_use]
2mod helpers;
3mod history;
4mod limit;
5mod pay;
6mod wallet;
7
8#[cfg(any(
9    feature = "btc-esplora",
10    feature = "btc-core",
11    feature = "btc-electrum"
12))]
13use crate::provider::btc::BtcProvider;
14#[cfg(feature = "cashu")]
15use crate::provider::cashu::CashuProvider;
16#[cfg(feature = "evm")]
17use crate::provider::evm::EvmProvider;
18#[cfg(any(feature = "ln-nwc", feature = "ln-phoenixd", feature = "ln-lnbits"))]
19use crate::provider::ln::LnProvider;
20use crate::provider::remote::RemoteProvider;
21#[cfg(feature = "sol")]
22use crate::provider::sol::SolProvider;
23use crate::provider::{PayError, PayProvider, StubProvider};
24use crate::spend::SpendLedger;
25use crate::store::StorageBackend;
26use crate::types::*;
27use std::collections::HashMap;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::sync::Arc;
30use std::time::Instant;
31use tokio::sync::{mpsc, Mutex, RwLock};
32use tokio::task::JoinHandle;
33
34use helpers::*;
35
36pub struct App {
37    pub config: RwLock<RuntimeConfig>,
38    pub providers: HashMap<Network, Box<dyn PayProvider>>,
39    pub writer: mpsc::Sender<Output>,
40    pub in_flight: Mutex<HashMap<String, JoinHandle<()>>>,
41    pub requests_total: AtomicU64,
42    pub start_time: Instant,
43    /// True if any provider uses local data (needs data-dir lock for writes).
44    #[cfg(feature = "redb")]
45    pub has_local_providers: bool,
46    /// Whether this node enforces spend limits.
47    /// RPC mode: always true. CLI/pipe with all remote: false. CLI/pipe with any local: true.
48    pub enforce_limits: bool,
49    pub spend_ledger: SpendLedger,
50    /// Storage backend for wallet metadata and transaction history.
51    /// None when running in frontend-only mode (no local DB, only remote RPC).
52    pub store: Option<Arc<StorageBackend>>,
53}
54
55impl App {
56    /// Create a new App. If `enforce_limits_override` is Some, use that value;
57    /// otherwise auto-detect: enforce if any provider writes locally.
58    pub fn new(
59        config: RuntimeConfig,
60        writer: mpsc::Sender<Output>,
61        enforce_limits_override: Option<bool>,
62        store: Option<StorageBackend>,
63    ) -> Self {
64        let store = store.map(Arc::new);
65        let mut providers: HashMap<Network, Box<dyn PayProvider>> = HashMap::new();
66
67        for network in &[
68            Network::Ln,
69            Network::Sol,
70            Network::Evm,
71            Network::Cashu,
72            Network::Btc,
73        ] {
74            let key = network.to_string();
75            if let Some(rpc_name) = config.providers.get(&key) {
76                // Look up the afpay_rpc node by name
77                if let Some(rpc_cfg) = config.afpay_rpc.get(rpc_name) {
78                    let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
79                    providers.insert(
80                        *network,
81                        Box::new(RemoteProvider::new(&rpc_cfg.endpoint, secret, *network)),
82                    );
83                } else {
84                    // Unknown afpay_rpc name — insert stub so errors surface at runtime
85                    providers.insert(*network, Box::new(StubProvider::new(*network)));
86                }
87            } else {
88                #[allow(unreachable_patterns)]
89                match network {
90                    #[cfg(feature = "cashu")]
91                    Network::Cashu => {
92                        if let Some(s) = &store {
93                            let pg_url = config
94                                .postgres_url_secret
95                                .clone()
96                                .filter(|_| config.storage_backend.as_deref() == Some("postgres"));
97                            providers.insert(
98                                *network,
99                                Box::new(CashuProvider::new(&config.data_dir, pg_url, s.clone())),
100                            );
101                        } else {
102                            providers.insert(*network, Box::new(StubProvider::new(*network)));
103                        }
104                    }
105                    #[cfg(any(feature = "ln-nwc", feature = "ln-phoenixd", feature = "ln-lnbits"))]
106                    Network::Ln => {
107                        if let Some(s) = &store {
108                            providers.insert(
109                                *network,
110                                Box::new(LnProvider::new(&config.data_dir, s.clone())),
111                            );
112                        } else {
113                            providers.insert(*network, Box::new(StubProvider::new(*network)));
114                        }
115                    }
116                    #[cfg(feature = "sol")]
117                    Network::Sol => {
118                        if let Some(s) = &store {
119                            providers.insert(
120                                *network,
121                                Box::new(SolProvider::new(&config.data_dir, s.clone())),
122                            );
123                        } else {
124                            providers.insert(*network, Box::new(StubProvider::new(*network)));
125                        }
126                    }
127                    #[cfg(feature = "evm")]
128                    Network::Evm => {
129                        if let Some(s) = &store {
130                            providers.insert(
131                                *network,
132                                Box::new(EvmProvider::new(&config.data_dir, s.clone())),
133                            );
134                        } else {
135                            providers.insert(*network, Box::new(StubProvider::new(*network)));
136                        }
137                    }
138                    #[cfg(any(
139                        feature = "btc-esplora",
140                        feature = "btc-core",
141                        feature = "btc-electrum"
142                    ))]
143                    Network::Btc => {
144                        if let Some(s) = &store {
145                            providers.insert(
146                                *network,
147                                Box::new(BtcProvider::new(&config.data_dir, s.clone())),
148                            );
149                        } else {
150                            providers.insert(*network, Box::new(StubProvider::new(*network)));
151                        }
152                    }
153                    _ => {
154                        providers.insert(*network, Box::new(StubProvider::new(*network)));
155                    }
156                }
157            }
158        }
159
160        let has_local = providers.values().any(|p| p.writes_locally());
161        let spend_ledger = match store.as_deref() {
162            #[cfg(feature = "postgres")]
163            Some(StorageBackend::Postgres(pg)) => {
164                SpendLedger::new_postgres(pg.pool().clone(), config.exchange_rate.clone())
165            }
166            _ => SpendLedger::new(&config.data_dir, config.exchange_rate.clone()),
167        };
168        Self {
169            config: RwLock::new(config),
170            providers,
171            writer,
172            in_flight: Mutex::new(HashMap::new()),
173            requests_total: AtomicU64::new(0),
174            start_time: Instant::now(),
175            #[cfg(feature = "redb")]
176            has_local_providers: has_local,
177            enforce_limits: enforce_limits_override.unwrap_or(has_local),
178            spend_ledger,
179            store,
180        }
181    }
182}
183
184/// Unified startup validation for long-lived modes.
185/// Pings all configured remote afpay_rpc nodes (deduplicated) and validates provider mappings.
186pub async fn startup_provider_validation_errors(config: &RuntimeConfig) -> Vec<Output> {
187    let mut errors = Vec::new();
188
189    // Validate that all provider values reference known afpay_rpc names
190    for (network, rpc_name) in &config.providers {
191        if !config.afpay_rpc.contains_key(rpc_name) {
192            errors.push(Output::Error {
193                id: None,
194                error_code: "invalid_config".to_string(),
195                error: format!(
196                    "providers.{network} references unknown afpay_rpc node '{rpc_name}'"
197                ),
198                hint: Some(format!(
199                    "add [afpay_rpc.{rpc_name}] with endpoint and endpoint_secret to config.toml"
200                )),
201                retryable: false,
202                trace: Trace::from_duration(0),
203            });
204        }
205    }
206    if !errors.is_empty() {
207        return errors;
208    }
209
210    // Ping each unique afpay_rpc endpoint once
211    let mut pinged: std::collections::HashSet<String> = std::collections::HashSet::new();
212    for (rpc_name, rpc_cfg) in &config.afpay_rpc {
213        if !pinged.insert(rpc_cfg.endpoint.clone()) {
214            continue;
215        }
216        // Find any network that maps to this rpc_name (for the RemoteProvider constructor)
217        let network = config
218            .providers
219            .iter()
220            .find(|(_, name)| *name == rpc_name)
221            .and_then(|(k, _)| k.parse::<Network>().ok())
222            .unwrap_or(Network::Cashu);
223        let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
224        let provider = RemoteProvider::new(&rpc_cfg.endpoint, secret, network);
225        if let Err(err) = provider.ping().await {
226            errors.push(Output::Error {
227                id: None,
228                error_code: "provider_unreachable".to_string(),
229                error: format!("afpay_rpc.{rpc_name} ({}): {err}", rpc_cfg.endpoint),
230                hint: Some("check endpoint address and that the daemon is running".to_string()),
231                retryable: true,
232                trace: Trace::from_duration(0),
233            });
234        }
235    }
236    errors
237}
238
239pub async fn dispatch(app: &App, input: Input) {
240    // Acquire per-operation file lock for redb write operations.
241    // Postgres handles its own concurrency; no file lock needed.
242    #[cfg(feature = "redb")]
243    let _lock = if app.has_local_providers
244        && needs_write_lock(&input)
245        && matches!(app.store.as_deref(), Some(StorageBackend::Redb(..)) | None)
246    {
247        match acquire_write_lock(app).await {
248            Ok(guard) => Some(guard),
249            Err(e) => {
250                let id = extract_id(&input);
251                emit_error(&app.writer, id, &e, Instant::now()).await;
252                return;
253            }
254        }
255    } else {
256        None
257    };
258
259    // Resolve wallet labels → wallet IDs before dispatch
260    let mut input = input;
261    if let Some(store) = &app.store {
262        if let Err(e) = resolve_wallet_labels(&mut input, store.as_ref()) {
263            let id = extract_id(&input);
264            emit_error(&app.writer, id, &e, Instant::now()).await;
265            return;
266        }
267    }
268
269    match &input {
270        // Wallet operations
271        Input::WalletCreate { .. }
272        | Input::LnWalletCreate { .. }
273        | Input::WalletClose { .. }
274        | Input::WalletList { .. }
275        | Input::Balance { .. }
276        | Input::Restore { .. }
277        | Input::WalletShowSeed { .. }
278        | Input::WalletConfigShow { .. }
279        | Input::WalletConfigSet { .. }
280        | Input::WalletConfigTokenAdd { .. }
281        | Input::WalletConfigTokenRemove { .. } => {
282            wallet::dispatch_wallet(app, input).await;
283            emit_migration_log(app).await;
284            return;
285        }
286
287        // Pay / send / receive operations
288        Input::Receive { .. }
289        | Input::ReceiveClaim { .. }
290        | Input::CashuSend { .. }
291        | Input::CashuReceive { .. }
292        | Input::Send { .. } => {
293            pay::dispatch_pay(app, input).await;
294            emit_migration_log(app).await;
295            return;
296        }
297
298        // History operations
299        Input::HistoryList { .. } | Input::HistoryStatus { .. } | Input::HistoryUpdate { .. } => {
300            history::dispatch_history(app, input).await;
301            emit_migration_log(app).await;
302            return;
303        }
304
305        // Limit operations
306        Input::LimitAdd { .. }
307        | Input::LimitRemove { .. }
308        | Input::LimitList { .. }
309        | Input::LimitSet { .. } => {
310            limit::dispatch_limit(app, input).await;
311            emit_migration_log(app).await;
312            return;
313        }
314
315        // Inline handlers (small enough to keep in mod.rs)
316        Input::Config(_) | Input::Version | Input::Close => {}
317    }
318
319    // Inline handlers for Config, Version, Close
320    match input {
321        Input::Config(patch) => {
322            let start = Instant::now();
323            let ConfigPatch {
324                data_dir,
325                limits,
326                log,
327                exchange_rate,
328                afpay_rpc,
329                providers,
330            } = patch;
331
332            let mut unsupported = Vec::new();
333            if data_dir.is_some() {
334                unsupported.push("data_dir");
335            }
336            if afpay_rpc.is_some() {
337                unsupported.push("afpay_rpc");
338            }
339            if providers.is_some() {
340                unsupported.push("providers");
341            }
342            if exchange_rate.is_some() {
343                unsupported.push("exchange_rate");
344            }
345            if !unsupported.is_empty() {
346                let err = PayError::NotImplemented(format!(
347                    "runtime config only supports 'log' and 'limits'; unsupported fields: {}",
348                    unsupported.join(", ")
349                ));
350                emit_error(&app.writer, None, &err, start).await;
351                return;
352            }
353
354            if let Some(ref v) = limits {
355                if !app.enforce_limits {
356                    let err = PayError::NotImplemented(
357                        "config.limits is unavailable when limits are not enforced locally; configure limits on the RPC daemon"
358                            .to_string(),
359                    );
360                    emit_error(&app.writer, None, &err, start).await;
361                    return;
362                }
363                if let Err(e) = app.spend_ledger.set_limits(v).await {
364                    emit_error(&app.writer, None, &e, start).await;
365                    return;
366                }
367            }
368
369            let mut cfg = app.config.write().await;
370            if let Some(v) = limits {
371                cfg.limits = v;
372            }
373            if let Some(v) = log {
374                cfg.log = agent_first_data::cli_parse_log_filters(&v);
375            }
376            let _ = app.writer.send(Output::Config(cfg.clone())).await;
377        }
378
379        Input::Version => {
380            let _ = app
381                .writer
382                .send(Output::Version {
383                    version: crate::config::VERSION.to_string(),
384                    trace: PongTrace {
385                        uptime_s: app.start_time.elapsed().as_secs(),
386                        requests_total: app.requests_total.load(Ordering::Relaxed),
387                        in_flight: app.in_flight.lock().await.len(),
388                    },
389                })
390                .await;
391        }
392
393        Input::Close => {
394            // Handled in main loop
395        }
396
397        _ => unreachable!(),
398    }
399
400    emit_migration_log(app).await;
401}