1#[macro_use]
2mod helpers;
3mod history;
4mod limit;
5mod pay;
6mod receive_watch;
7mod spend_guard;
8mod wallet;
9
10#[cfg(any(
11 feature = "btc-esplora",
12 feature = "btc-core",
13 feature = "btc-electrum"
14))]
15use crate::provider::btc::BtcProvider;
16#[cfg(feature = "cashu")]
17use crate::provider::cashu::CashuProvider;
18#[cfg(feature = "evm")]
19use crate::provider::evm::EvmProvider;
20#[cfg(any(feature = "ln-nwc", feature = "ln-phoenixd", feature = "ln-lnbits"))]
21use crate::provider::ln::LnProvider;
22#[cfg(feature = "rpc")]
23use crate::provider::remote::RemoteProvider;
24#[cfg(feature = "sol")]
25use crate::provider::sol::SolProvider;
26use crate::provider::{PayError, PayProvider, StubProvider};
27use crate::spend::SpendLedger;
28use crate::store::StorageBackend;
29use crate::types::*;
30use std::collections::HashMap;
31use std::sync::atomic::{AtomicU64, Ordering};
32use std::sync::Arc;
33use std::time::Instant;
34use tokio::sync::{mpsc, Mutex, RwLock};
35use tokio::task::JoinHandle;
36
37use helpers::*;
38
39pub struct App {
40 pub config: RwLock<RuntimeConfig>,
41 pub providers: HashMap<Network, Box<dyn PayProvider>>,
42 pub writer: mpsc::Sender<Output>,
43 pub in_flight: Mutex<HashMap<String, JoinHandle<()>>>,
44 pub requests_total: AtomicU64,
45 pub start_time: Instant,
46 #[cfg(feature = "redb")]
48 pub has_local_providers: bool,
49 pub enforce_limits: bool,
52 pub spend_ledger: SpendLedger,
53 pub store: Option<Arc<StorageBackend>>,
56}
57
58impl App {
59 pub fn new(
62 config: RuntimeConfig,
63 writer: mpsc::Sender<Output>,
64 enforce_limits_override: Option<bool>,
65 store: Option<StorageBackend>,
66 ) -> Self {
67 let store = store.map(Arc::new);
68 let mut providers: HashMap<Network, Box<dyn PayProvider>> = HashMap::new();
69
70 for network in &[
71 Network::Ln,
72 Network::Sol,
73 Network::Evm,
74 Network::Cashu,
75 Network::Btc,
76 ] {
77 let key = network.to_string();
78 if let Some(rpc_name) = config.providers.get(&key) {
79 if let Some(rpc_cfg) = config.afpay_rpc.get(rpc_name) {
81 #[cfg(feature = "rpc")]
82 {
83 let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
84 providers.insert(
85 *network,
86 Box::new(RemoteProvider::new(&rpc_cfg.endpoint, secret, *network)),
87 );
88 }
89 #[cfg(not(feature = "rpc"))]
90 {
91 let _ = rpc_cfg;
92 providers.insert(*network, Box::new(StubProvider::new(*network)));
93 }
94 } else {
95 providers.insert(*network, Box::new(StubProvider::new(*network)));
97 }
98 } else {
99 #[allow(unreachable_patterns)]
100 match network {
101 #[cfg(feature = "cashu")]
102 Network::Cashu => {
103 if let Some(s) = &store {
104 let pg_url = config
105 .postgres_url_secret
106 .clone()
107 .filter(|_| config.storage_backend.as_deref() == Some("postgres"));
108 providers.insert(
109 *network,
110 Box::new(CashuProvider::new(&config.data_dir, pg_url, s.clone())),
111 );
112 } else {
113 providers.insert(*network, Box::new(StubProvider::new(*network)));
114 }
115 }
116 #[cfg(any(feature = "ln-nwc", feature = "ln-phoenixd", feature = "ln-lnbits"))]
117 Network::Ln => {
118 if let Some(s) = &store {
119 providers.insert(
120 *network,
121 Box::new(LnProvider::new(&config.data_dir, s.clone())),
122 );
123 } else {
124 providers.insert(*network, Box::new(StubProvider::new(*network)));
125 }
126 }
127 #[cfg(feature = "sol")]
128 Network::Sol => {
129 if let Some(s) = &store {
130 providers.insert(
131 *network,
132 Box::new(SolProvider::new(&config.data_dir, s.clone())),
133 );
134 } else {
135 providers.insert(*network, Box::new(StubProvider::new(*network)));
136 }
137 }
138 #[cfg(feature = "evm")]
139 Network::Evm => {
140 if let Some(s) = &store {
141 providers.insert(
142 *network,
143 Box::new(EvmProvider::new(&config.data_dir, s.clone())),
144 );
145 } else {
146 providers.insert(*network, Box::new(StubProvider::new(*network)));
147 }
148 }
149 #[cfg(any(
150 feature = "btc-esplora",
151 feature = "btc-core",
152 feature = "btc-electrum"
153 ))]
154 Network::Btc => {
155 if let Some(s) = &store {
156 providers.insert(
157 *network,
158 Box::new(BtcProvider::new(&config.data_dir, s.clone())),
159 );
160 } else {
161 providers.insert(*network, Box::new(StubProvider::new(*network)));
162 }
163 }
164 _ => {
165 providers.insert(*network, Box::new(StubProvider::new(*network)));
166 }
167 }
168 }
169 }
170
171 let has_local = providers.values().any(|p| p.writes_locally());
172 let spend_ledger = match store.as_deref() {
173 #[cfg(feature = "postgres")]
174 Some(StorageBackend::Postgres(pg)) => {
175 SpendLedger::new_postgres(pg.pool().clone(), config.exchange_rate.clone())
176 }
177 _ => SpendLedger::new(&config.data_dir, config.exchange_rate.clone()),
178 };
179 Self {
180 config: RwLock::new(config),
181 providers,
182 writer,
183 in_flight: Mutex::new(HashMap::new()),
184 requests_total: AtomicU64::new(0),
185 start_time: Instant::now(),
186 #[cfg(feature = "redb")]
187 has_local_providers: has_local,
188 enforce_limits: enforce_limits_override.unwrap_or(has_local),
189 spend_ledger,
190 store,
191 }
192 }
193}
194
195pub async fn startup_provider_validation_errors(config: &RuntimeConfig) -> Vec<Output> {
198 let mut errors = Vec::new();
199
200 for (network, rpc_name) in &config.providers {
202 if !config.afpay_rpc.contains_key(rpc_name) {
203 errors.push(Output::Error {
204 id: None,
205 error_code: "invalid_config".to_string(),
206 error: format!(
207 "providers.{network} references unknown afpay_rpc node '{rpc_name}'"
208 ),
209 hint: Some(format!(
210 "add [afpay_rpc.{rpc_name}] with endpoint and endpoint_secret to config.toml"
211 )),
212 retryable: false,
213 trace: Trace::from_duration(0),
214 });
215 }
216 }
217 if !errors.is_empty() {
218 return errors;
219 }
220
221 #[cfg(feature = "rpc")]
223 {
224 let mut pinged: std::collections::HashSet<String> = std::collections::HashSet::new();
225 for (rpc_name, rpc_cfg) in &config.afpay_rpc {
226 if !pinged.insert(rpc_cfg.endpoint.clone()) {
227 continue;
228 }
229 let network = config
231 .providers
232 .iter()
233 .find(|(_, name)| *name == rpc_name)
234 .and_then(|(k, _)| k.parse::<Network>().ok())
235 .unwrap_or(Network::Cashu);
236 let secret = rpc_cfg.endpoint_secret.as_deref().unwrap_or("");
237 let provider = RemoteProvider::new(&rpc_cfg.endpoint, secret, network);
238 if let Err(err) = provider.ping().await {
239 errors.push(Output::Error {
240 id: None,
241 error_code: "provider_unreachable".to_string(),
242 error: format!("afpay_rpc.{rpc_name} ({}): {err}", rpc_cfg.endpoint),
243 hint: Some("check endpoint address and that the daemon is running".to_string()),
244 retryable: true,
245 trace: Trace::from_duration(0),
246 });
247 }
248 }
249 }
250 errors
251}
252
253pub async fn dispatch(app: &App, input: Input) {
254 #[cfg(feature = "redb")]
257 let _lock = if app.has_local_providers
258 && needs_write_lock(&input)
259 && matches!(app.store.as_deref(), Some(StorageBackend::Redb(..)) | None)
260 {
261 match acquire_write_lock(app).await {
262 Ok(guard) => Some(guard),
263 Err(e) => {
264 let id = extract_id(&input);
265 emit_error(&app.writer, id, &e, Instant::now()).await;
266 return;
267 }
268 }
269 } else {
270 None
271 };
272
273 let mut input = input;
275 if let Some(store) = &app.store {
276 if let Err(e) = resolve_wallet_labels(&mut input, store.as_ref()) {
277 let id = extract_id(&input);
278 emit_error(&app.writer, id, &e, Instant::now()).await;
279 return;
280 }
281 }
282
283 match &input {
284 Input::WalletCreate { .. }
286 | Input::LnWalletCreate { .. }
287 | Input::WalletClose { .. }
288 | Input::WalletList { .. }
289 | Input::Balance { .. }
290 | Input::Restore { .. }
291 | Input::WalletShowSeed { .. }
292 | Input::WalletConfigShow { .. }
293 | Input::WalletConfigSet { .. }
294 | Input::WalletConfigTokenAdd { .. }
295 | Input::WalletConfigTokenRemove { .. } => {
296 wallet::dispatch_wallet(app, input).await;
297 emit_migration_log(app).await;
298 return;
299 }
300
301 Input::Receive { .. }
303 | Input::ReceiveClaim { .. }
304 | Input::CashuSend { .. }
305 | Input::CashuReceive { .. }
306 | Input::Send { .. } => {
307 pay::dispatch_pay(app, input).await;
308 emit_migration_log(app).await;
309 return;
310 }
311
312 Input::HistoryList { .. } | Input::HistoryStatus { .. } | Input::HistoryUpdate { .. } => {
314 history::dispatch_history(app, input).await;
315 emit_migration_log(app).await;
316 return;
317 }
318
319 Input::LimitAdd { .. }
321 | Input::LimitRemove { .. }
322 | Input::LimitList { .. }
323 | Input::LimitSet { .. } => {
324 limit::dispatch_limit(app, input).await;
325 emit_migration_log(app).await;
326 return;
327 }
328
329 Input::Config(_) | Input::ConfigShow { .. } | Input::Version | Input::Close => {}
331 }
332
333 match input {
335 Input::ConfigShow { .. } => {
336 let cfg = app.config.read().await;
337 let _ = app.writer.send(Output::Config(cfg.clone())).await;
338 }
339 Input::Config(patch) => {
340 let start = Instant::now();
341 let ConfigPatch {
342 data_dir,
343 log,
344 exchange_rate,
345 afpay_rpc,
346 providers,
347 } = patch;
348
349 let mut unsupported = Vec::new();
350 if data_dir.is_some() {
351 unsupported.push("data_dir");
352 }
353 if afpay_rpc.is_some() {
354 unsupported.push("afpay_rpc");
355 }
356 if providers.is_some() {
357 unsupported.push("providers");
358 }
359 if exchange_rate.is_some() {
360 unsupported.push("exchange_rate");
361 }
362 if !unsupported.is_empty() {
363 let err = PayError::NotImplemented(format!(
364 "runtime config only supports 'log'; unsupported fields: {}",
365 unsupported.join(", ")
366 ));
367 emit_error(&app.writer, None, &err, start).await;
368 return;
369 }
370
371 let mut cfg = app.config.write().await;
372 if let Some(v) = log {
373 cfg.log = agent_first_data::cli_parse_log_filters(&v);
374 }
375 let _ = app.writer.send(Output::Config(cfg.clone())).await;
376 }
377
378 Input::Version => {
379 let _ = app
380 .writer
381 .send(Output::Version {
382 version: crate::config::VERSION.to_string(),
383 protocol_version: JSON_PROTOCOL_VERSION,
384 trace: PongTrace {
385 uptime_s: app.start_time.elapsed().as_secs(),
386 requests_total: app.requests_total.load(Ordering::Relaxed),
387 in_flight: app.in_flight.lock().await.len(),
388 },
389 })
390 .await;
391 }
392
393 Input::Close => {
394 }
396
397 _ => unreachable!(),
398 }
399
400 emit_migration_log(app).await;
401}