1#[macro_use]
2mod helpers;
3mod history;
4mod limit;
5mod pay;
6mod wallet;
7
8#[cfg(any(
9 feature = "btc-esplora",
10 feature = "btc-core",
11 feature = "btc-electrum"
12))]
13use crate::provider::btc::BtcProvider;
14#[cfg(feature = "cashu")]
15use crate::provider::cashu::CashuProvider;
16#[cfg(feature = "evm")]
17use crate::provider::evm::EvmProvider;
18#[cfg(any(feature = "ln-nwc", feature = "ln-phoenixd", feature = "ln-lnbits"))]
19use crate::provider::ln::LnProvider;
20use crate::provider::remote::RemoteProvider;
21#[cfg(feature = "sol")]
22use crate::provider::sol::SolProvider;
23use crate::provider::{PayError, PayProvider, StubProvider};
24use crate::spend::SpendLedger;
25use crate::store::StorageBackend;
26use crate::types::*;
27use std::collections::HashMap;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::sync::Arc;
30use std::time::Instant;
31use tokio::sync::{mpsc, Mutex, RwLock};
32use tokio::task::JoinHandle;
33
34use helpers::*;
35
36pub struct App {
37 pub config: RwLock<RuntimeConfig>,
38 pub providers: HashMap<Network, Box<dyn PayProvider>>,
39 pub writer: mpsc::Sender<Output>,
40 pub in_flight: Mutex<HashMap<String, JoinHandle<()>>>,
41 pub requests_total: AtomicU64,
42 pub start_time: Instant,
43 #[cfg(feature = "redb")]
45 pub has_local_providers: bool,
46 pub enforce_limits: bool,
49 pub spend_ledger: SpendLedger,
50 pub store: Option<Arc<StorageBackend>>,
53}
54
55impl App {
56 pub fn new(
59 config: RuntimeConfig,
60 writer: mpsc::Sender<Output>,
61 enforce_limits_override: Option<bool>,
62 store: Option<StorageBackend>,
63 ) -> Self {
64 let store = store.map(Arc::new);
65 let mut providers: HashMap<Network, Box<dyn PayProvider>> = HashMap::new();
66
67 for network in &[
68 Network::Ln,
69 Network::Sol,
70 Network::Evm,
71 Network::Cashu,
72 Network::Btc,
73 ] {
74 let key = network.to_string();
75 if let Some(rpc_name) = config.providers.get(&key) {
76 if let Some(rpc_cfg) = config.afpay_rpc.get(rpc_name) {
78 let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
79 providers.insert(
80 *network,
81 Box::new(RemoteProvider::new(&rpc_cfg.endpoint, secret, *network)),
82 );
83 } else {
84 providers.insert(*network, Box::new(StubProvider::new(*network)));
86 }
87 } else {
88 #[allow(unreachable_patterns)]
89 match network {
90 #[cfg(feature = "cashu")]
91 Network::Cashu => {
92 if let Some(s) = &store {
93 let pg_url = config
94 .postgres_url_secret
95 .clone()
96 .filter(|_| config.storage_backend.as_deref() == Some("postgres"));
97 providers.insert(
98 *network,
99 Box::new(CashuProvider::new(&config.data_dir, pg_url, s.clone())),
100 );
101 } else {
102 providers.insert(*network, Box::new(StubProvider::new(*network)));
103 }
104 }
105 #[cfg(any(feature = "ln-nwc", feature = "ln-phoenixd", feature = "ln-lnbits"))]
106 Network::Ln => {
107 if let Some(s) = &store {
108 providers.insert(
109 *network,
110 Box::new(LnProvider::new(&config.data_dir, s.clone())),
111 );
112 } else {
113 providers.insert(*network, Box::new(StubProvider::new(*network)));
114 }
115 }
116 #[cfg(feature = "sol")]
117 Network::Sol => {
118 if let Some(s) = &store {
119 providers.insert(
120 *network,
121 Box::new(SolProvider::new(&config.data_dir, s.clone())),
122 );
123 } else {
124 providers.insert(*network, Box::new(StubProvider::new(*network)));
125 }
126 }
127 #[cfg(feature = "evm")]
128 Network::Evm => {
129 if let Some(s) = &store {
130 providers.insert(
131 *network,
132 Box::new(EvmProvider::new(&config.data_dir, s.clone())),
133 );
134 } else {
135 providers.insert(*network, Box::new(StubProvider::new(*network)));
136 }
137 }
138 #[cfg(any(
139 feature = "btc-esplora",
140 feature = "btc-core",
141 feature = "btc-electrum"
142 ))]
143 Network::Btc => {
144 if let Some(s) = &store {
145 providers.insert(
146 *network,
147 Box::new(BtcProvider::new(&config.data_dir, s.clone())),
148 );
149 } else {
150 providers.insert(*network, Box::new(StubProvider::new(*network)));
151 }
152 }
153 _ => {
154 providers.insert(*network, Box::new(StubProvider::new(*network)));
155 }
156 }
157 }
158 }
159
160 let has_local = providers.values().any(|p| p.writes_locally());
161 let spend_ledger = match store.as_deref() {
162 #[cfg(feature = "postgres")]
163 Some(StorageBackend::Postgres(pg)) => {
164 SpendLedger::new_postgres(pg.pool().clone(), config.exchange_rate.clone())
165 }
166 _ => SpendLedger::new(&config.data_dir, config.exchange_rate.clone()),
167 };
168 Self {
169 config: RwLock::new(config),
170 providers,
171 writer,
172 in_flight: Mutex::new(HashMap::new()),
173 requests_total: AtomicU64::new(0),
174 start_time: Instant::now(),
175 #[cfg(feature = "redb")]
176 has_local_providers: has_local,
177 enforce_limits: enforce_limits_override.unwrap_or(has_local),
178 spend_ledger,
179 store,
180 }
181 }
182}
183
184pub async fn startup_provider_validation_errors(config: &RuntimeConfig) -> Vec<Output> {
187 let mut errors = Vec::new();
188
189 for (network, rpc_name) in &config.providers {
191 if !config.afpay_rpc.contains_key(rpc_name) {
192 errors.push(Output::Error {
193 id: None,
194 error_code: "invalid_config".to_string(),
195 error: format!(
196 "providers.{network} references unknown afpay_rpc node '{rpc_name}'"
197 ),
198 hint: Some(format!(
199 "add [afpay_rpc.{rpc_name}] with endpoint and endpoint_secret to config.toml"
200 )),
201 retryable: false,
202 trace: Trace::from_duration(0),
203 });
204 }
205 }
206 if !errors.is_empty() {
207 return errors;
208 }
209
210 let mut pinged: std::collections::HashSet<String> = std::collections::HashSet::new();
212 for (rpc_name, rpc_cfg) in &config.afpay_rpc {
213 if !pinged.insert(rpc_cfg.endpoint.clone()) {
214 continue;
215 }
216 let network = config
218 .providers
219 .iter()
220 .find(|(_, name)| *name == rpc_name)
221 .and_then(|(k, _)| k.parse::<Network>().ok())
222 .unwrap_or(Network::Cashu);
223 let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
224 let provider = RemoteProvider::new(&rpc_cfg.endpoint, secret, network);
225 if let Err(err) = provider.ping().await {
226 errors.push(Output::Error {
227 id: None,
228 error_code: "provider_unreachable".to_string(),
229 error: format!("afpay_rpc.{rpc_name} ({}): {err}", rpc_cfg.endpoint),
230 hint: Some("check endpoint address and that the daemon is running".to_string()),
231 retryable: true,
232 trace: Trace::from_duration(0),
233 });
234 }
235 }
236 errors
237}
238
239pub async fn dispatch(app: &App, input: Input) {
240 #[cfg(feature = "redb")]
243 let _lock = if app.has_local_providers
244 && needs_write_lock(&input)
245 && matches!(app.store.as_deref(), Some(StorageBackend::Redb(..)) | None)
246 {
247 match acquire_write_lock(app).await {
248 Ok(guard) => Some(guard),
249 Err(e) => {
250 let id = extract_id(&input);
251 emit_error(&app.writer, id, &e, Instant::now()).await;
252 return;
253 }
254 }
255 } else {
256 None
257 };
258
259 let mut input = input;
261 if let Some(store) = &app.store {
262 if let Err(e) = resolve_wallet_labels(&mut input, store.as_ref()) {
263 let id = extract_id(&input);
264 emit_error(&app.writer, id, &e, Instant::now()).await;
265 return;
266 }
267 }
268
269 match &input {
270 Input::WalletCreate { .. }
272 | Input::LnWalletCreate { .. }
273 | Input::WalletClose { .. }
274 | Input::WalletList { .. }
275 | Input::Balance { .. }
276 | Input::Restore { .. }
277 | Input::WalletShowSeed { .. }
278 | Input::WalletConfigShow { .. }
279 | Input::WalletConfigSet { .. }
280 | Input::WalletConfigTokenAdd { .. }
281 | Input::WalletConfigTokenRemove { .. } => {
282 wallet::dispatch_wallet(app, input).await;
283 emit_migration_log(app).await;
284 return;
285 }
286
287 Input::Receive { .. }
289 | Input::ReceiveClaim { .. }
290 | Input::CashuSend { .. }
291 | Input::CashuReceive { .. }
292 | Input::Send { .. } => {
293 pay::dispatch_pay(app, input).await;
294 emit_migration_log(app).await;
295 return;
296 }
297
298 Input::HistoryList { .. } | Input::HistoryStatus { .. } | Input::HistoryUpdate { .. } => {
300 history::dispatch_history(app, input).await;
301 emit_migration_log(app).await;
302 return;
303 }
304
305 Input::LimitAdd { .. }
307 | Input::LimitRemove { .. }
308 | Input::LimitList { .. }
309 | Input::LimitSet { .. } => {
310 limit::dispatch_limit(app, input).await;
311 emit_migration_log(app).await;
312 return;
313 }
314
315 Input::Config(_) | Input::Version | Input::Close => {}
317 }
318
319 match input {
321 Input::Config(patch) => {
322 let start = Instant::now();
323 let ConfigPatch {
324 data_dir,
325 limits,
326 log,
327 exchange_rate,
328 afpay_rpc,
329 providers,
330 } = patch;
331
332 let mut unsupported = Vec::new();
333 if data_dir.is_some() {
334 unsupported.push("data_dir");
335 }
336 if afpay_rpc.is_some() {
337 unsupported.push("afpay_rpc");
338 }
339 if providers.is_some() {
340 unsupported.push("providers");
341 }
342 if exchange_rate.is_some() {
343 unsupported.push("exchange_rate");
344 }
345 if !unsupported.is_empty() {
346 let err = PayError::NotImplemented(format!(
347 "runtime config only supports 'log' and 'limits'; unsupported fields: {}",
348 unsupported.join(", ")
349 ));
350 emit_error(&app.writer, None, &err, start).await;
351 return;
352 }
353
354 if let Some(ref v) = limits {
355 if !app.enforce_limits {
356 let err = PayError::NotImplemented(
357 "config.limits is unavailable when limits are not enforced locally; configure limits on the RPC daemon"
358 .to_string(),
359 );
360 emit_error(&app.writer, None, &err, start).await;
361 return;
362 }
363 if let Err(e) = app.spend_ledger.set_limits(v).await {
364 emit_error(&app.writer, None, &e, start).await;
365 return;
366 }
367 }
368
369 let mut cfg = app.config.write().await;
370 if let Some(v) = limits {
371 cfg.limits = v;
372 }
373 if let Some(v) = log {
374 cfg.log = agent_first_data::cli_parse_log_filters(&v);
375 }
376 let _ = app.writer.send(Output::Config(cfg.clone())).await;
377 }
378
379 Input::Version => {
380 let _ = app
381 .writer
382 .send(Output::Version {
383 version: crate::config::VERSION.to_string(),
384 trace: PongTrace {
385 uptime_s: app.start_time.elapsed().as_secs(),
386 requests_total: app.requests_total.load(Ordering::Relaxed),
387 in_flight: app.in_flight.lock().await.len(),
388 },
389 })
390 .await;
391 }
392
393 Input::Close => {
394 }
396
397 _ => unreachable!(),
398 }
399
400 emit_migration_log(app).await;
401}