Skip to main content

agent_first_pay/handler/
mod.rs

1#[macro_use]
2mod helpers;
3mod history;
4mod limit;
5mod pay;
6mod receive_watch;
7mod spend_guard;
8mod wallet;
9
10#[cfg(any(
11    feature = "btc-esplora",
12    feature = "btc-core",
13    feature = "btc-electrum"
14))]
15use crate::provider::btc::BtcProvider;
16#[cfg(feature = "cashu")]
17use crate::provider::cashu::CashuProvider;
18#[cfg(feature = "evm")]
19use crate::provider::evm::EvmProvider;
20#[cfg(any(feature = "ln-nwc", feature = "ln-phoenixd", feature = "ln-lnbits"))]
21use crate::provider::ln::LnProvider;
22#[cfg(feature = "rpc")]
23use crate::provider::remote::RemoteProvider;
24#[cfg(feature = "sol")]
25use crate::provider::sol::SolProvider;
26use crate::provider::{PayError, PayProvider, StubProvider};
27use crate::spend::SpendLedger;
28use crate::store::StorageBackend;
29use crate::types::*;
30use std::collections::HashMap;
31use std::sync::atomic::{AtomicU64, Ordering};
32use std::sync::Arc;
33use std::time::Instant;
34use tokio::sync::{mpsc, Mutex, RwLock};
35use tokio::task::JoinHandle;
36
37use helpers::*;
38
39pub struct App {
40    pub config: RwLock<RuntimeConfig>,
41    pub providers: HashMap<Network, Box<dyn PayProvider>>,
42    pub writer: mpsc::Sender<Output>,
43    pub in_flight: Mutex<HashMap<String, JoinHandle<()>>>,
44    pub requests_total: AtomicU64,
45    pub start_time: Instant,
46    /// True if any provider uses local data (needs data-dir lock for writes).
47    #[cfg(feature = "redb")]
48    pub has_local_providers: bool,
49    /// Whether this node enforces spend limits.
50    /// RPC mode: always true. CLI/pipe with all remote: false. CLI/pipe with any local: true.
51    pub enforce_limits: bool,
52    pub spend_ledger: SpendLedger,
53    /// Storage backend for wallet metadata and transaction history.
54    /// None when running in frontend-only mode (no local DB, only remote RPC).
55    pub store: Option<Arc<StorageBackend>>,
56}
57
58impl App {
59    /// Create a new App. If `enforce_limits_override` is Some, use that value;
60    /// otherwise auto-detect: enforce if any provider writes locally.
61    pub fn new(
62        config: RuntimeConfig,
63        writer: mpsc::Sender<Output>,
64        enforce_limits_override: Option<bool>,
65        store: Option<StorageBackend>,
66    ) -> Self {
67        let store = store.map(Arc::new);
68        let mut providers: HashMap<Network, Box<dyn PayProvider>> = HashMap::new();
69
70        for network in &[
71            Network::Ln,
72            Network::Sol,
73            Network::Evm,
74            Network::Cashu,
75            Network::Btc,
76        ] {
77            let key = network.to_string();
78            if let Some(rpc_name) = config.providers.get(&key) {
79                // Look up the afpay_rpc node by name
80                if let Some(rpc_cfg) = config.afpay_rpc.get(rpc_name) {
81                    #[cfg(feature = "rpc")]
82                    {
83                        let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
84                        providers.insert(
85                            *network,
86                            Box::new(RemoteProvider::new(&rpc_cfg.endpoint, secret, *network)),
87                        );
88                    }
89                    #[cfg(not(feature = "rpc"))]
90                    {
91                        let _ = rpc_cfg;
92                        providers.insert(*network, Box::new(StubProvider::new(*network)));
93                    }
94                } else {
95                    // Unknown afpay_rpc name — insert stub so errors surface at runtime
96                    providers.insert(*network, Box::new(StubProvider::new(*network)));
97                }
98            } else {
99                #[allow(unreachable_patterns)]
100                match network {
101                    #[cfg(feature = "cashu")]
102                    Network::Cashu => {
103                        if let Some(s) = &store {
104                            let pg_url = config
105                                .postgres_url_secret
106                                .clone()
107                                .filter(|_| config.storage_backend.as_deref() == Some("postgres"));
108                            providers.insert(
109                                *network,
110                                Box::new(CashuProvider::new(&config.data_dir, pg_url, s.clone())),
111                            );
112                        } else {
113                            providers.insert(*network, Box::new(StubProvider::new(*network)));
114                        }
115                    }
116                    #[cfg(any(feature = "ln-nwc", feature = "ln-phoenixd", feature = "ln-lnbits"))]
117                    Network::Ln => {
118                        if let Some(s) = &store {
119                            providers.insert(
120                                *network,
121                                Box::new(LnProvider::new(&config.data_dir, s.clone())),
122                            );
123                        } else {
124                            providers.insert(*network, Box::new(StubProvider::new(*network)));
125                        }
126                    }
127                    #[cfg(feature = "sol")]
128                    Network::Sol => {
129                        if let Some(s) = &store {
130                            providers.insert(
131                                *network,
132                                Box::new(SolProvider::new(&config.data_dir, s.clone())),
133                            );
134                        } else {
135                            providers.insert(*network, Box::new(StubProvider::new(*network)));
136                        }
137                    }
138                    #[cfg(feature = "evm")]
139                    Network::Evm => {
140                        if let Some(s) = &store {
141                            providers.insert(
142                                *network,
143                                Box::new(EvmProvider::new(&config.data_dir, s.clone())),
144                            );
145                        } else {
146                            providers.insert(*network, Box::new(StubProvider::new(*network)));
147                        }
148                    }
149                    #[cfg(any(
150                        feature = "btc-esplora",
151                        feature = "btc-core",
152                        feature = "btc-electrum"
153                    ))]
154                    Network::Btc => {
155                        if let Some(s) = &store {
156                            providers.insert(
157                                *network,
158                                Box::new(BtcProvider::new(&config.data_dir, s.clone())),
159                            );
160                        } else {
161                            providers.insert(*network, Box::new(StubProvider::new(*network)));
162                        }
163                    }
164                    _ => {
165                        providers.insert(*network, Box::new(StubProvider::new(*network)));
166                    }
167                }
168            }
169        }
170
171        let has_local = providers.values().any(|p| p.writes_locally());
172        let spend_ledger = match store.as_deref() {
173            #[cfg(feature = "postgres")]
174            Some(StorageBackend::Postgres(pg)) => {
175                SpendLedger::new_postgres(pg.pool().clone(), config.exchange_rate.clone())
176            }
177            _ => SpendLedger::new(&config.data_dir, config.exchange_rate.clone()),
178        };
179        Self {
180            config: RwLock::new(config),
181            providers,
182            writer,
183            in_flight: Mutex::new(HashMap::new()),
184            requests_total: AtomicU64::new(0),
185            start_time: Instant::now(),
186            #[cfg(feature = "redb")]
187            has_local_providers: has_local,
188            enforce_limits: enforce_limits_override.unwrap_or(has_local),
189            spend_ledger,
190            store,
191        }
192    }
193}
194
195/// Unified startup validation for long-lived modes.
196/// Pings all configured remote afpay_rpc nodes (deduplicated) and validates provider mappings.
197pub async fn startup_provider_validation_errors(config: &RuntimeConfig) -> Vec<Output> {
198    let mut errors = Vec::new();
199
200    // Validate that all provider values reference known afpay_rpc names
201    for (network, rpc_name) in &config.providers {
202        if !config.afpay_rpc.contains_key(rpc_name) {
203            errors.push(Output::Error {
204                id: None,
205                error_code: "invalid_config".to_string(),
206                error: format!(
207                    "providers.{network} references unknown afpay_rpc node '{rpc_name}'"
208                ),
209                hint: Some(format!(
210                    "add [afpay_rpc.{rpc_name}] with endpoint and endpoint_secret to config.toml"
211                )),
212                retryable: false,
213                trace: Trace::from_duration(0),
214            });
215        }
216    }
217    if !errors.is_empty() {
218        return errors;
219    }
220
221    // Ping each unique afpay_rpc endpoint once
222    #[cfg(feature = "rpc")]
223    {
224        let mut pinged: std::collections::HashSet<String> = std::collections::HashSet::new();
225        for (rpc_name, rpc_cfg) in &config.afpay_rpc {
226            if !pinged.insert(rpc_cfg.endpoint.clone()) {
227                continue;
228            }
229            // Find any network that maps to this rpc_name (for the RemoteProvider constructor)
230            let network = config
231                .providers
232                .iter()
233                .find(|(_, name)| *name == rpc_name)
234                .and_then(|(k, _)| k.parse::<Network>().ok())
235                .unwrap_or(Network::Cashu);
236            let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
237            let provider = RemoteProvider::new(&rpc_cfg.endpoint, secret, network);
238            if let Err(err) = provider.ping().await {
239                errors.push(Output::Error {
240                    id: None,
241                    error_code: "provider_unreachable".to_string(),
242                    error: format!("afpay_rpc.{rpc_name} ({}): {err}", rpc_cfg.endpoint),
243                    hint: Some("check endpoint address and that the daemon is running".to_string()),
244                    retryable: true,
245                    trace: Trace::from_duration(0),
246                });
247            }
248        }
249    }
250    errors
251}
252
253pub async fn dispatch(app: &App, input: Input) {
254    // Acquire per-operation file lock for redb write operations.
255    // Postgres handles its own concurrency; no file lock needed.
256    #[cfg(feature = "redb")]
257    let _lock = if app.has_local_providers
258        && needs_write_lock(&input)
259        && matches!(app.store.as_deref(), Some(StorageBackend::Redb(..)) | None)
260    {
261        match acquire_write_lock(app).await {
262            Ok(guard) => Some(guard),
263            Err(e) => {
264                let id = extract_id(&input);
265                emit_error(&app.writer, id, &e, Instant::now()).await;
266                return;
267            }
268        }
269    } else {
270        None
271    };
272
273    // Resolve wallet labels → wallet IDs before dispatch
274    let mut input = input;
275    if let Some(store) = &app.store {
276        if let Err(e) = resolve_wallet_labels(&mut input, store.as_ref()) {
277            let id = extract_id(&input);
278            emit_error(&app.writer, id, &e, Instant::now()).await;
279            return;
280        }
281    }
282
283    match &input {
284        // Wallet operations
285        Input::WalletCreate { .. }
286        | Input::LnWalletCreate { .. }
287        | Input::WalletClose { .. }
288        | Input::WalletList { .. }
289        | Input::Balance { .. }
290        | Input::Restore { .. }
291        | Input::WalletShowSeed { .. }
292        | Input::WalletConfigShow { .. }
293        | Input::WalletConfigSet { .. }
294        | Input::WalletConfigTokenAdd { .. }
295        | Input::WalletConfigTokenRemove { .. } => {
296            wallet::dispatch_wallet(app, input).await;
297            emit_migration_log(app).await;
298            return;
299        }
300
301        // Pay / send / receive operations
302        Input::Receive { .. }
303        | Input::ReceiveClaim { .. }
304        | Input::CashuSend { .. }
305        | Input::CashuReceive { .. }
306        | Input::Send { .. } => {
307            pay::dispatch_pay(app, input).await;
308            emit_migration_log(app).await;
309            return;
310        }
311
312        // History operations
313        Input::HistoryList { .. } | Input::HistoryStatus { .. } | Input::HistoryUpdate { .. } => {
314            history::dispatch_history(app, input).await;
315            emit_migration_log(app).await;
316            return;
317        }
318
319        // Limit operations
320        Input::LimitAdd { .. }
321        | Input::LimitRemove { .. }
322        | Input::LimitList { .. }
323        | Input::LimitSet { .. } => {
324            limit::dispatch_limit(app, input).await;
325            emit_migration_log(app).await;
326            return;
327        }
328
329        // Inline handlers (small enough to keep in mod.rs)
330        Input::Config(_) | Input::ConfigShow { .. } | Input::Version | Input::Close => {}
331    }
332
333    // Inline handlers for Config, ConfigShow, Version, Close
334    match input {
335        Input::ConfigShow { .. } => {
336            let cfg = app.config.read().await;
337            let _ = app.writer.send(Output::Config(cfg.clone())).await;
338        }
339        Input::Config(patch) => {
340            let start = Instant::now();
341            let ConfigPatch {
342                data_dir,
343                log,
344                exchange_rate,
345                afpay_rpc,
346                providers,
347            } = patch;
348
349            let mut unsupported = Vec::new();
350            if data_dir.is_some() {
351                unsupported.push("data_dir");
352            }
353            if afpay_rpc.is_some() {
354                unsupported.push("afpay_rpc");
355            }
356            if providers.is_some() {
357                unsupported.push("providers");
358            }
359            if exchange_rate.is_some() {
360                unsupported.push("exchange_rate");
361            }
362            if !unsupported.is_empty() {
363                let err = PayError::NotImplemented(format!(
364                    "runtime config only supports 'log'; unsupported fields: {}",
365                    unsupported.join(", ")
366                ));
367                emit_error(&app.writer, None, &err, start).await;
368                return;
369            }
370
371            let mut cfg = app.config.write().await;
372            if let Some(v) = log {
373                cfg.log = agent_first_data::cli_parse_log_filters(&v);
374            }
375            let _ = app.writer.send(Output::Config(cfg.clone())).await;
376        }
377
378        Input::Version => {
379            let _ = app
380                .writer
381                .send(Output::Version {
382                    version: crate::config::VERSION.to_string(),
383                    protocol_version: JSON_PROTOCOL_VERSION,
384                    trace: PongTrace {
385                        uptime_s: app.start_time.elapsed().as_secs(),
386                        requests_total: app.requests_total.load(Ordering::Relaxed),
387                        in_flight: app.in_flight.lock().await.len(),
388                    },
389                })
390                .await;
391        }
392
393        Input::Close => {
394            // Handled in main loop
395        }
396
397        _ => unreachable!(),
398    }
399
400    emit_migration_log(app).await;
401}