1#![cfg_attr(not(any(feature = "redb", feature = "postgres")), allow(dead_code))]
2
3pub mod tokens;
4
5use crate::provider::PayError;
6#[cfg(feature = "exchange-rate")]
7use crate::types::ExchangeRateSourceType;
8use crate::types::{ExchangeRateConfig, SpendLimit, SpendLimitStatus, SpendScope};
9#[cfg(feature = "redb")]
10use serde::de::DeserializeOwned;
11use serde::{Deserialize, Serialize};
12use tokio::sync::Mutex;
13
14#[cfg(feature = "redb")]
15use crate::store::db;
16#[cfg(feature = "redb")]
17use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
18#[cfg(feature = "redb")]
19use std::path::{Path, PathBuf};
20
21#[cfg(feature = "redb")]
22const META_COUNTER: TableDefinition<&str, u64> = TableDefinition::new("meta_counter");
23#[cfg(feature = "redb")]
24const RULE_BY_ID: TableDefinition<&str, &str> = TableDefinition::new("rule_by_id_v3");
25#[cfg(feature = "redb")]
26const RESERVATION_BY_ID: TableDefinition<u64, &str> = TableDefinition::new("reservation_by_id");
27#[cfg(feature = "redb")]
28const RESERVATION_ID_BY_OP_ID: TableDefinition<&str, u64> =
29 TableDefinition::new("reservation_id_by_op_id");
30#[cfg(feature = "redb")]
31const SPEND_EVENT_BY_ID: TableDefinition<u64, &str> = TableDefinition::new("spend_event_by_id");
32#[cfg(feature = "redb")]
33const FX_QUOTE_BY_PAIR: TableDefinition<&str, &str> = TableDefinition::new("quote_by_pair");
34#[cfg(feature = "redb")]
35const NEXT_RESERVATION_ID_KEY: &str = "next_reservation_id";
36#[cfg(feature = "redb")]
37const NEXT_EVENT_ID_KEY: &str = "next_event_id";
38#[cfg(feature = "redb")]
39const SPEND_VERSION: u64 = 1;
40#[cfg(feature = "redb")]
41const FX_CACHE_VERSION: u64 = 1;
42
43#[derive(Debug, Clone)]
44pub struct SpendContext {
45 pub network: String,
46 pub wallet: Option<String>,
47 pub amount_native: u64,
48 pub token: Option<String>,
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
52enum ReservationStatus {
53 Pending,
54 Confirmed,
55 Cancelled,
56 Expired,
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
60struct SpendReservation {
61 reservation_id: u64,
62 op_id: String,
63 network: String,
64 wallet: Option<String>,
65 #[serde(default)]
66 token: Option<String>,
67 amount_native: u64,
68 amount_usd_cents: Option<u64>,
69 status: ReservationStatus,
70 created_at_epoch_ms: u64,
71 expires_at_epoch_ms: u64,
72 finalized_at_epoch_ms: Option<u64>,
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76struct SpendEvent {
77 event_id: u64,
78 reservation_id: u64,
79 op_id: String,
80 network: String,
81 wallet: Option<String>,
82 #[serde(default)]
83 token: Option<String>,
84 amount_native: u64,
85 amount_usd_cents: Option<u64>,
86 created_at_epoch_ms: u64,
87 confirmed_at_epoch_ms: u64,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91struct ExchangeRateQuote {
92 pair: String,
93 source: String,
94 price: f64,
95 fetched_at_epoch_ms: u64,
96 expires_at_epoch_ms: u64,
97}
98
99#[allow(dead_code)] enum SpendBackend {
105 #[cfg(feature = "redb")]
106 Redb {
107 data_dir: String,
108 },
109 #[cfg(feature = "postgres")]
110 Postgres {
111 pool: sqlx::PgPool,
112 },
113 None,
114}
115
116pub struct SpendLedger {
121 backend: SpendBackend,
122 exchange_rate: Option<ExchangeRateConfig>,
123 mu: Mutex<()>,
124 fx_stale_warned: std::sync::atomic::AtomicBool,
126}
127
128impl SpendLedger {
129 pub fn new(data_dir: &str, exchange_rate: Option<ExchangeRateConfig>) -> Self {
130 #[cfg(feature = "redb")]
131 let backend = SpendBackend::Redb {
132 data_dir: data_dir.to_string(),
133 };
134 #[cfg(not(feature = "redb"))]
135 let backend = {
136 let _ = data_dir;
137 SpendBackend::None
138 };
139 Self {
140 backend,
141 exchange_rate,
142 mu: Mutex::new(()),
143 fx_stale_warned: std::sync::atomic::AtomicBool::new(false),
144 }
145 }
146
147 #[cfg(feature = "postgres")]
148 pub fn new_postgres(pool: sqlx::PgPool, exchange_rate: Option<ExchangeRateConfig>) -> Self {
149 Self {
150 backend: SpendBackend::Postgres { pool },
151 exchange_rate,
152 mu: Mutex::new(()),
153 fx_stale_warned: std::sync::atomic::AtomicBool::new(false),
154 }
155 }
156
157 pub fn take_fx_stale_warning(&self) -> bool {
159 self.fx_stale_warned
160 .swap(false, std::sync::atomic::Ordering::Relaxed)
161 }
162
163 pub async fn add_limit(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
165 validate_limit(limit, self.exchange_rate.as_ref())?;
166
167 let _guard = self.mu.lock().await;
168
169 match &self.backend {
170 #[cfg(feature = "redb")]
171 SpendBackend::Redb { .. } => self.add_limit_redb(limit),
172 #[cfg(feature = "postgres")]
173 SpendBackend::Postgres { .. } => self.add_limit_postgres(limit).await,
174 SpendBackend::None => Err(PayError::NotImplemented(
175 "no storage backend for spend limits".to_string(),
176 )),
177 }
178 }
179
180 pub async fn remove_limit(&self, _rule_id: &str) -> Result<(), PayError> {
182 let _guard = self.mu.lock().await;
183
184 match &self.backend {
185 #[cfg(feature = "redb")]
186 SpendBackend::Redb { .. } => self.remove_limit_redb(_rule_id),
187 #[cfg(feature = "postgres")]
188 SpendBackend::Postgres { .. } => self.remove_limit_postgres(_rule_id).await,
189 SpendBackend::None => Err(PayError::NotImplemented(
190 "no storage backend for spend limits".to_string(),
191 )),
192 }
193 }
194
195 pub async fn set_limits(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
197 for limit in limits {
198 validate_limit(limit, self.exchange_rate.as_ref())?;
199 }
200
201 let _guard = self.mu.lock().await;
202
203 match &self.backend {
204 #[cfg(feature = "redb")]
205 SpendBackend::Redb { .. } => self.set_limits_redb(limits),
206 #[cfg(feature = "postgres")]
207 SpendBackend::Postgres { .. } => self.set_limits_postgres(limits).await,
208 SpendBackend::None => Err(PayError::NotImplemented(
209 "no storage backend for spend limits".to_string(),
210 )),
211 }
212 }
213
214 pub async fn get_status(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
216 let _guard = self.mu.lock().await;
217
218 match &self.backend {
219 #[cfg(feature = "redb")]
220 SpendBackend::Redb { .. } => self.get_status_redb(),
221 #[cfg(feature = "postgres")]
222 SpendBackend::Postgres { .. } => self.get_status_postgres().await,
223 SpendBackend::None => Ok(Vec::new()),
224 }
225 }
226
227 pub async fn reserve(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
229 if op_id.trim().is_empty() {
230 return Err(PayError::InvalidAmount("op_id cannot be empty".to_string()));
231 }
232 if ctx.network.trim().is_empty() {
233 return Err(PayError::InvalidAmount(
234 "network cannot be empty for spend check".to_string(),
235 ));
236 }
237
238 let _guard = self.mu.lock().await;
239
240 match &self.backend {
241 #[cfg(feature = "redb")]
242 SpendBackend::Redb { .. } => self.reserve_redb(op_id, ctx).await,
243 #[cfg(feature = "postgres")]
244 SpendBackend::Postgres { .. } => self.reserve_postgres(op_id, ctx).await,
245 SpendBackend::None => Err(PayError::NotImplemented(
246 "no storage backend for spend limits".to_string(),
247 )),
248 }
249 }
250
251 pub async fn confirm(&self, _reservation_id: u64) -> Result<(), PayError> {
252 let _guard = self.mu.lock().await;
253
254 match &self.backend {
255 #[cfg(feature = "redb")]
256 SpendBackend::Redb { .. } => self.confirm_redb(_reservation_id),
257 #[cfg(feature = "postgres")]
258 SpendBackend::Postgres { .. } => self.confirm_postgres(_reservation_id).await,
259 SpendBackend::None => Err(PayError::NotImplemented(
260 "no storage backend for spend limits".to_string(),
261 )),
262 }
263 }
264
265 pub async fn cancel(&self, _reservation_id: u64) -> Result<(), PayError> {
266 let _guard = self.mu.lock().await;
267
268 match &self.backend {
269 #[cfg(feature = "redb")]
270 SpendBackend::Redb { .. } => self.cancel_redb(_reservation_id),
271 #[cfg(feature = "postgres")]
272 SpendBackend::Postgres { .. } => self.cancel_postgres(_reservation_id).await,
273 SpendBackend::None => Ok(()),
274 }
275 }
276}
277
278#[cfg(feature = "redb")]
283impl SpendLedger {
284 fn spend_db_path(&self) -> PathBuf {
285 match &self.backend {
286 SpendBackend::Redb { data_dir } => Path::new(data_dir).join("spend").join("spend.redb"),
287 #[allow(unreachable_patterns)]
288 _ => PathBuf::new(),
289 }
290 }
291
292 fn exchange_rate_db_path(&self) -> PathBuf {
293 match &self.backend {
294 SpendBackend::Redb { data_dir } => Path::new(data_dir)
295 .join("spend")
296 .join("exchange-rate-cache.redb"),
297 #[allow(unreachable_patterns)]
298 _ => PathBuf::new(),
299 }
300 }
301
302 fn open_spend_db(&self) -> Result<Database, PayError> {
303 db::open_and_migrate(
304 &self.spend_db_path(),
305 SPEND_VERSION,
306 &[
307 &|_db: &Database| Ok(()),
309 ],
310 )
311 }
312
313 fn open_exchange_rate_db(&self) -> Result<Database, PayError> {
314 db::open_and_migrate(
315 &self.exchange_rate_db_path(),
316 FX_CACHE_VERSION,
317 &[
318 &|_db: &Database| Ok(()),
320 ],
321 )
322 }
323
324 fn add_limit_redb(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
325 let db = self.open_spend_db()?;
326 let rule_id = generate_rule_identifier()?;
327 limit.rule_id = Some(rule_id.clone());
328 let encoded = encode(limit)?;
329 let write_txn = db
330 .begin_write()
331 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
332 {
333 let mut rule_table = write_txn
334 .open_table(RULE_BY_ID)
335 .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
336 rule_table
337 .insert(rule_id.as_str(), encoded.as_str())
338 .map_err(|e| PayError::InternalError(format!("spend insert rule: {e}")))?;
339 }
340 write_txn
341 .commit()
342 .map_err(|e| PayError::InternalError(format!("spend commit add_limit: {e}")))?;
343 Ok(rule_id)
344 }
345
346 fn remove_limit_redb(&self, rule_id: &str) -> Result<(), PayError> {
347 let db = self.open_spend_db()?;
348 let write_txn = db
349 .begin_write()
350 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
351 {
352 let mut rule_table = write_txn
353 .open_table(RULE_BY_ID)
354 .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
355 let existed = rule_table
356 .remove(rule_id)
357 .map_err(|e| PayError::InternalError(format!("spend remove rule: {e}")))?;
358 if existed.is_none() {
359 return Err(PayError::InvalidAmount(format!(
360 "rule_id '{rule_id}' not found"
361 )));
362 }
363 }
364 write_txn
365 .commit()
366 .map_err(|e| PayError::InternalError(format!("spend commit remove_limit: {e}")))
367 }
368
369 fn set_limits_redb(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
370 let db = self.open_spend_db()?;
371 let write_txn = db
372 .begin_write()
373 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
374 {
375 let mut rule_table = write_txn
376 .open_table(RULE_BY_ID)
377 .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
378 let existing_ids = rule_table
380 .iter()
381 .map_err(|e| PayError::InternalError(format!("spend iterate rules: {e}")))?
382 .map(|entry| {
383 entry
384 .map(|(k, _)| k.value().to_string())
385 .map_err(|e| PayError::InternalError(format!("spend read rule key: {e}")))
386 })
387 .collect::<Result<Vec<_>, _>>()?;
388 for rid in existing_ids {
389 rule_table
390 .remove(rid.as_str())
391 .map_err(|e| PayError::InternalError(format!("spend remove rule: {e}")))?;
392 }
393
394 for limit in limits {
396 let mut rule = limit.clone();
397 let rid = generate_rule_identifier()?;
398 rule.rule_id = Some(rid.clone());
399 let encoded = encode(&rule)?;
400 rule_table
401 .insert(rid.as_str(), encoded.as_str())
402 .map_err(|e| PayError::InternalError(format!("spend insert rule: {e}")))?;
403 }
404 }
405 write_txn
406 .commit()
407 .map_err(|e| PayError::InternalError(format!("spend commit set_limits: {e}")))
408 }
409
410 fn get_status_redb(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
411 let db = self.open_spend_db()?;
412 let read_txn = db
413 .begin_read()
414 .map_err(|e| PayError::InternalError(format!("spend begin_read: {e}")))?;
415 let rules = load_rules(&read_txn)?;
416 let reservations = load_reservations(&read_txn)?;
417 let now = now_epoch_ms();
418 let mut out = Vec::with_capacity(rules.len());
419 for rule in rules {
420 let use_usd = rule.scope == SpendScope::GlobalUsdCents;
421 let (spent, oldest_ts) = spent_in_window(&rule, &reservations, now, use_usd)?;
422 let remaining = rule.max_spend.saturating_sub(spent);
423 let window_ms = rule.window_s.saturating_mul(1000);
424 let window_reset_s = oldest_ts
425 .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
426 .unwrap_or(0);
427 out.push(SpendLimitStatus {
428 rule_id: rule.rule_id.clone().unwrap_or_default(),
429 scope: rule.scope,
430 network: rule.network.clone(),
431 wallet: rule.wallet.clone(),
432 window_s: rule.window_s,
433 max_spend: rule.max_spend,
434 spent,
435 remaining,
436 token: rule.token.clone(),
437 window_reset_s,
438 });
439 }
440 Ok(out)
441 }
442
443 async fn reserve_redb(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
444 let now = now_epoch_ms();
445 let db = self.open_spend_db()?;
446
447 let read_txn = db
448 .begin_read()
449 .map_err(|e| PayError::InternalError(format!("spend begin_read: {e}")))?;
450 let rules = load_rules(&read_txn)?;
451
452 if rules.iter().any(|r| {
453 r.scope == SpendScope::Wallet
454 && r.network.as_deref() == Some(ctx.network.as_str())
455 && ctx.wallet.is_none()
456 }) {
457 return Err(PayError::InvalidAmount(
458 "wallet-scoped limits require an explicit wallet".to_string(),
459 ));
460 }
461
462 let needs_usd = rules.iter().any(|r| r.scope == SpendScope::GlobalUsdCents);
464 let amount_usd_cents = if needs_usd {
465 Some(
466 self.amount_to_usd_cents(&ctx.network, ctx.token.as_deref(), ctx.amount_native)
467 .await?,
468 )
469 } else {
470 None
471 };
472
473 let write_txn = db
474 .begin_write()
475 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
476
477 let mut encoded_blobs: Vec<String> = Vec::new();
478 let reservation_id = {
479 let mut reservation_index =
480 write_txn.open_table(RESERVATION_ID_BY_OP_ID).map_err(|e| {
481 PayError::InternalError(format!("spend open reservation op index: {e}"))
482 })?;
483 if let Some(existing) = reservation_index
484 .get(op_id)
485 .map_err(|e| PayError::InternalError(format!("spend read op index: {e}")))?
486 {
487 let existing_id = existing.value();
488 return Ok(existing_id);
489 }
490
491 let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
492 PayError::InternalError(format!("spend open reservation table: {e}"))
493 })?;
494
495 expire_pending(&mut reservation_table, now)?;
496
497 let reservations = reservation_table
498 .iter()
499 .map_err(|e| PayError::InternalError(format!("spend iterate reservations: {e}")))?
500 .map(|entry| {
501 let (_k, v) = entry.map_err(|e| {
502 PayError::InternalError(format!("spend read reservation: {e}"))
503 })?;
504 decode::<SpendReservation>(v.value())
505 .map_err(|e| prepend_err("spend decode reservation", e))
506 })
507 .collect::<Result<Vec<_>, _>>()?;
508
509 for rule in rules.iter() {
510 if !rule_matches_context(
511 rule,
512 &ctx.network,
513 ctx.wallet.as_deref(),
514 ctx.token.as_deref(),
515 ) {
516 continue;
517 }
518
519 let use_usd = rule.scope == SpendScope::GlobalUsdCents;
520 let candidate_amount =
521 amount_for_rule(rule, ctx.amount_native, amount_usd_cents, use_usd)?;
522 let (spent, oldest_ts) = spent_in_window(rule, &reservations, now, use_usd)?;
523 if spent.saturating_add(candidate_amount) > rule.max_spend {
524 let window_ms = rule.window_s.saturating_mul(1000);
525 let remaining_s = oldest_ts
526 .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
527 .unwrap_or(0);
528
529 return Err(PayError::LimitExceeded {
530 rule_id: rule.rule_id.clone().unwrap_or_default(),
531 scope: rule.scope,
532 scope_key: scope_key(rule),
533 spent,
534 max_spend: rule.max_spend,
535 token: rule.token.clone(),
536 remaining_s,
537 origin: None,
538 });
539 }
540 }
541
542 let reservation_id = next_counter(&write_txn, NEXT_RESERVATION_ID_KEY)?;
543 let reservation = SpendReservation {
544 reservation_id,
545 op_id: op_id.to_string(),
546 network: ctx.network.clone(),
547 wallet: ctx.wallet.clone(),
548 token: ctx.token.clone(),
549 amount_native: ctx.amount_native,
550 amount_usd_cents,
551 status: ReservationStatus::Pending,
552 created_at_epoch_ms: now,
553 expires_at_epoch_ms: now.saturating_add(300_000),
554 finalized_at_epoch_ms: None,
555 };
556 encoded_blobs.push(encode(&reservation)?);
557 let encoded = encoded_blobs
558 .last()
559 .ok_or_else(|| PayError::InternalError("missing reservation blob".to_string()))?;
560 reservation_table
561 .insert(reservation_id, encoded.as_str())
562 .map_err(|e| PayError::InternalError(format!("spend insert reservation: {e}")))?;
563 reservation_index
564 .insert(op_id, reservation_id)
565 .map_err(|e| PayError::InternalError(format!("spend insert op index: {e}")))?;
566 reservation_id
567 };
568
569 write_txn
570 .commit()
571 .map_err(|e| PayError::InternalError(format!("spend commit reserve: {e}")))?;
572 Ok(reservation_id)
573 }
574
575 fn confirm_redb(&self, reservation_id: u64) -> Result<(), PayError> {
576 let db = self.open_spend_db()?;
577 let now = now_epoch_ms();
578
579 let write_txn = db
580 .begin_write()
581 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
582
583 let mut encoded_blobs: Vec<String> = Vec::new();
584 {
585 let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
586 PayError::InternalError(format!("spend open reservation table: {e}"))
587 })?;
588 let Some(existing_bytes) = reservation_table
589 .get(reservation_id)
590 .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?
591 .map(|g| g.value().to_string())
592 else {
593 return Err(PayError::InternalError(format!(
594 "reservation {reservation_id} not found"
595 )));
596 };
597
598 let mut reservation: SpendReservation = decode(&existing_bytes)?;
599 if !matches!(reservation.status, ReservationStatus::Pending) {
600 return Ok(());
601 }
602
603 reservation.status = ReservationStatus::Confirmed;
604 reservation.finalized_at_epoch_ms = Some(now);
605 encoded_blobs.push(encode(&reservation)?);
606 let encoded = encoded_blobs
607 .last()
608 .ok_or_else(|| PayError::InternalError("missing reservation blob".to_string()))?;
609 reservation_table
610 .insert(reservation_id, encoded.as_str())
611 .map_err(|e| PayError::InternalError(format!("spend update reservation: {e}")))?;
612
613 let mut events = write_txn
614 .open_table(SPEND_EVENT_BY_ID)
615 .map_err(|e| PayError::InternalError(format!("spend open event table: {e}")))?;
616 let event_id = next_counter(&write_txn, NEXT_EVENT_ID_KEY)?;
617 let event = SpendEvent {
618 event_id,
619 reservation_id,
620 op_id: reservation.op_id,
621 network: reservation.network,
622 wallet: reservation.wallet,
623 token: reservation.token,
624 amount_native: reservation.amount_native,
625 amount_usd_cents: reservation.amount_usd_cents,
626 created_at_epoch_ms: reservation.created_at_epoch_ms,
627 confirmed_at_epoch_ms: now,
628 };
629 encoded_blobs.push(encode(&event)?);
630 let encoded_event = encoded_blobs
631 .last()
632 .ok_or_else(|| PayError::InternalError("missing event blob".to_string()))?;
633 events
634 .insert(event_id, encoded_event.as_str())
635 .map_err(|e| PayError::InternalError(format!("spend insert event: {e}")))?;
636 }
637
638 write_txn
639 .commit()
640 .map_err(|e| PayError::InternalError(format!("spend commit confirm: {e}")))
641 }
642
643 fn cancel_redb(&self, reservation_id: u64) -> Result<(), PayError> {
644 let db = self.open_spend_db()?;
645 let now = now_epoch_ms();
646
647 let write_txn = db
648 .begin_write()
649 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
650
651 let mut encoded_blobs: Vec<String> = Vec::new();
652 {
653 let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
654 PayError::InternalError(format!("spend open reservation table: {e}"))
655 })?;
656 let existing = reservation_table
657 .get(reservation_id)
658 .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?;
659 let existing_bytes = existing.map(|g| g.value().to_string());
660 if let Some(existing_bytes) = existing_bytes {
661 let mut reservation: SpendReservation = decode(&existing_bytes)?;
662 if matches!(reservation.status, ReservationStatus::Pending) {
663 reservation.status = ReservationStatus::Cancelled;
664 reservation.finalized_at_epoch_ms = Some(now);
665 encoded_blobs.push(encode(&reservation)?);
666 let encoded = encoded_blobs.last().ok_or_else(|| {
667 PayError::InternalError("missing reservation blob".to_string())
668 })?;
669 reservation_table
670 .insert(reservation_id, encoded.as_str())
671 .map_err(|e| {
672 PayError::InternalError(format!("spend update reservation: {e}"))
673 })?;
674 }
675 }
676 }
677
678 write_txn
679 .commit()
680 .map_err(|e| PayError::InternalError(format!("spend commit cancel: {e}")))
681 }
682}
683
684#[cfg(feature = "postgres")]
689impl SpendLedger {
690 fn pg_pool(&self) -> Result<&sqlx::PgPool, PayError> {
691 match &self.backend {
692 SpendBackend::Postgres { pool } => Ok(pool),
693 _ => Err(PayError::InternalError(
694 "expected postgres spend backend".to_string(),
695 )),
696 }
697 }
698
699 async fn add_limit_postgres(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
700 let pool = self.pg_pool()?;
701 let rule_id = generate_rule_identifier()?;
702 limit.rule_id = Some(rule_id.clone());
703 let rule_json = serde_json::to_value(limit)
704 .map_err(|e| PayError::InternalError(format!("serialize spend rule: {e}")))?;
705
706 sqlx::query("INSERT INTO spend_rules (rule_id, rule) VALUES ($1, $2)")
707 .bind(&rule_id)
708 .bind(&rule_json)
709 .execute(pool)
710 .await
711 .map_err(|e| PayError::InternalError(format!("pg insert spend rule: {e}")))?;
712
713 Ok(rule_id)
714 }
715
716 async fn remove_limit_postgres(&self, rule_id: &str) -> Result<(), PayError> {
717 let pool = self.pg_pool()?;
718 let result = sqlx::query("DELETE FROM spend_rules WHERE rule_id = $1")
719 .bind(rule_id)
720 .execute(pool)
721 .await
722 .map_err(|e| PayError::InternalError(format!("pg delete spend rule: {e}")))?;
723
724 if result.rows_affected() == 0 {
725 return Err(PayError::InvalidAmount(format!(
726 "rule_id '{rule_id}' not found"
727 )));
728 }
729 Ok(())
730 }
731
732 async fn set_limits_postgres(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
733 let pool = self.pg_pool()?;
734 let mut tx = pool
735 .begin()
736 .await
737 .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
738
739 sqlx::query("DELETE FROM spend_rules")
740 .execute(&mut *tx)
741 .await
742 .map_err(|e| PayError::InternalError(format!("pg clear spend rules: {e}")))?;
743
744 for limit in limits {
745 let mut rule = limit.clone();
746 let rid = generate_rule_identifier()?;
747 rule.rule_id = Some(rid.clone());
748 let rule_json = serde_json::to_value(&rule)
749 .map_err(|e| PayError::InternalError(format!("serialize spend rule: {e}")))?;
750 sqlx::query("INSERT INTO spend_rules (rule_id, rule) VALUES ($1, $2)")
751 .bind(&rid)
752 .bind(&rule_json)
753 .execute(&mut *tx)
754 .await
755 .map_err(|e| PayError::InternalError(format!("pg insert spend rule: {e}")))?;
756 }
757
758 tx.commit()
759 .await
760 .map_err(|e| PayError::InternalError(format!("pg commit set_limits: {e}")))
761 }
762
763 async fn get_status_postgres(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
764 let pool = self.pg_pool()?;
765 let rules = pg_load_rules(pool).await?;
766 let reservations = pg_load_reservations(pool).await?;
767 let now = now_epoch_ms();
768
769 let mut out = Vec::with_capacity(rules.len());
770 for rule in rules {
771 let use_usd = rule.scope == SpendScope::GlobalUsdCents;
772 let (spent, oldest_ts) = spent_in_window(&rule, &reservations, now, use_usd)?;
773 let remaining = rule.max_spend.saturating_sub(spent);
774 let window_ms = rule.window_s.saturating_mul(1000);
775 let window_reset_s = oldest_ts
776 .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
777 .unwrap_or(0);
778 out.push(SpendLimitStatus {
779 rule_id: rule.rule_id.clone().unwrap_or_default(),
780 scope: rule.scope,
781 network: rule.network.clone(),
782 wallet: rule.wallet.clone(),
783 window_s: rule.window_s,
784 max_spend: rule.max_spend,
785 spent,
786 remaining,
787 token: rule.token.clone(),
788 window_reset_s,
789 });
790 }
791 Ok(out)
792 }
793
794 async fn reserve_postgres(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
795 use crate::store::postgres_store::SPEND_ADVISORY_LOCK_KEY;
796
797 let pool = self.pg_pool()?;
798 let now = now_epoch_ms();
799
800 let rules = pg_load_rules(pool).await?;
802 if rules.iter().any(|r| {
803 r.scope == SpendScope::Wallet
804 && r.network.as_deref() == Some(ctx.network.as_str())
805 && ctx.wallet.is_none()
806 }) {
807 return Err(PayError::InvalidAmount(
808 "wallet-scoped limits require an explicit wallet".to_string(),
809 ));
810 }
811
812 let needs_usd = rules.iter().any(|r| r.scope == SpendScope::GlobalUsdCents);
813 let amount_usd_cents = if needs_usd {
814 Some(
815 self.amount_to_usd_cents(&ctx.network, ctx.token.as_deref(), ctx.amount_native)
816 .await?,
817 )
818 } else {
819 None
820 };
821
822 let mut tx = pool
824 .begin()
825 .await
826 .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
827
828 sqlx::query("SELECT pg_advisory_xact_lock($1)")
829 .bind(SPEND_ADVISORY_LOCK_KEY)
830 .execute(&mut *tx)
831 .await
832 .map_err(|e| PayError::InternalError(format!("pg advisory lock: {e}")))?;
833
834 let existing: Option<(i64,)> =
836 sqlx::query_as("SELECT reservation_id FROM spend_reservations WHERE op_id = $1")
837 .bind(op_id)
838 .fetch_optional(&mut *tx)
839 .await
840 .map_err(|e| PayError::InternalError(format!("pg check op_id: {e}")))?;
841
842 if let Some((rid,)) = existing {
843 return Ok(rid as u64);
844 }
845
846 pg_expire_pending(&mut tx, now).await?;
848
849 let reservations = pg_load_reservations_tx(&mut tx).await?;
851
852 let rules = pg_load_rules_tx(&mut tx).await?;
854
855 for rule in rules.iter() {
857 if !rule_matches_context(
858 rule,
859 &ctx.network,
860 ctx.wallet.as_deref(),
861 ctx.token.as_deref(),
862 ) {
863 continue;
864 }
865
866 let use_usd = rule.scope == SpendScope::GlobalUsdCents;
867 let candidate_amount =
868 amount_for_rule(rule, ctx.amount_native, amount_usd_cents, use_usd)?;
869 let (spent, oldest_ts) = spent_in_window(rule, &reservations, now, use_usd)?;
870 if spent.saturating_add(candidate_amount) > rule.max_spend {
871 let window_ms = rule.window_s.saturating_mul(1000);
872 let remaining_s = oldest_ts
873 .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
874 .unwrap_or(0);
875
876 return Err(PayError::LimitExceeded {
877 rule_id: rule.rule_id.clone().unwrap_or_default(),
878 scope: rule.scope,
879 scope_key: scope_key(rule),
880 spent,
881 max_spend: rule.max_spend,
882 token: rule.token.clone(),
883 remaining_s,
884 origin: None,
885 });
886 }
887 }
888
889 let reservation = SpendReservation {
891 reservation_id: 0, op_id: op_id.to_string(),
893 network: ctx.network.clone(),
894 wallet: ctx.wallet.clone(),
895 token: ctx.token.clone(),
896 amount_native: ctx.amount_native,
897 amount_usd_cents,
898 status: ReservationStatus::Pending,
899 created_at_epoch_ms: now,
900 expires_at_epoch_ms: now.saturating_add(300_000),
901 finalized_at_epoch_ms: None,
902 };
903 let reservation_json = serde_json::to_value(&reservation)
904 .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
905
906 let row: (i64,) = sqlx::query_as(
907 "INSERT INTO spend_reservations (op_id, reservation) \
908 VALUES ($1, $2) RETURNING reservation_id",
909 )
910 .bind(op_id)
911 .bind(&reservation_json)
912 .fetch_one(&mut *tx)
913 .await
914 .map_err(|e| PayError::InternalError(format!("pg insert reservation: {e}")))?;
915
916 let reservation_id = row.0 as u64;
917
918 let mut updated_json = reservation_json;
920 updated_json["reservation_id"] = serde_json::json!(reservation_id);
921 sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
922 .bind(&updated_json)
923 .bind(row.0)
924 .execute(&mut *tx)
925 .await
926 .map_err(|e| PayError::InternalError(format!("pg update reservation id: {e}")))?;
927
928 tx.commit()
929 .await
930 .map_err(|e| PayError::InternalError(format!("pg commit reserve: {e}")))?;
931
932 Ok(reservation_id)
933 }
934
935 async fn confirm_postgres(&self, reservation_id: u64) -> Result<(), PayError> {
936 let pool = self.pg_pool()?;
937 let now = now_epoch_ms();
938 let rid = reservation_id as i64;
939
940 let row: Option<(serde_json::Value,)> =
941 sqlx::query_as("SELECT reservation FROM spend_reservations WHERE reservation_id = $1")
942 .bind(rid)
943 .fetch_optional(pool)
944 .await
945 .map_err(|e| PayError::InternalError(format!("pg read reservation: {e}")))?;
946
947 let Some((res_json,)) = row else {
948 return Err(PayError::InternalError(format!(
949 "reservation {reservation_id} not found"
950 )));
951 };
952
953 let mut reservation: SpendReservation = serde_json::from_value(res_json)
954 .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
955
956 if !matches!(reservation.status, ReservationStatus::Pending) {
957 return Ok(());
958 }
959
960 reservation.status = ReservationStatus::Confirmed;
961 reservation.finalized_at_epoch_ms = Some(now);
962 let updated_json = serde_json::to_value(&reservation)
963 .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
964
965 let event = SpendEvent {
966 event_id: 0, reservation_id,
968 op_id: reservation.op_id,
969 network: reservation.network,
970 wallet: reservation.wallet,
971 token: reservation.token,
972 amount_native: reservation.amount_native,
973 amount_usd_cents: reservation.amount_usd_cents,
974 created_at_epoch_ms: reservation.created_at_epoch_ms,
975 confirmed_at_epoch_ms: now,
976 };
977 let event_json = serde_json::to_value(&event)
978 .map_err(|e| PayError::InternalError(format!("serialize spend event: {e}")))?;
979
980 let mut tx = pool
981 .begin()
982 .await
983 .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
984
985 sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
986 .bind(&updated_json)
987 .bind(rid)
988 .execute(&mut *tx)
989 .await
990 .map_err(|e| PayError::InternalError(format!("pg update reservation: {e}")))?;
991
992 sqlx::query("INSERT INTO spend_events (reservation_id, event) VALUES ($1, $2)")
993 .bind(rid)
994 .bind(&event_json)
995 .execute(&mut *tx)
996 .await
997 .map_err(|e| PayError::InternalError(format!("pg insert spend event: {e}")))?;
998
999 tx.commit()
1000 .await
1001 .map_err(|e| PayError::InternalError(format!("pg commit confirm: {e}")))
1002 }
1003
1004 async fn cancel_postgres(&self, reservation_id: u64) -> Result<(), PayError> {
1005 let pool = self.pg_pool()?;
1006 let now = now_epoch_ms();
1007 let rid = reservation_id as i64;
1008
1009 let row: Option<(serde_json::Value,)> =
1010 sqlx::query_as("SELECT reservation FROM spend_reservations WHERE reservation_id = $1")
1011 .bind(rid)
1012 .fetch_optional(pool)
1013 .await
1014 .map_err(|e| PayError::InternalError(format!("pg read reservation: {e}")))?;
1015
1016 if let Some((res_json,)) = row {
1017 let mut reservation: SpendReservation = serde_json::from_value(res_json)
1018 .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
1019
1020 if matches!(reservation.status, ReservationStatus::Pending) {
1021 reservation.status = ReservationStatus::Cancelled;
1022 reservation.finalized_at_epoch_ms = Some(now);
1023 let updated_json = serde_json::to_value(&reservation)
1024 .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
1025
1026 sqlx::query(
1027 "UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2",
1028 )
1029 .bind(&updated_json)
1030 .bind(rid)
1031 .execute(pool)
1032 .await
1033 .map_err(|e| PayError::InternalError(format!("pg update reservation: {e}")))?;
1034 }
1035 }
1036
1037 Ok(())
1038 }
1039}
1040
1041#[cfg(feature = "postgres")]
1042async fn pg_load_rules(pool: &sqlx::PgPool) -> Result<Vec<SpendLimit>, PayError> {
1043 let rows: Vec<(serde_json::Value,)> =
1044 sqlx::query_as("SELECT rule FROM spend_rules ORDER BY rule_id")
1045 .fetch_all(pool)
1046 .await
1047 .map_err(|e| PayError::InternalError(format!("pg load spend rules: {e}")))?;
1048 rows.into_iter()
1049 .map(|(v,)| {
1050 serde_json::from_value(v)
1051 .map_err(|e| PayError::InternalError(format!("pg parse spend rule: {e}")))
1052 })
1053 .collect()
1054}
1055
1056#[cfg(feature = "postgres")]
1057async fn pg_load_rules_tx(
1058 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1059) -> Result<Vec<SpendLimit>, PayError> {
1060 let rows: Vec<(serde_json::Value,)> =
1061 sqlx::query_as("SELECT rule FROM spend_rules ORDER BY rule_id")
1062 .fetch_all(&mut **tx)
1063 .await
1064 .map_err(|e| PayError::InternalError(format!("pg load spend rules: {e}")))?;
1065 rows.into_iter()
1066 .map(|(v,)| {
1067 serde_json::from_value(v)
1068 .map_err(|e| PayError::InternalError(format!("pg parse spend rule: {e}")))
1069 })
1070 .collect()
1071}
1072
1073#[cfg(feature = "postgres")]
1074async fn pg_load_reservations(pool: &sqlx::PgPool) -> Result<Vec<SpendReservation>, PayError> {
1075 let rows: Vec<(serde_json::Value,)> =
1076 sqlx::query_as("SELECT reservation FROM spend_reservations ORDER BY reservation_id")
1077 .fetch_all(pool)
1078 .await
1079 .map_err(|e| PayError::InternalError(format!("pg load reservations: {e}")))?;
1080 rows.into_iter()
1081 .map(|(v,)| {
1082 serde_json::from_value(v)
1083 .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))
1084 })
1085 .collect()
1086}
1087
1088#[cfg(feature = "postgres")]
1089async fn pg_load_reservations_tx(
1090 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1091) -> Result<Vec<SpendReservation>, PayError> {
1092 let rows: Vec<(serde_json::Value,)> =
1093 sqlx::query_as("SELECT reservation FROM spend_reservations ORDER BY reservation_id")
1094 .fetch_all(&mut **tx)
1095 .await
1096 .map_err(|e| PayError::InternalError(format!("pg load reservations: {e}")))?;
1097 rows.into_iter()
1098 .map(|(v,)| {
1099 serde_json::from_value(v)
1100 .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))
1101 })
1102 .collect()
1103}
1104
1105#[cfg(feature = "postgres")]
1106async fn pg_expire_pending(
1107 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1108 now_ms: u64,
1109) -> Result<(), PayError> {
1110 let rows: Vec<(i64, serde_json::Value)> =
1112 sqlx::query_as("SELECT reservation_id, reservation FROM spend_reservations")
1113 .fetch_all(&mut **tx)
1114 .await
1115 .map_err(|e| {
1116 PayError::InternalError(format!("pg load reservations for expire: {e}"))
1117 })?;
1118
1119 for (rid, res_json) in rows {
1120 let mut reservation: SpendReservation = serde_json::from_value(res_json)
1121 .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
1122 if matches!(reservation.status, ReservationStatus::Pending)
1123 && reservation.expires_at_epoch_ms <= now_ms
1124 {
1125 reservation.status = ReservationStatus::Expired;
1126 reservation.finalized_at_epoch_ms = Some(now_ms);
1127 let updated = serde_json::to_value(&reservation)
1128 .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
1129 sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
1130 .bind(&updated)
1131 .bind(rid)
1132 .execute(&mut **tx)
1133 .await
1134 .map_err(|e| PayError::InternalError(format!("pg expire reservation: {e}")))?;
1135 }
1136 }
1137 Ok(())
1138}
1139
1140impl SpendLedger {
1145 async fn amount_to_usd_cents(
1146 &self,
1147 network: &str,
1148 token: Option<&str>,
1149 amount_native: u64,
1150 ) -> Result<u64, PayError> {
1151 let (symbol, divisor) = token_asset(network, token).ok_or_else(|| {
1152 PayError::InvalidAmount(format!(
1153 "network '{network}' token '{token:?}' is unsupported for global-usd-cents limits"
1154 ))
1155 })?;
1156
1157 let quote = self.get_or_fetch_quote(symbol, "USD").await?;
1158
1159 let ttl_ms = quote
1161 .expires_at_epoch_ms
1162 .saturating_sub(quote.fetched_at_epoch_ms);
1163 let age_ms = now_epoch_ms().saturating_sub(quote.fetched_at_epoch_ms);
1164 if ttl_ms > 0 && age_ms > ttl_ms * 4 / 5 {
1165 self.fx_stale_warned
1166 .store(true, std::sync::atomic::Ordering::Relaxed);
1167 }
1168
1169 let usd = (amount_native as f64 / divisor) * quote.price;
1170 if !usd.is_finite() || usd < 0f64 {
1171 return Err(PayError::InternalError(
1172 "invalid exchange-rate conversion result".to_string(),
1173 ));
1174 }
1175 Ok((usd * 100f64).round() as u64)
1176 }
1177
1178 async fn get_or_fetch_quote(
1179 &self,
1180 base: &str,
1181 quote: &str,
1182 ) -> Result<ExchangeRateQuote, PayError> {
1183 let pair = format!(
1184 "{}/{}",
1185 base.to_ascii_uppercase(),
1186 quote.to_ascii_uppercase()
1187 );
1188 let now = now_epoch_ms();
1189
1190 #[cfg(feature = "redb")]
1192 if let SpendBackend::Redb { .. } = &self.backend {
1193 let fx_db = self.open_exchange_rate_db()?;
1194 let read_txn = fx_db
1195 .begin_read()
1196 .map_err(|e| PayError::InternalError(format!("fx begin_read: {e}")))?;
1197 if let Ok(table) = read_txn.open_table(FX_QUOTE_BY_PAIR) {
1198 if let Some(entry) = table
1199 .get(pair.as_str())
1200 .map_err(|e| PayError::InternalError(format!("fx read quote: {e}")))?
1201 {
1202 let cached: ExchangeRateQuote = decode(entry.value())?;
1203 if cached.expires_at_epoch_ms > now {
1204 return Ok(cached);
1205 }
1206 }
1207 }
1208 }
1209
1210 #[cfg(feature = "postgres")]
1212 if let SpendBackend::Postgres { pool } = &self.backend {
1213 let row: Option<(serde_json::Value,)> =
1214 sqlx::query_as("SELECT quote FROM exchange_rate_cache WHERE pair = $1")
1215 .bind(&pair)
1216 .fetch_optional(pool)
1217 .await
1218 .map_err(|e| PayError::InternalError(format!("pg fx read cache: {e}")))?;
1219 if let Some((quote_json,)) = row {
1220 let cached: ExchangeRateQuote = serde_json::from_value(quote_json)
1221 .map_err(|e| PayError::InternalError(format!("pg fx parse cache: {e}")))?;
1222 if cached.expires_at_epoch_ms > now {
1223 return Ok(cached);
1224 }
1225 }
1226 }
1227
1228 let (fetched_price, source_name) = self.fetch_exchange_rate_http(base, quote).await?;
1229 let ttl_s = self
1230 .exchange_rate
1231 .as_ref()
1232 .map(|cfg| cfg.ttl_s)
1233 .unwrap_or(300)
1234 .max(1);
1235 let new_quote = ExchangeRateQuote {
1236 pair: pair.clone(),
1237 source: source_name,
1238 price: fetched_price,
1239 fetched_at_epoch_ms: now,
1240 expires_at_epoch_ms: now.saturating_add(ttl_s.saturating_mul(1000)),
1241 };
1242
1243 #[cfg(feature = "redb")]
1245 if let SpendBackend::Redb { .. } = &self.backend {
1246 let fx_db = self.open_exchange_rate_db()?;
1247 let write_txn = fx_db
1248 .begin_write()
1249 .map_err(|e| PayError::InternalError(format!("fx begin_write: {e}")))?;
1250 let mut encoded_blobs: Vec<String> = Vec::new();
1251 {
1252 let mut table = write_txn
1253 .open_table(FX_QUOTE_BY_PAIR)
1254 .map_err(|e| PayError::InternalError(format!("fx open quote table: {e}")))?;
1255 encoded_blobs.push(encode(&new_quote)?);
1256 let encoded = encoded_blobs
1257 .last()
1258 .ok_or_else(|| PayError::InternalError("missing quote blob".to_string()))?;
1259 table
1260 .insert(pair.as_str(), encoded.as_str())
1261 .map_err(|e| PayError::InternalError(format!("fx insert quote: {e}")))?;
1262 }
1263 write_txn
1264 .commit()
1265 .map_err(|e| PayError::InternalError(format!("fx commit write: {e}")))?;
1266 }
1267
1268 #[cfg(feature = "postgres")]
1270 if let SpendBackend::Postgres { pool } = &self.backend {
1271 let quote_json = serde_json::to_value(&new_quote)
1272 .map_err(|e| PayError::InternalError(format!("serialize fx quote: {e}")))?;
1273 let _ = sqlx::query(
1274 "INSERT INTO exchange_rate_cache (pair, quote) VALUES ($1, $2) \
1275 ON CONFLICT (pair) DO UPDATE SET quote = $2",
1276 )
1277 .bind(&pair)
1278 .bind("e_json)
1279 .execute(pool)
1280 .await;
1281 }
1282
1283 Ok(new_quote)
1284 }
1285
1286 #[cfg(feature = "exchange-rate")]
1287 async fn fetch_exchange_rate_http(
1288 &self,
1289 base: &str,
1290 quote_currency: &str,
1291 ) -> Result<(f64, String), PayError> {
1292 let cfg = self.exchange_rate.as_ref().cloned().unwrap_or_default();
1293
1294 if cfg.sources.is_empty() {
1295 return Err(PayError::InvalidAmount(
1296 "exchange_rate.sources is empty — no exchange-rate API configured".to_string(),
1297 ));
1298 }
1299
1300 let client = reqwest::Client::new();
1301 let mut last_err = String::new();
1302
1303 for source in &cfg.sources {
1304 match fetch_from_source(&client, source, base, quote_currency).await {
1305 Ok(price) => return Ok((price, source.endpoint.clone())),
1306 Err(e) => {
1307 last_err =
1308 format!("{} ({}): {e}", source.endpoint, source.source_type.as_str());
1309 }
1310 }
1311 }
1312
1313 Err(PayError::NetworkError(format!(
1314 "all exchange-rate sources failed; last: {last_err}"
1315 )))
1316 }
1317
1318 #[cfg(not(feature = "exchange-rate"))]
1319 async fn fetch_exchange_rate_http(
1320 &self,
1321 _base: &str,
1322 _quote_currency: &str,
1323 ) -> Result<(f64, String), PayError> {
1324 Err(PayError::NotImplemented(
1325 "exchange-rate HTTP support is not built in this feature set".to_string(),
1326 ))
1327 }
1328}
1329
1330fn now_epoch_ms() -> u64 {
1335 std::time::SystemTime::now()
1336 .duration_since(std::time::UNIX_EPOCH)
1337 .map(|d| d.as_millis() as u64)
1338 .unwrap_or(0)
1339}
1340
1341fn token_asset(network: &str, token: Option<&str>) -> Option<(&'static str, f64)> {
1342 match token.map(|t| t.to_ascii_lowercase()).as_deref() {
1343 Some("sol") => Some(("SOL", 1e9)),
1344 Some("eth") => Some(("ETH", 1e18)),
1345 Some("usdc" | "usdt") => Some(("USD", 1e6)),
1346 Some(_) => None,
1347 None => {
1348 let p = network.to_ascii_lowercase();
1349 if p.starts_with("ln") || p == "cashu" || p == "btc" {
1350 Some(("BTC", 1e8))
1351 } else {
1352 None
1353 }
1354 }
1355 }
1356}
1357
1358#[cfg(feature = "exchange-rate")]
1359fn extract_price_generic(value: &serde_json::Value) -> Option<f64> {
1360 value
1361 .get("price")
1362 .and_then(|v| v.as_f64())
1363 .or_else(|| value.get("rate").and_then(|v| v.as_f64()))
1364 .or_else(|| value.get("usd_per_base").and_then(|v| v.as_f64()))
1365 .or_else(|| {
1366 value
1367 .get("data")
1368 .and_then(|d| d.get("price"))
1369 .and_then(|v| v.as_f64())
1370 })
1371}
1372
1373#[cfg(feature = "exchange-rate")]
1374impl ExchangeRateSourceType {
1375 fn as_str(self) -> &'static str {
1376 match self {
1377 Self::Generic => "generic",
1378 Self::CoinGecko => "coingecko",
1379 Self::Kraken => "kraken",
1380 }
1381 }
1382}
1383
1384#[cfg(feature = "exchange-rate")]
1385fn coingecko_coin_id(symbol: &str) -> Option<&'static str> {
1386 match symbol.to_ascii_uppercase().as_str() {
1387 "BTC" => Some("bitcoin"),
1388 "SOL" => Some("solana"),
1389 "ETH" => Some("ethereum"),
1390 _ => None,
1391 }
1392}
1393
1394#[cfg(feature = "exchange-rate")]
1395fn kraken_pair(symbol: &str) -> Option<&'static str> {
1396 match symbol.to_ascii_uppercase().as_str() {
1397 "BTC" => Some("XBTUSD"),
1398 "SOL" => Some("SOLUSD"),
1399 "ETH" => Some("ETHUSD"),
1400 _ => None,
1401 }
1402}
1403
1404#[cfg(feature = "exchange-rate")]
1405async fn fetch_from_source(
1406 client: &reqwest::Client,
1407 source: &crate::types::ExchangeRateSource,
1408 base: &str,
1409 quote_currency: &str,
1410) -> Result<f64, String> {
1411 type PriceExtractor = Box<dyn Fn(&serde_json::Value) -> Option<f64> + Send>;
1412 let (url, extract_fn): (String, PriceExtractor) = match source.source_type {
1413 ExchangeRateSourceType::Kraken => {
1414 let pair = kraken_pair(base)
1415 .ok_or_else(|| format!("kraken: unsupported base asset '{base}'"))?;
1416 let url = format!("{}/0/public/Ticker?pair={pair}", source.endpoint);
1417 let pair_owned = pair.to_string();
1418 (
1419 url,
1420 Box::new(move |v: &serde_json::Value| {
1421 let result = v.get("result")?;
1422 let ticker = result
1423 .get(&pair_owned)
1424 .or_else(|| result.as_object().and_then(|m| m.values().next()))?;
1425 let price_str = ticker.get("c")?.as_array()?.first()?.as_str()?;
1426 price_str.parse::<f64>().ok()
1427 }),
1428 )
1429 }
1430 ExchangeRateSourceType::CoinGecko => {
1431 let coin_id = coingecko_coin_id(base)
1432 .ok_or_else(|| format!("coingecko: unsupported base asset '{base}'"))?;
1433 let vs = quote_currency.to_ascii_lowercase();
1434 let url = format!(
1435 "{}/simple/price?ids={coin_id}&vs_currencies={vs}",
1436 source.endpoint
1437 );
1438 let coin_id_owned = coin_id.to_string();
1439 let vs_owned = vs.clone();
1440 (
1441 url,
1442 Box::new(move |v: &serde_json::Value| {
1443 v.get(&coin_id_owned)?.get(&vs_owned)?.as_f64()
1444 }),
1445 )
1446 }
1447 ExchangeRateSourceType::Generic => {
1448 let sep = if source.endpoint.contains('?') {
1449 '&'
1450 } else {
1451 '?'
1452 };
1453 let url = format!(
1454 "{}{sep}base={}"e={}",
1455 source.endpoint,
1456 base.to_ascii_uppercase(),
1457 quote_currency.to_ascii_uppercase()
1458 );
1459 (url, Box::new(extract_price_generic))
1460 }
1461 };
1462
1463 let mut req = client.get(&url);
1464 if let Some(key) = &source.api_key {
1465 req = req.header("Authorization", format!("Bearer {key}"));
1466 req = req.header("X-Api-Key", key);
1467 }
1468
1469 let resp = req
1470 .send()
1471 .await
1472 .map_err(|e| format!("request failed: {e}"))?;
1473 if !resp.status().is_success() {
1474 return Err(format!("status {}", resp.status()));
1475 }
1476
1477 let value: serde_json::Value = resp
1478 .json()
1479 .await
1480 .map_err(|e| format!("parse failed: {e}"))?;
1481
1482 extract_fn(&value).ok_or_else(|| "could not extract price from response".to_string())
1483}
1484
1485#[cfg(feature = "redb")]
1486fn encode<T: Serialize>(value: &T) -> Result<String, PayError> {
1487 serde_json::to_string(value)
1488 .map_err(|e| PayError::InternalError(format!("spend encode failed: {e}")))
1489}
1490
1491#[cfg(feature = "redb")]
1492fn decode<T: DeserializeOwned>(encoded: &str) -> Result<T, PayError> {
1493 serde_json::from_str(encoded).map_err(|e| {
1494 let preview_len = encoded.len().min(48);
1495 let preview = &encoded[..preview_len];
1496 PayError::InternalError(format!(
1497 "spend decode failed (len={}, preview={}): {e}",
1498 encoded.len(),
1499 preview
1500 ))
1501 })
1502}
1503
1504#[cfg(feature = "redb")]
1505fn prepend_err(prefix: &str, err: PayError) -> PayError {
1506 match err {
1507 PayError::InternalError(msg) => PayError::InternalError(format!("{prefix}: {msg}")),
1508 other => other,
1509 }
1510}
1511
1512fn generate_rule_identifier() -> Result<String, PayError> {
1513 let mut buf = [0u8; 4];
1514 getrandom::fill(&mut buf).map_err(|e| PayError::InternalError(format!("rng failed: {e}")))?;
1515 Ok(format!("r_{}", hex::encode(buf)))
1516}
1517
1518fn validate_limit(
1519 rule: &SpendLimit,
1520 exchange_rate: Option<&ExchangeRateConfig>,
1521) -> Result<(), PayError> {
1522 if rule.window_s == 0 {
1523 return Err(PayError::InvalidAmount(
1524 "limit rule has zero window_s".to_string(),
1525 ));
1526 }
1527 if rule.max_spend == 0 {
1528 return Err(PayError::InvalidAmount(
1529 "limit rule has zero max_spend".to_string(),
1530 ));
1531 }
1532
1533 match rule.scope {
1534 SpendScope::GlobalUsdCents => {
1535 if rule.network.is_some() || rule.wallet.is_some() {
1536 return Err(PayError::InvalidAmount(
1537 "scope=global-usd-cents cannot set network/wallet".to_string(),
1538 ));
1539 }
1540 if rule.token.is_some() {
1541 return Err(PayError::InvalidAmount(
1542 "scope=global-usd-cents cannot set token".to_string(),
1543 ));
1544 }
1545 }
1546 SpendScope::Network => {
1547 if rule.network.as_deref().unwrap_or("").trim().is_empty() {
1548 return Err(PayError::InvalidAmount(
1549 "scope=network requires network".to_string(),
1550 ));
1551 }
1552 if rule.wallet.is_some() {
1553 return Err(PayError::InvalidAmount(
1554 "scope=network cannot set wallet".to_string(),
1555 ));
1556 }
1557 }
1558 SpendScope::Wallet => {
1559 if rule.network.as_deref().unwrap_or("").trim().is_empty() {
1560 return Err(PayError::InvalidAmount(
1561 "scope=wallet requires network".to_string(),
1562 ));
1563 }
1564 if rule.wallet.as_deref().unwrap_or("").trim().is_empty() {
1565 return Err(PayError::InvalidAmount(
1566 "scope=wallet requires wallet".to_string(),
1567 ));
1568 }
1569 }
1570 }
1571
1572 if rule.scope == SpendScope::GlobalUsdCents && exchange_rate.is_none() {
1573 return Err(PayError::InvalidAmount(
1574 "scope=global-usd-cents requires config.exchange_rate".to_string(),
1575 ));
1576 }
1577 Ok(())
1578}
1579
1580#[cfg(feature = "redb")]
1581fn load_rules(read_txn: &redb::ReadTransaction) -> Result<Vec<SpendLimit>, PayError> {
1582 let Ok(rule_table) = read_txn.open_table(RULE_BY_ID) else {
1583 return Ok(vec![]);
1584 };
1585 rule_table
1586 .iter()
1587 .map_err(|e| PayError::InternalError(format!("spend iterate rules: {e}")))?
1588 .map(|entry| {
1589 let (_k, v) = entry
1590 .map_err(|e| PayError::InternalError(format!("spend read rule entry: {e}")))?;
1591 decode::<SpendLimit>(v.value()).map_err(|e| prepend_err("spend decode rule", e))
1592 })
1593 .collect()
1594}
1595
1596#[cfg(feature = "redb")]
1597fn load_reservations(read_txn: &redb::ReadTransaction) -> Result<Vec<SpendReservation>, PayError> {
1598 let Ok(table) = read_txn.open_table(RESERVATION_BY_ID) else {
1599 return Ok(vec![]);
1600 };
1601 table
1602 .iter()
1603 .map_err(|e| PayError::InternalError(format!("spend iterate reservations: {e}")))?
1604 .map(|entry| {
1605 let (_k, v) = entry
1606 .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?;
1607 decode::<SpendReservation>(v.value())
1608 .map_err(|e| prepend_err("spend decode reservation", e))
1609 })
1610 .collect()
1611}
1612
1613#[cfg(feature = "redb")]
1614fn expire_pending(_table: &mut redb::Table<u64, &str>, _now_ms: u64) -> Result<(), PayError> {
1615 Ok(())
1616}
1617
1618fn amount_for_rule(
1619 _rule: &SpendLimit,
1620 amount_native: u64,
1621 amount_usd_cents: Option<u64>,
1622 use_usd: bool,
1623) -> Result<u64, PayError> {
1624 if use_usd {
1625 amount_usd_cents.ok_or_else(|| {
1626 PayError::InternalError("missing USD amount for non-native unit rule".to_string())
1627 })
1628 } else {
1629 Ok(amount_native)
1630 }
1631}
1632
1633fn reservation_active_for_window(r: &SpendReservation, now_ms: u64) -> bool {
1634 match r.status {
1635 ReservationStatus::Confirmed => true,
1636 ReservationStatus::Pending => r.expires_at_epoch_ms > now_ms,
1637 ReservationStatus::Cancelled | ReservationStatus::Expired => false,
1638 }
1639}
1640
1641fn rule_matches_context(
1642 rule: &SpendLimit,
1643 network: &str,
1644 wallet: Option<&str>,
1645 token: Option<&str>,
1646) -> bool {
1647 if let Some(rule_token) = &rule.token {
1648 match token {
1649 Some(ctx_token) if ctx_token.eq_ignore_ascii_case(rule_token) => {}
1650 _ => return false,
1651 }
1652 }
1653 match rule.scope {
1654 SpendScope::GlobalUsdCents => true,
1655 SpendScope::Network => rule.network.as_deref() == Some(network),
1656 SpendScope::Wallet => {
1657 rule.network.as_deref() == Some(network) && rule.wallet.as_deref() == wallet
1658 }
1659 }
1660}
1661
1662fn scope_key(rule: &SpendLimit) -> String {
1663 match rule.scope {
1664 SpendScope::GlobalUsdCents => "global-usd-cents".to_string(),
1665 SpendScope::Network => rule.network.clone().unwrap_or_default(),
1666 SpendScope::Wallet => format!(
1667 "{}/{}",
1668 rule.network.clone().unwrap_or_default(),
1669 rule.wallet.clone().unwrap_or_default()
1670 ),
1671 }
1672}
1673
1674fn spent_in_window(
1675 rule: &SpendLimit,
1676 reservations: &[SpendReservation],
1677 now_ms: u64,
1678 use_usd: bool,
1679) -> Result<(u64, Option<u64>), PayError> {
1680 let window_ms = rule.window_s.saturating_mul(1000);
1681 let cutoff = now_ms.saturating_sub(window_ms);
1682
1683 let mut spent = 0u64;
1684 let mut oldest: Option<u64> = None;
1685
1686 for r in reservations {
1687 if !reservation_active_for_window(r, now_ms) {
1688 continue;
1689 }
1690 if r.created_at_epoch_ms < cutoff {
1691 continue;
1692 }
1693 if !rule_matches_context(rule, &r.network, r.wallet.as_deref(), r.token.as_deref()) {
1694 continue;
1695 }
1696
1697 let amount = if use_usd {
1698 r.amount_usd_cents.ok_or_else(|| {
1699 PayError::InternalError("reservation missing USD amount".to_string())
1700 })?
1701 } else {
1702 r.amount_native
1703 };
1704 spent = spent.saturating_add(amount);
1705 oldest = Some(oldest.map_or(r.created_at_epoch_ms, |v| v.min(r.created_at_epoch_ms)));
1706 }
1707
1708 Ok((spent, oldest))
1709}
1710
1711#[cfg(feature = "redb")]
1712fn next_counter(write_txn: &redb::WriteTransaction, key: &str) -> Result<u64, PayError> {
1713 let mut meta = write_txn
1714 .open_table(META_COUNTER)
1715 .map_err(|e| PayError::InternalError(format!("spend open meta table: {e}")))?;
1716 let current = match meta
1717 .get(key)
1718 .map_err(|e| PayError::InternalError(format!("spend read counter {key}: {e}")))?
1719 {
1720 Some(v) => v.value(),
1721 None => 0,
1722 };
1723 let next = current.saturating_add(1);
1724 meta.insert(key, next)
1725 .map_err(|e| PayError::InternalError(format!("spend write counter {key}: {e}")))?;
1726 Ok(next)
1727}
1728
1729#[cfg(test)]
1730mod tests {
1731 use super::*;
1732
1733 fn make_limit(scope: SpendScope, network: Option<&str>, wallet: Option<&str>) -> SpendLimit {
1734 SpendLimit {
1735 rule_id: None,
1736 scope,
1737 network: network.map(|s| s.to_string()),
1738 wallet: wallet.map(|s| s.to_string()),
1739 window_s: 3600,
1740 max_spend: 1000,
1741 token: None,
1742 }
1743 }
1744
1745 #[cfg(feature = "redb")]
1746 #[tokio::test]
1747 async fn provider_limit_reserve_and_confirm() {
1748 let tmp = tempfile::tempdir().unwrap();
1749 let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);
1750
1751 ledger
1752 .set_limits(&[make_limit(SpendScope::Network, Some("cashu"), None)])
1753 .await
1754 .unwrap();
1755
1756 let ctx = SpendContext {
1757 network: "cashu".to_string(),
1758 wallet: Some("w_01".to_string()),
1759 amount_native: 400,
1760 token: None,
1761 };
1762 let r1 = ledger.reserve("op_1", &ctx).await.unwrap();
1763 ledger.confirm(r1).await.unwrap();
1764
1765 let r2 = ledger.reserve("op_2", &ctx).await.unwrap();
1766 let err = ledger.reserve("op_3", &ctx).await.unwrap_err();
1767 assert!(matches!(err, PayError::LimitExceeded { .. }));
1768
1769 ledger.cancel(r2).await.unwrap();
1770 }
1771
1772 #[cfg(feature = "redb")]
1773 #[tokio::test]
1774 async fn wallet_scope_requires_wallet_context() {
1775 let tmp = tempfile::tempdir().unwrap();
1776 let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);
1777
1778 ledger
1779 .set_limits(&[make_limit(SpendScope::Wallet, Some("cashu"), Some("w_abc"))])
1780 .await
1781 .unwrap();
1782
1783 let ctx = SpendContext {
1784 network: "cashu".to_string(),
1785 wallet: None,
1786 amount_native: 1,
1787 token: None,
1788 };
1789 let err = ledger.reserve("op_1", &ctx).await.unwrap_err();
1790 assert!(matches!(err, PayError::InvalidAmount(_)));
1791 }
1792
1793 #[tokio::test]
1794 async fn global_usd_cents_scope_requires_exchange_rate_config() {
1795 let tmp = tempfile::tempdir().unwrap();
1796 let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);
1797
1798 let err = ledger
1799 .set_limits(&[SpendLimit {
1800 rule_id: None,
1801 scope: SpendScope::GlobalUsdCents,
1802 network: None,
1803 wallet: None,
1804 window_s: 3600,
1805 max_spend: 100,
1806 token: None,
1807 }])
1808 .await
1809 .unwrap_err();
1810
1811 assert!(matches!(err, PayError::InvalidAmount(_)));
1812 }
1813
1814 #[cfg(feature = "redb")]
1815 #[tokio::test]
1816 async fn network_scope_native_token_ok_without_exchange_rate() {
1817 let tmp = tempfile::tempdir().unwrap();
1818 let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);
1819
1820 ledger
1821 .set_limits(&[SpendLimit {
1822 rule_id: None,
1823 scope: SpendScope::Network,
1824 network: Some("cashu".to_string()),
1825 wallet: None,
1826 window_s: 3600,
1827 max_spend: 100,
1828 token: None,
1829 }])
1830 .await
1831 .expect("network scope should not require exchange_rate");
1832 }
1833}