1#![cfg_attr(not(any(feature = "redb", feature = "postgres")), allow(dead_code))]
2
3pub mod tokens;
4
5use crate::provider::PayError;
6#[cfg(feature = "exchange-rate")]
7use crate::types::ExchangeRateSourceType;
8use crate::types::{ExchangeRateConfig, SpendLimit, SpendLimitStatus, SpendScope};
9#[cfg(feature = "redb")]
10use serde::de::DeserializeOwned;
11use serde::{Deserialize, Serialize};
12use tokio::sync::Mutex;
13
14#[cfg(feature = "redb")]
15use crate::store::db;
16#[cfg(feature = "redb")]
17use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
18#[cfg(feature = "redb")]
19use std::path::{Path, PathBuf};
20
// redb table: counter name -> current `u64` value.
// Presumably the table behind `next_counter` (defined elsewhere) — confirm.
#[cfg(feature = "redb")]
const META_COUNTER: TableDefinition<&str, u64> = TableDefinition::new("meta_counter");
// redb table: rule id -> encoded `SpendLimit`.
#[cfg(feature = "redb")]
const RULE_BY_ID: TableDefinition<&str, &str> = TableDefinition::new("rule_by_id_v3");
// redb table: reservation id -> encoded `SpendReservation`.
#[cfg(feature = "redb")]
const RESERVATION_BY_ID: TableDefinition<u64, &str> = TableDefinition::new("reservation_by_id");
// redb table: caller-supplied op_id -> reservation id (used to make `reserve` idempotent).
#[cfg(feature = "redb")]
const RESERVATION_ID_BY_OP_ID: TableDefinition<&str, u64> =
    TableDefinition::new("reservation_id_by_op_id");
// redb table: event id -> encoded `SpendEvent`, written when a reservation is confirmed.
#[cfg(feature = "redb")]
const SPEND_EVENT_BY_ID: TableDefinition<u64, &str> = TableDefinition::new("spend_event_by_id");
// redb table for cached exchange-rate quotes.
// NOTE(review): presumably pair -> encoded `ExchangeRateQuote`; not used in this part of the file.
#[cfg(feature = "redb")]
const FX_QUOTE_BY_PAIR: TableDefinition<&str, &str> = TableDefinition::new("quote_by_pair");
// Counter key passed to `next_counter` when allocating reservation ids.
#[cfg(feature = "redb")]
const NEXT_RESERVATION_ID_KEY: &str = "next_reservation_id";
// Counter key passed to `next_counter` when allocating spend-event ids.
#[cfg(feature = "redb")]
const NEXT_EVENT_ID_KEY: &str = "next_event_id";
// Version handed to `db::open_and_migrate` for the spend database.
#[cfg(feature = "redb")]
const SPEND_VERSION: u64 = 1;
// Version handed to `db::open_and_migrate` for the exchange-rate cache database.
#[cfg(feature = "redb")]
const FX_CACHE_VERSION: u64 = 1;
42
/// Describes a single spend attempt that is checked against the configured limits.
#[derive(Debug, Clone)]
pub struct SpendContext {
    /// Network the spend happens on; `SpendLedger::reserve` rejects a blank value.
    pub network: String,
    /// Wallet performing the spend, if known. The backends reject a reservation
    /// without a wallet when a wallet-scoped rule exists for `network`.
    pub wallet: Option<String>,
    /// Amount in native units (the unit itself is not defined in this file).
    pub amount_native: u64,
    /// Optional token identifier, forwarded to rule matching and USD conversion.
    pub token: Option<String>,
}
50
/// Lifecycle state stored inside a `SpendReservation`.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum ReservationStatus {
    /// Initial state assigned when a reservation is created.
    Pending,
    /// Set by the confirm path; only `Pending` reservations are transitioned.
    Confirmed,
    /// Set by the cancel path; only `Pending` reservations are transitioned.
    Cancelled,
    /// NOTE(review): presumably assigned by the expire-pending helpers, which are
    /// defined outside this part of the file — confirm.
    Expired,
}
58
/// Persisted record of a spend that has been reserved against the limits.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SpendReservation {
    /// Backend-assigned identifier returned to the caller of `reserve`.
    reservation_id: u64,
    /// Caller-supplied operation id; a repeated `reserve` with the same op_id
    /// returns the existing reservation id.
    op_id: String,
    /// Network copied from the `SpendContext`.
    network: String,
    /// Wallet copied from the `SpendContext`.
    wallet: Option<String>,
    /// Token copied from the `SpendContext`; a record without this field
    /// deserializes to `None` because of `serde(default)`.
    #[serde(default)]
    token: Option<String>,
    /// Amount in native units, copied from the `SpendContext`.
    amount_native: u64,
    /// USD-cent value; only computed when at least one `GlobalUsdCents` rule exists.
    amount_usd_cents: Option<u64>,
    /// Current lifecycle state.
    status: ReservationStatus,
    /// Creation time in epoch milliseconds.
    created_at_epoch_ms: u64,
    /// Expiry time in epoch milliseconds (creation time + 300_000 ms in `reserve_*`).
    expires_at_epoch_ms: u64,
    /// Time of confirm/cancel in epoch milliseconds; `None` while pending.
    finalized_at_epoch_ms: Option<u64>,
}
74
/// Record written when a pending reservation is confirmed; its fields are
/// copied from the confirmed `SpendReservation`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SpendEvent {
    /// Event identifier (allocated from a counter in redb; written as 0 in the
    /// Postgres JSON payload).
    event_id: u64,
    /// Reservation this event confirms.
    reservation_id: u64,
    /// Operation id of the confirmed reservation.
    op_id: String,
    /// Network of the confirmed reservation.
    network: String,
    /// Wallet of the confirmed reservation.
    wallet: Option<String>,
    /// Token of the confirmed reservation; defaults to `None` when absent.
    #[serde(default)]
    token: Option<String>,
    /// Native amount of the confirmed reservation.
    amount_native: u64,
    /// USD-cent amount of the confirmed reservation, if it was computed.
    amount_usd_cents: Option<u64>,
    /// Creation time of the reservation, in epoch milliseconds.
    created_at_epoch_ms: u64,
    /// Confirmation time, in epoch milliseconds.
    confirmed_at_epoch_ms: u64,
}
89
/// Serializable exchange-rate quote.
/// NOTE(review): not used in this part of the file; presumably the value type
/// of the quote cache — confirm against the exchange-rate code.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ExchangeRateQuote {
    /// Pair identifier (format not defined here).
    pair: String,
    /// Name of the source that produced the quote.
    source: String,
    /// Quoted price.
    price: f64,
    /// Fetch time in epoch milliseconds.
    fetched_at_epoch_ms: u64,
    /// Expiry time in epoch milliseconds.
    expires_at_epoch_ms: u64,
}
98
/// Storage backend selected when the `SpendLedger` is constructed.
#[allow(dead_code)]
enum SpendBackend {
    /// Embedded redb files located under `data_dir`.
    #[cfg(feature = "redb")]
    Redb {
        data_dir: String,
    },
    /// Postgres, accessed through a shared connection pool.
    #[cfg(feature = "postgres")]
    Postgres {
        pool: sqlx::PgPool,
    },
    /// No storage available; `SpendLedger::new` picks this when the `redb`
    /// feature is disabled.
    None,
}
115
/// Spend-limit ledger: stores limit rules and tracks reservations against them.
pub struct SpendLedger {
    /// Where rules, reservations and events are persisted.
    backend: SpendBackend,
    /// Optional exchange-rate configuration, passed to limit validation.
    exchange_rate: Option<ExchangeRateConfig>,
    /// Async mutex taken by every public async operation, so calls on one
    /// ledger instance run one at a time.
    mu: Mutex<()>,
    /// Flag read and cleared by `take_fx_stale_warning`; where it is set is not
    /// visible in this part of the file.
    fx_stale_warned: std::sync::atomic::AtomicBool,
}
127
128impl SpendLedger {
129 pub fn new(data_dir: &str, exchange_rate: Option<ExchangeRateConfig>) -> Self {
130 #[cfg(feature = "redb")]
131 let backend = SpendBackend::Redb {
132 data_dir: data_dir.to_string(),
133 };
134 #[cfg(not(feature = "redb"))]
135 let backend = {
136 let _ = data_dir;
137 SpendBackend::None
138 };
139 Self {
140 backend,
141 exchange_rate,
142 mu: Mutex::new(()),
143 fx_stale_warned: std::sync::atomic::AtomicBool::new(false),
144 }
145 }
146
147 #[cfg(feature = "postgres")]
148 pub fn new_postgres(pool: sqlx::PgPool, exchange_rate: Option<ExchangeRateConfig>) -> Self {
149 Self {
150 backend: SpendBackend::Postgres { pool },
151 exchange_rate,
152 mu: Mutex::new(()),
153 fx_stale_warned: std::sync::atomic::AtomicBool::new(false),
154 }
155 }
156
157 pub fn take_fx_stale_warning(&self) -> bool {
159 self.fx_stale_warned
160 .swap(false, std::sync::atomic::Ordering::Relaxed)
161 }
162
163 pub async fn add_limit(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
165 validate_limit(limit, self.exchange_rate.as_ref())?;
166
167 let _guard = self.mu.lock().await;
168
169 match &self.backend {
170 #[cfg(feature = "redb")]
171 SpendBackend::Redb { .. } => self.add_limit_redb(limit),
172 #[cfg(feature = "postgres")]
173 SpendBackend::Postgres { .. } => self.add_limit_postgres(limit).await,
174 SpendBackend::None => Err(PayError::NotImplemented(
175 "no storage backend for spend limits".to_string(),
176 )),
177 }
178 }
179
180 pub async fn remove_limit(&self, _rule_id: &str) -> Result<(), PayError> {
182 let _guard = self.mu.lock().await;
183
184 match &self.backend {
185 #[cfg(feature = "redb")]
186 SpendBackend::Redb { .. } => self.remove_limit_redb(_rule_id),
187 #[cfg(feature = "postgres")]
188 SpendBackend::Postgres { .. } => self.remove_limit_postgres(_rule_id).await,
189 SpendBackend::None => Err(PayError::NotImplemented(
190 "no storage backend for spend limits".to_string(),
191 )),
192 }
193 }
194
195 pub async fn set_limits(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
197 for limit in limits {
198 validate_limit(limit, self.exchange_rate.as_ref())?;
199 }
200
201 let _guard = self.mu.lock().await;
202
203 match &self.backend {
204 #[cfg(feature = "redb")]
205 SpendBackend::Redb { .. } => self.set_limits_redb(limits),
206 #[cfg(feature = "postgres")]
207 SpendBackend::Postgres { .. } => self.set_limits_postgres(limits).await,
208 SpendBackend::None => Err(PayError::NotImplemented(
209 "no storage backend for spend limits".to_string(),
210 )),
211 }
212 }
213
214 pub async fn get_status(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
216 let _guard = self.mu.lock().await;
217
218 match &self.backend {
219 #[cfg(feature = "redb")]
220 SpendBackend::Redb { .. } => self.get_status_redb(),
221 #[cfg(feature = "postgres")]
222 SpendBackend::Postgres { .. } => self.get_status_postgres().await,
223 SpendBackend::None => Ok(Vec::new()),
224 }
225 }
226
227 pub async fn reserve(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
229 if op_id.trim().is_empty() {
230 return Err(PayError::InvalidAmount("op_id cannot be empty".to_string()));
231 }
232 if ctx.network.trim().is_empty() {
233 return Err(PayError::InvalidAmount(
234 "network cannot be empty for spend check".to_string(),
235 ));
236 }
237
238 let _guard = self.mu.lock().await;
239
240 match &self.backend {
241 #[cfg(feature = "redb")]
242 SpendBackend::Redb { .. } => self.reserve_redb(op_id, ctx).await,
243 #[cfg(feature = "postgres")]
244 SpendBackend::Postgres { .. } => self.reserve_postgres(op_id, ctx).await,
245 SpendBackend::None => Err(PayError::NotImplemented(
246 "no storage backend for spend limits".to_string(),
247 )),
248 }
249 }
250
251 pub async fn confirm(&self, _reservation_id: u64) -> Result<(), PayError> {
252 let _guard = self.mu.lock().await;
253
254 match &self.backend {
255 #[cfg(feature = "redb")]
256 SpendBackend::Redb { .. } => self.confirm_redb(_reservation_id),
257 #[cfg(feature = "postgres")]
258 SpendBackend::Postgres { .. } => self.confirm_postgres(_reservation_id).await,
259 SpendBackend::None => Err(PayError::NotImplemented(
260 "no storage backend for spend limits".to_string(),
261 )),
262 }
263 }
264
265 pub async fn cancel(&self, _reservation_id: u64) -> Result<(), PayError> {
266 let _guard = self.mu.lock().await;
267
268 match &self.backend {
269 #[cfg(feature = "redb")]
270 SpendBackend::Redb { .. } => self.cancel_redb(_reservation_id),
271 #[cfg(feature = "postgres")]
272 SpendBackend::Postgres { .. } => self.cancel_postgres(_reservation_id).await,
273 SpendBackend::None => Ok(()),
274 }
275 }
276}
277
278#[cfg(feature = "redb")]
283impl SpendLedger {
284 fn spend_db_path(&self) -> PathBuf {
285 match &self.backend {
286 SpendBackend::Redb { data_dir } => Path::new(data_dir).join("spend").join("spend.redb"),
287 #[allow(unreachable_patterns)]
288 _ => PathBuf::new(),
289 }
290 }
291
292 fn exchange_rate_db_path(&self) -> PathBuf {
293 match &self.backend {
294 SpendBackend::Redb { data_dir } => Path::new(data_dir)
295 .join("spend")
296 .join("exchange-rate-cache.redb"),
297 #[allow(unreachable_patterns)]
298 _ => PathBuf::new(),
299 }
300 }
301
302 fn open_spend_db(&self) -> Result<Database, PayError> {
303 db::open_and_migrate(
304 &self.spend_db_path(),
305 SPEND_VERSION,
306 &[
307 &|_db: &Database| Ok(()),
309 ],
310 )
311 }
312
313 fn open_exchange_rate_db(&self) -> Result<Database, PayError> {
314 db::open_and_migrate(
315 &self.exchange_rate_db_path(),
316 FX_CACHE_VERSION,
317 &[
318 &|_db: &Database| Ok(()),
320 ],
321 )
322 }
323
324 fn add_limit_redb(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
325 let db = self.open_spend_db()?;
326 let rule_id = generate_rule_identifier()?;
327 limit.rule_id = Some(rule_id.clone());
328 let encoded = encode(limit)?;
329 let write_txn = db
330 .begin_write()
331 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
332 {
333 let mut rule_table = write_txn
334 .open_table(RULE_BY_ID)
335 .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
336 rule_table
337 .insert(rule_id.as_str(), encoded.as_str())
338 .map_err(|e| PayError::InternalError(format!("spend insert rule: {e}")))?;
339 }
340 write_txn
341 .commit()
342 .map_err(|e| PayError::InternalError(format!("spend commit add_limit: {e}")))?;
343 Ok(rule_id)
344 }
345
346 fn remove_limit_redb(&self, rule_id: &str) -> Result<(), PayError> {
347 let db = self.open_spend_db()?;
348 let write_txn = db
349 .begin_write()
350 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
351 {
352 let mut rule_table = write_txn
353 .open_table(RULE_BY_ID)
354 .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
355 let existed = rule_table
356 .remove(rule_id)
357 .map_err(|e| PayError::InternalError(format!("spend remove rule: {e}")))?;
358 if existed.is_none() {
359 return Err(PayError::InvalidAmount(format!(
360 "rule_id '{rule_id}' not found"
361 )));
362 }
363 }
364 write_txn
365 .commit()
366 .map_err(|e| PayError::InternalError(format!("spend commit remove_limit: {e}")))
367 }
368
369 fn set_limits_redb(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
370 let db = self.open_spend_db()?;
371 let write_txn = db
372 .begin_write()
373 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
374 {
375 let mut rule_table = write_txn
376 .open_table(RULE_BY_ID)
377 .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
378 let existing_ids = rule_table
380 .iter()
381 .map_err(|e| PayError::InternalError(format!("spend iterate rules: {e}")))?
382 .map(|entry| {
383 entry
384 .map(|(k, _)| k.value().to_string())
385 .map_err(|e| PayError::InternalError(format!("spend read rule key: {e}")))
386 })
387 .collect::<Result<Vec<_>, _>>()?;
388 for rid in existing_ids {
389 rule_table
390 .remove(rid.as_str())
391 .map_err(|e| PayError::InternalError(format!("spend remove rule: {e}")))?;
392 }
393
394 for limit in limits {
396 let mut rule = limit.clone();
397 let rid = generate_rule_identifier()?;
398 rule.rule_id = Some(rid.clone());
399 let encoded = encode(&rule)?;
400 rule_table
401 .insert(rid.as_str(), encoded.as_str())
402 .map_err(|e| PayError::InternalError(format!("spend insert rule: {e}")))?;
403 }
404 }
405 write_txn
406 .commit()
407 .map_err(|e| PayError::InternalError(format!("spend commit set_limits: {e}")))
408 }
409
410 fn get_status_redb(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
411 let db = self.open_spend_db()?;
412 let read_txn = db
413 .begin_read()
414 .map_err(|e| PayError::InternalError(format!("spend begin_read: {e}")))?;
415 let rules = load_rules(&read_txn)?;
416 let reservations = load_reservations(&read_txn)?;
417 let now = now_epoch_ms();
418 let mut out = Vec::with_capacity(rules.len());
419 for rule in rules {
420 let use_usd = rule.scope == SpendScope::GlobalUsdCents;
421 let (spent, oldest_ts) = spent_in_window(&rule, &reservations, now, use_usd)?;
422 let remaining = rule.max_spend.saturating_sub(spent);
423 let window_ms = rule.window_s.saturating_mul(1000);
424 let window_reset_s = oldest_ts
425 .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
426 .unwrap_or(0);
427 out.push(SpendLimitStatus {
428 rule_id: rule.rule_id.clone().unwrap_or_default(),
429 scope: rule.scope,
430 network: rule.network.clone(),
431 wallet: rule.wallet.clone(),
432 window_s: rule.window_s,
433 max_spend: rule.max_spend,
434 spent,
435 remaining,
436 token: rule.token.clone(),
437 window_reset_s,
438 });
439 }
440 Ok(out)
441 }
442
    /// Creates a reservation for `op_id` in the redb store, or returns the id
    /// already recorded for that `op_id`.
    ///
    /// Steps, in order:
    /// 1. Load the rules from a read snapshot and reject the call when a
    ///    wallet-scoped rule exists for `ctx.network` but `ctx.wallet` is `None`.
    /// 2. Convert the amount to USD cents only if some rule has scope
    ///    `GlobalUsdCents`; this await happens before the write transaction opens.
    /// 3. Inside one write transaction: short-circuit on a known `op_id`, call
    ///    `expire_pending`, check every matching rule against its window, then
    ///    insert the reservation and its op_id index entry.
    ///
    /// # Errors
    /// `PayError::InvalidAmount` for the missing-wallet case,
    /// `PayError::LimitExceeded` when a matching rule would be exceeded,
    /// `PayError::InternalError` for storage failures; helper errors are
    /// propagated unchanged.
    async fn reserve_redb(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
        let now = now_epoch_ms();
        let db = self.open_spend_db()?;

        let read_txn = db
            .begin_read()
            .map_err(|e| PayError::InternalError(format!("spend begin_read: {e}")))?;
        let rules = load_rules(&read_txn)?;

        // A wallet-scoped rule on this network cannot be evaluated without a wallet.
        if rules.iter().any(|r| {
            r.scope == SpendScope::Wallet
                && r.network.as_deref() == Some(ctx.network.as_str())
                && ctx.wallet.is_none()
        }) {
            return Err(PayError::InvalidAmount(
                "wallet-scoped limits require an explicit wallet".to_string(),
            ));
        }

        // The USD conversion is skipped entirely unless a USD-denominated rule exists.
        let needs_usd = rules.iter().any(|r| r.scope == SpendScope::GlobalUsdCents);
        let amount_usd_cents = if needs_usd {
            Some(
                self.amount_to_usd_cents(&ctx.network, ctx.token.as_deref(), ctx.amount_native)
                    .await?,
            )
        } else {
            None
        };

        let write_txn = db
            .begin_write()
            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;

        // Holds the encoded reservation; declared outside the table scope below.
        let mut encoded_blobs: Vec<String> = Vec::new();
        let reservation_id = {
            let mut reservation_index =
                write_txn.open_table(RESERVATION_ID_BY_OP_ID).map_err(|e| {
                    PayError::InternalError(format!("spend open reservation op index: {e}"))
                })?;
            // Idempotency: a known op_id returns its existing reservation id.
            // Returning here drops the write transaction without committing.
            if let Some(existing) = reservation_index
                .get(op_id)
                .map_err(|e| PayError::InternalError(format!("spend read op index: {e}")))?
            {
                let existing_id = existing.value();
                return Ok(existing_id);
            }

            let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
                PayError::InternalError(format!("spend open reservation table: {e}"))
            })?;

            // NOTE(review): helper defined elsewhere; presumably marks lapsed
            // pending reservations as expired before usage is summed — confirm.
            expire_pending(&mut reservation_table, now)?;

            let reservations = reservation_table
                .iter()
                .map_err(|e| PayError::InternalError(format!("spend iterate reservations: {e}")))?
                .map(|entry| {
                    let (_k, v) = entry.map_err(|e| {
                        PayError::InternalError(format!("spend read reservation: {e}"))
                    })?;
                    decode::<SpendReservation>(v.value())
                        .map_err(|e| prepend_err("spend decode reservation", e))
                })
                .collect::<Result<Vec<_>, _>>()?;

            for rule in rules.iter() {
                if !rule_matches_context(
                    rule,
                    &ctx.network,
                    ctx.wallet.as_deref(),
                    ctx.token.as_deref(),
                ) {
                    continue;
                }

                let use_usd = rule.scope == SpendScope::GlobalUsdCents;
                let candidate_amount =
                    amount_for_rule(rule, ctx.amount_native, amount_usd_cents, use_usd)?;
                let (spent, oldest_ts) = spent_in_window(rule, &reservations, now, use_usd)?;
                if spent.saturating_add(candidate_amount) > rule.max_spend {
                    // Whole seconds until the oldest counted reservation leaves the window.
                    let window_ms = rule.window_s.saturating_mul(1000);
                    let remaining_s = oldest_ts
                        .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
                        .unwrap_or(0);

                    return Err(PayError::LimitExceeded {
                        rule_id: rule.rule_id.clone().unwrap_or_default(),
                        scope: rule.scope,
                        scope_key: scope_key(rule),
                        spent,
                        max_spend: rule.max_spend,
                        token: rule.token.clone(),
                        remaining_s,
                        origin: None,
                    });
                }
            }

            let reservation_id = next_counter(&write_txn, NEXT_RESERVATION_ID_KEY)?;
            let reservation = SpendReservation {
                reservation_id,
                op_id: op_id.to_string(),
                network: ctx.network.clone(),
                wallet: ctx.wallet.clone(),
                token: ctx.token.clone(),
                amount_native: ctx.amount_native,
                amount_usd_cents,
                status: ReservationStatus::Pending,
                created_at_epoch_ms: now,
                // Pending reservations get a 300_000 ms (5 minute) expiry.
                expires_at_epoch_ms: now.saturating_add(300_000),
                finalized_at_epoch_ms: None,
            };
            encoded_blobs.push(encode(&reservation)?);
            let encoded = encoded_blobs
                .last()
                .ok_or_else(|| PayError::InternalError("missing reservation blob".to_string()))?;
            reservation_table
                .insert(reservation_id, encoded.as_str())
                .map_err(|e| PayError::InternalError(format!("spend insert reservation: {e}")))?;
            reservation_index
                .insert(op_id, reservation_id)
                .map_err(|e| PayError::InternalError(format!("spend insert op index: {e}")))?;
            reservation_id
        };

        write_txn
            .commit()
            .map_err(|e| PayError::InternalError(format!("spend commit reserve: {e}")))?;
        Ok(reservation_id)
    }
574
575 fn confirm_redb(&self, reservation_id: u64) -> Result<(), PayError> {
576 let db = self.open_spend_db()?;
577 let now = now_epoch_ms();
578
579 let write_txn = db
580 .begin_write()
581 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
582
583 let mut encoded_blobs: Vec<String> = Vec::new();
584 {
585 let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
586 PayError::InternalError(format!("spend open reservation table: {e}"))
587 })?;
588 let Some(existing_bytes) = reservation_table
589 .get(reservation_id)
590 .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?
591 .map(|g| g.value().to_string())
592 else {
593 return Err(PayError::InternalError(format!(
594 "reservation {reservation_id} not found"
595 )));
596 };
597
598 let mut reservation: SpendReservation = decode(&existing_bytes)?;
599 if !matches!(reservation.status, ReservationStatus::Pending) {
600 return Ok(());
601 }
602
603 reservation.status = ReservationStatus::Confirmed;
604 reservation.finalized_at_epoch_ms = Some(now);
605 encoded_blobs.push(encode(&reservation)?);
606 let encoded = encoded_blobs
607 .last()
608 .ok_or_else(|| PayError::InternalError("missing reservation blob".to_string()))?;
609 reservation_table
610 .insert(reservation_id, encoded.as_str())
611 .map_err(|e| PayError::InternalError(format!("spend update reservation: {e}")))?;
612
613 let mut events = write_txn
614 .open_table(SPEND_EVENT_BY_ID)
615 .map_err(|e| PayError::InternalError(format!("spend open event table: {e}")))?;
616 let event_id = next_counter(&write_txn, NEXT_EVENT_ID_KEY)?;
617 let event = SpendEvent {
618 event_id,
619 reservation_id,
620 op_id: reservation.op_id,
621 network: reservation.network,
622 wallet: reservation.wallet,
623 token: reservation.token,
624 amount_native: reservation.amount_native,
625 amount_usd_cents: reservation.amount_usd_cents,
626 created_at_epoch_ms: reservation.created_at_epoch_ms,
627 confirmed_at_epoch_ms: now,
628 };
629 encoded_blobs.push(encode(&event)?);
630 let encoded_event = encoded_blobs
631 .last()
632 .ok_or_else(|| PayError::InternalError("missing event blob".to_string()))?;
633 events
634 .insert(event_id, encoded_event.as_str())
635 .map_err(|e| PayError::InternalError(format!("spend insert event: {e}")))?;
636 }
637
638 write_txn
639 .commit()
640 .map_err(|e| PayError::InternalError(format!("spend commit confirm: {e}")))
641 }
642
643 fn cancel_redb(&self, reservation_id: u64) -> Result<(), PayError> {
644 let db = self.open_spend_db()?;
645 let now = now_epoch_ms();
646
647 let write_txn = db
648 .begin_write()
649 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
650
651 let mut encoded_blobs: Vec<String> = Vec::new();
652 {
653 let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
654 PayError::InternalError(format!("spend open reservation table: {e}"))
655 })?;
656 let existing = reservation_table
657 .get(reservation_id)
658 .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?;
659 let existing_bytes = existing.map(|g| g.value().to_string());
660 if let Some(existing_bytes) = existing_bytes {
661 let mut reservation: SpendReservation = decode(&existing_bytes)?;
662 if matches!(reservation.status, ReservationStatus::Pending) {
663 reservation.status = ReservationStatus::Cancelled;
664 reservation.finalized_at_epoch_ms = Some(now);
665 encoded_blobs.push(encode(&reservation)?);
666 let encoded = encoded_blobs.last().ok_or_else(|| {
667 PayError::InternalError("missing reservation blob".to_string())
668 })?;
669 reservation_table
670 .insert(reservation_id, encoded.as_str())
671 .map_err(|e| {
672 PayError::InternalError(format!("spend update reservation: {e}"))
673 })?;
674 }
675 }
676 }
677
678 write_txn
679 .commit()
680 .map_err(|e| PayError::InternalError(format!("spend commit cancel: {e}")))
681 }
682}
683
684#[cfg(feature = "postgres")]
689impl SpendLedger {
690 fn pg_pool(&self) -> Result<&sqlx::PgPool, PayError> {
691 match &self.backend {
692 SpendBackend::Postgres { pool } => Ok(pool),
693 _ => Err(PayError::InternalError(
694 "expected postgres spend backend".to_string(),
695 )),
696 }
697 }
698
699 async fn add_limit_postgres(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
700 let pool = self.pg_pool()?;
701 let rule_id = generate_rule_identifier()?;
702 limit.rule_id = Some(rule_id.clone());
703 let rule_json = serde_json::to_value(limit)
704 .map_err(|e| PayError::InternalError(format!("serialize spend rule: {e}")))?;
705
706 sqlx::query("INSERT INTO spend_rules (rule_id, rule) VALUES ($1, $2)")
707 .bind(&rule_id)
708 .bind(&rule_json)
709 .execute(pool)
710 .await
711 .map_err(|e| PayError::InternalError(format!("pg insert spend rule: {e}")))?;
712
713 Ok(rule_id)
714 }
715
716 async fn remove_limit_postgres(&self, rule_id: &str) -> Result<(), PayError> {
717 let pool = self.pg_pool()?;
718 let result = sqlx::query("DELETE FROM spend_rules WHERE rule_id = $1")
719 .bind(rule_id)
720 .execute(pool)
721 .await
722 .map_err(|e| PayError::InternalError(format!("pg delete spend rule: {e}")))?;
723
724 if result.rows_affected() == 0 {
725 return Err(PayError::InvalidAmount(format!(
726 "rule_id '{rule_id}' not found"
727 )));
728 }
729 Ok(())
730 }
731
732 async fn set_limits_postgres(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
733 let pool = self.pg_pool()?;
734 let mut tx = pool
735 .begin()
736 .await
737 .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
738
739 sqlx::query("DELETE FROM spend_rules")
740 .execute(&mut *tx)
741 .await
742 .map_err(|e| PayError::InternalError(format!("pg clear spend rules: {e}")))?;
743
744 for limit in limits {
745 let mut rule = limit.clone();
746 let rid = generate_rule_identifier()?;
747 rule.rule_id = Some(rid.clone());
748 let rule_json = serde_json::to_value(&rule)
749 .map_err(|e| PayError::InternalError(format!("serialize spend rule: {e}")))?;
750 sqlx::query("INSERT INTO spend_rules (rule_id, rule) VALUES ($1, $2)")
751 .bind(&rid)
752 .bind(&rule_json)
753 .execute(&mut *tx)
754 .await
755 .map_err(|e| PayError::InternalError(format!("pg insert spend rule: {e}")))?;
756 }
757
758 tx.commit()
759 .await
760 .map_err(|e| PayError::InternalError(format!("pg commit set_limits: {e}")))
761 }
762
763 async fn get_status_postgres(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
764 let pool = self.pg_pool()?;
765 let rules = pg_load_rules(pool).await?;
766 let reservations = pg_load_reservations(pool).await?;
767 let now = now_epoch_ms();
768
769 let mut out = Vec::with_capacity(rules.len());
770 for rule in rules {
771 let use_usd = rule.scope == SpendScope::GlobalUsdCents;
772 let (spent, oldest_ts) = spent_in_window(&rule, &reservations, now, use_usd)?;
773 let remaining = rule.max_spend.saturating_sub(spent);
774 let window_ms = rule.window_s.saturating_mul(1000);
775 let window_reset_s = oldest_ts
776 .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
777 .unwrap_or(0);
778 out.push(SpendLimitStatus {
779 rule_id: rule.rule_id.clone().unwrap_or_default(),
780 scope: rule.scope,
781 network: rule.network.clone(),
782 wallet: rule.wallet.clone(),
783 window_s: rule.window_s,
784 max_spend: rule.max_spend,
785 spent,
786 remaining,
787 token: rule.token.clone(),
788 window_reset_s,
789 });
790 }
791 Ok(out)
792 }
793
    /// Creates a reservation for `op_id` in Postgres, or returns the id already
    /// stored for that `op_id`.
    ///
    /// Steps, in order:
    /// 1. Load the rules from the pool, reject the missing-wallet case and, if a
    ///    `GlobalUsdCents` rule exists, convert the amount to USD cents. All of
    ///    this happens before the transaction and advisory lock are taken.
    /// 2. Open a transaction and take `pg_advisory_xact_lock`.
    /// 3. Short-circuit on a known `op_id`, call `pg_expire_pending`, reload
    ///    reservations and rules inside the transaction, and check every
    ///    matching rule against its window.
    /// 4. Insert the reservation, then patch the database-assigned id into the
    ///    stored JSON and commit.
    ///
    /// # Errors
    /// `PayError::InvalidAmount` for the missing-wallet case,
    /// `PayError::LimitExceeded` when a matching rule would be exceeded,
    /// `PayError::InternalError` for database failures; helper errors are
    /// propagated unchanged.
    async fn reserve_postgres(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
        use crate::store::postgres_store::SPEND_ADVISORY_LOCK_KEY;

        let pool = self.pg_pool()?;
        let now = now_epoch_ms();

        // First rule load (outside the transaction): used only for the early
        // wallet check and to decide whether a USD conversion is needed.
        let rules = pg_load_rules(pool).await?;
        if rules.iter().any(|r| {
            r.scope == SpendScope::Wallet
                && r.network.as_deref() == Some(ctx.network.as_str())
                && ctx.wallet.is_none()
        }) {
            return Err(PayError::InvalidAmount(
                "wallet-scoped limits require an explicit wallet".to_string(),
            ));
        }

        let needs_usd = rules.iter().any(|r| r.scope == SpendScope::GlobalUsdCents);
        let amount_usd_cents = if needs_usd {
            Some(
                self.amount_to_usd_cents(&ctx.network, ctx.token.as_deref(), ctx.amount_native)
                    .await?,
            )
        } else {
            None
        };

        let mut tx = pool
            .begin()
            .await
            .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;

        // Transaction-scoped advisory lock; released when the transaction ends.
        sqlx::query("SELECT pg_advisory_xact_lock($1)")
            .bind(SPEND_ADVISORY_LOCK_KEY)
            .execute(&mut *tx)
            .await
            .map_err(|e| PayError::InternalError(format!("pg advisory lock: {e}")))?;

        // Idempotency: a known op_id returns its existing reservation id.
        let existing: Option<(i64,)> =
            sqlx::query_as("SELECT reservation_id FROM spend_reservations WHERE op_id = $1")
                .bind(op_id)
                .fetch_optional(&mut *tx)
                .await
                .map_err(|e| PayError::InternalError(format!("pg check op_id: {e}")))?;

        if let Some((rid,)) = existing {
            return Ok(rid as u64);
        }

        // NOTE(review): helper defined elsewhere; presumably marks lapsed pending
        // reservations as expired before usage is summed — confirm.
        pg_expire_pending(&mut tx, now).await?;

        let reservations = pg_load_reservations_tx(&mut tx).await?;

        // Second rule load, this time inside the locked transaction; it shadows
        // the earlier list and is the one the limit checks run against.
        let rules = pg_load_rules_tx(&mut tx).await?;

        for rule in rules.iter() {
            if !rule_matches_context(
                rule,
                &ctx.network,
                ctx.wallet.as_deref(),
                ctx.token.as_deref(),
            ) {
                continue;
            }

            let use_usd = rule.scope == SpendScope::GlobalUsdCents;
            let candidate_amount =
                amount_for_rule(rule, ctx.amount_native, amount_usd_cents, use_usd)?;
            let (spent, oldest_ts) = spent_in_window(rule, &reservations, now, use_usd)?;
            if spent.saturating_add(candidate_amount) > rule.max_spend {
                // Whole seconds until the oldest counted reservation leaves the window.
                let window_ms = rule.window_s.saturating_mul(1000);
                let remaining_s = oldest_ts
                    .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
                    .unwrap_or(0);

                return Err(PayError::LimitExceeded {
                    rule_id: rule.rule_id.clone().unwrap_or_default(),
                    scope: rule.scope,
                    scope_key: scope_key(rule),
                    spent,
                    max_spend: rule.max_spend,
                    token: rule.token.clone(),
                    remaining_s,
                    origin: None,
                });
            }
        }

        let reservation = SpendReservation {
            // Placeholder; the database-assigned id is patched into the JSON below.
            reservation_id: 0,
            op_id: op_id.to_string(),
            network: ctx.network.clone(),
            wallet: ctx.wallet.clone(),
            token: ctx.token.clone(),
            amount_native: ctx.amount_native,
            amount_usd_cents,
            status: ReservationStatus::Pending,
            created_at_epoch_ms: now,
            // Pending reservations get a 300_000 ms (5 minute) expiry.
            expires_at_epoch_ms: now.saturating_add(300_000),
            finalized_at_epoch_ms: None,
        };
        let reservation_json = serde_json::to_value(&reservation)
            .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;

        let row: (i64,) = sqlx::query_as(
            "INSERT INTO spend_reservations (op_id, reservation) \
             VALUES ($1, $2) RETURNING reservation_id",
        )
        .bind(op_id)
        .bind(&reservation_json)
        .fetch_one(&mut *tx)
        .await
        .map_err(|e| PayError::InternalError(format!("pg insert reservation: {e}")))?;

        let reservation_id = row.0 as u64;

        // Keep the id embedded in the JSON payload in sync with the row's key.
        let mut updated_json = reservation_json;
        updated_json["reservation_id"] = serde_json::json!(reservation_id);
        sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
            .bind(&updated_json)
            .bind(row.0)
            .execute(&mut *tx)
            .await
            .map_err(|e| PayError::InternalError(format!("pg update reservation id: {e}")))?;

        tx.commit()
            .await
            .map_err(|e| PayError::InternalError(format!("pg commit reserve: {e}")))?;

        Ok(reservation_id)
    }
934
935 async fn confirm_postgres(&self, reservation_id: u64) -> Result<(), PayError> {
936 let pool = self.pg_pool()?;
937 let now = now_epoch_ms();
938 let rid = reservation_id as i64;
939
940 let row: Option<(serde_json::Value,)> =
941 sqlx::query_as("SELECT reservation FROM spend_reservations WHERE reservation_id = $1")
942 .bind(rid)
943 .fetch_optional(pool)
944 .await
945 .map_err(|e| PayError::InternalError(format!("pg read reservation: {e}")))?;
946
947 let Some((res_json,)) = row else {
948 return Err(PayError::InternalError(format!(
949 "reservation {reservation_id} not found"
950 )));
951 };
952
953 let mut reservation: SpendReservation = serde_json::from_value(res_json)
954 .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
955
956 if !matches!(reservation.status, ReservationStatus::Pending) {
957 return Ok(());
958 }
959
960 reservation.status = ReservationStatus::Confirmed;
961 reservation.finalized_at_epoch_ms = Some(now);
962 let updated_json = serde_json::to_value(&reservation)
963 .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
964
965 let event = SpendEvent {
966 event_id: 0, reservation_id,
968 op_id: reservation.op_id,
969 network: reservation.network,
970 wallet: reservation.wallet,
971 token: reservation.token,
972 amount_native: reservation.amount_native,
973 amount_usd_cents: reservation.amount_usd_cents,
974 created_at_epoch_ms: reservation.created_at_epoch_ms,
975 confirmed_at_epoch_ms: now,
976 };
977 let event_json = serde_json::to_value(&event)
978 .map_err(|e| PayError::InternalError(format!("serialize spend event: {e}")))?;
979
980 let mut tx = pool
981 .begin()
982 .await
983 .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
984
985 sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
986 .bind(&updated_json)
987 .bind(rid)
988 .execute(&mut *tx)
989 .await
990 .map_err(|e| PayError::InternalError(format!("pg update reservation: {e}")))?;
991
992 sqlx::query("INSERT INTO spend_events (reservation_id, event) VALUES ($1, $2)")
993 .bind(rid)
994 .bind(&event_json)
995 .execute(&mut *tx)
996 .await
997 .map_err(|e| PayError::InternalError(format!("pg insert spend event: {e}")))?;
998
999 tx.commit()
1000 .await
1001 .map_err(|e| PayError::InternalError(format!("pg commit confirm: {e}")))
1002 }
1003
1004 async fn cancel_postgres(&self, reservation_id: u64) -> Result<(), PayError> {
1005 let pool = self.pg_pool()?;
1006 let now = now_epoch_ms();
1007 let rid = reservation_id as i64;
1008
1009 let row: Option<(serde_json::Value,)> =
1010 sqlx::query_as("SELECT reservation FROM spend_reservations WHERE reservation_id = $1")
1011 .bind(rid)
1012 .fetch_optional(pool)
1013 .await
1014 .map_err(|e| PayError::InternalError(format!("pg read reservation: {e}")))?;
1015
1016 if let Some((res_json,)) = row {
1017 let mut reservation: SpendReservation = serde_json::from_value(res_json)
1018 .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
1019
1020 if matches!(reservation.status, ReservationStatus::Pending) {
1021 reservation.status = ReservationStatus::Cancelled;
1022 reservation.finalized_at_epoch_ms = Some(now);
1023 let updated_json = serde_json::to_value(&reservation)
1024 .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
1025
1026 sqlx::query(
1027 "UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2",
1028 )
1029 .bind(&updated_json)
1030 .bind(rid)
1031 .execute(pool)
1032 .await
1033 .map_err(|e| PayError::InternalError(format!("pg update reservation: {e}")))?;
1034 }
1035 }
1036
1037 Ok(())
1038 }
1039}
1040
/// Loads every spend rule from `spend_rules`, ordered by `rule_id`, using the pool directly.
///
/// # Errors
/// `PayError::InternalError` when the query fails or a stored rule cannot be deserialized;
/// the first undecodable row aborts the load.
#[cfg(feature = "postgres")]
async fn pg_load_rules(pool: &sqlx::PgPool) -> Result<Vec<SpendLimit>, PayError> {
    let fetched: Vec<(serde_json::Value,)> =
        sqlx::query_as("SELECT rule FROM spend_rules ORDER BY rule_id")
            .fetch_all(pool)
            .await
            .map_err(|e| PayError::InternalError(format!("pg load spend rules: {e}")))?;

    let mut rules = Vec::with_capacity(fetched.len());
    for (raw,) in fetched {
        let rule: SpendLimit = serde_json::from_value(raw)
            .map_err(|e| PayError::InternalError(format!("pg parse spend rule: {e}")))?;
        rules.push(rule);
    }
    Ok(rules)
}
1055
/// Loads every spend rule from `spend_rules`, ordered by `rule_id`, inside an open transaction.
///
/// # Errors
/// `PayError::InternalError` when the query fails or a stored rule cannot be deserialized.
#[cfg(feature = "postgres")]
async fn pg_load_rules_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Vec<SpendLimit>, PayError> {
    // Decodes one stored JSON document into a rule.
    let parse_rule = |raw: serde_json::Value| -> Result<SpendLimit, PayError> {
        serde_json::from_value(raw)
            .map_err(|e| PayError::InternalError(format!("pg parse spend rule: {e}")))
    };

    let stored: Vec<(serde_json::Value,)> =
        sqlx::query_as("SELECT rule FROM spend_rules ORDER BY rule_id")
            .fetch_all(&mut **tx)
            .await
            .map_err(|e| PayError::InternalError(format!("pg load spend rules: {e}")))?;

    stored.into_iter().map(|(raw,)| parse_rule(raw)).collect()
}
1072
/// Loads every reservation from `spend_reservations`, ordered by `reservation_id`, using the
/// pool directly.
///
/// # Errors
/// `PayError::InternalError` when the query fails or a stored reservation cannot be
/// deserialized; the first undecodable row aborts the load.
#[cfg(feature = "postgres")]
async fn pg_load_reservations(pool: &sqlx::PgPool) -> Result<Vec<SpendReservation>, PayError> {
    let fetched: Vec<(serde_json::Value,)> =
        sqlx::query_as("SELECT reservation FROM spend_reservations ORDER BY reservation_id")
            .fetch_all(pool)
            .await
            .map_err(|e| PayError::InternalError(format!("pg load reservations: {e}")))?;

    let mut reservations = Vec::with_capacity(fetched.len());
    for (raw,) in fetched {
        let reservation: SpendReservation = serde_json::from_value(raw)
            .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
        reservations.push(reservation);
    }
    Ok(reservations)
}
1087
/// Loads every reservation from `spend_reservations`, ordered by `reservation_id`, inside an
/// open transaction.
///
/// # Errors
/// `PayError::InternalError` when the query fails or a stored reservation cannot be
/// deserialized.
#[cfg(feature = "postgres")]
async fn pg_load_reservations_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Vec<SpendReservation>, PayError> {
    // Decodes one stored JSON document into a reservation.
    let parse_reservation = |raw: serde_json::Value| -> Result<SpendReservation, PayError> {
        serde_json::from_value(raw)
            .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))
    };

    let stored: Vec<(serde_json::Value,)> =
        sqlx::query_as("SELECT reservation FROM spend_reservations ORDER BY reservation_id")
            .fetch_all(&mut **tx)
            .await
            .map_err(|e| PayError::InternalError(format!("pg load reservations: {e}")))?;

    stored
        .into_iter()
        .map(|(raw,)| parse_reservation(raw))
        .collect()
}
1104
/// Rewrites every overdue pending reservation as `Expired` within the given transaction.
///
/// All rows of `spend_reservations` are read and decoded; a reservation is rewritten when its
/// status is `Pending` and `expires_at_epoch_ms <= now_ms`. Its `finalized_at_epoch_ms` is set
/// to `now_ms`.
///
/// # Errors
/// `PayError::InternalError` when a query fails or any row (pending or not) cannot be
/// (de)serialized.
#[cfg(feature = "postgres")]
async fn pg_expire_pending(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    now_ms: u64,
) -> Result<(), PayError> {
    let stored: Vec<(i64, serde_json::Value)> =
        sqlx::query_as("SELECT reservation_id, reservation FROM spend_reservations")
            .fetch_all(&mut **tx)
            .await
            .map_err(|e| {
                PayError::InternalError(format!("pg load reservations for expire: {e}"))
            })?;

    for (row_id, raw) in stored {
        let mut entry: SpendReservation = serde_json::from_value(raw)
            .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;

        // Skip anything already finalized or not yet due.
        let still_pending = matches!(entry.status, ReservationStatus::Pending);
        if !still_pending || entry.expires_at_epoch_ms > now_ms {
            continue;
        }

        entry.status = ReservationStatus::Expired;
        entry.finalized_at_epoch_ms = Some(now_ms);
        let rewritten = serde_json::to_value(&entry)
            .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;

        sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
            .bind(&rewritten)
            .bind(row_id)
            .execute(&mut **tx)
            .await
            .map_err(|e| PayError::InternalError(format!("pg expire reservation: {e}")))?;
    }

    Ok(())
}
1139
1140impl SpendLedger {
1145 async fn amount_to_usd_cents(
1146 &self,
1147 network: &str,
1148 token: Option<&str>,
1149 amount_native: u64,
1150 ) -> Result<u64, PayError> {
1151 let (symbol, divisor) = token_asset(network, token).ok_or_else(|| {
1152 PayError::InvalidAmount(format!(
1153 "network '{network}' token '{token:?}' is unsupported for global-usd-cents limits"
1154 ))
1155 })?;
1156
1157 let quote = self.get_or_fetch_quote(symbol, "USD").await?;
1158
1159 let now = now_epoch_ms();
1162 if quote.expires_at_epoch_ms > 0 && now > quote.expires_at_epoch_ms {
1163 return Err(PayError::NetworkError(
1164 "exchange-rate quote expired — cannot convert to USD; check exchange_rate sources"
1165 .to_string(),
1166 ));
1167 }
1168
1169 let ttl_ms = quote
1172 .expires_at_epoch_ms
1173 .saturating_sub(quote.fetched_at_epoch_ms);
1174 let age_ms = now.saturating_sub(quote.fetched_at_epoch_ms);
1175 if ttl_ms > 0 && age_ms > ttl_ms * 4 / 5 {
1176 self.fx_stale_warned
1177 .store(true, std::sync::atomic::Ordering::Relaxed);
1178 }
1179
1180 let usd = (amount_native as f64 / divisor) * quote.price;
1181 if !usd.is_finite() || usd < 0f64 {
1182 return Err(PayError::InternalError(
1183 "invalid exchange-rate conversion result".to_string(),
1184 ));
1185 }
1186 Ok((usd * 100f64).round() as u64)
1187 }
1188
1189 async fn get_or_fetch_quote(
1190 &self,
1191 base: &str,
1192 quote: &str,
1193 ) -> Result<ExchangeRateQuote, PayError> {
1194 let pair = format!(
1195 "{}/{}",
1196 base.to_ascii_uppercase(),
1197 quote.to_ascii_uppercase()
1198 );
1199 let now = now_epoch_ms();
1200
1201 #[cfg(feature = "redb")]
1203 if let SpendBackend::Redb { .. } = &self.backend {
1204 let fx_db = self.open_exchange_rate_db()?;
1205 let read_txn = fx_db
1206 .begin_read()
1207 .map_err(|e| PayError::InternalError(format!("fx begin_read: {e}")))?;
1208 if let Ok(table) = read_txn.open_table(FX_QUOTE_BY_PAIR) {
1209 if let Some(entry) = table
1210 .get(pair.as_str())
1211 .map_err(|e| PayError::InternalError(format!("fx read quote: {e}")))?
1212 {
1213 let cached: ExchangeRateQuote = decode(entry.value())?;
1214 if cached.expires_at_epoch_ms > now {
1215 return Ok(cached);
1216 }
1217 }
1218 }
1219 }
1220
1221 #[cfg(feature = "postgres")]
1223 if let SpendBackend::Postgres { pool } = &self.backend {
1224 let row: Option<(serde_json::Value,)> =
1225 sqlx::query_as("SELECT quote FROM exchange_rate_cache WHERE pair = $1")
1226 .bind(&pair)
1227 .fetch_optional(pool)
1228 .await
1229 .map_err(|e| PayError::InternalError(format!("pg fx read cache: {e}")))?;
1230 if let Some((quote_json,)) = row {
1231 let cached: ExchangeRateQuote = serde_json::from_value(quote_json)
1232 .map_err(|e| PayError::InternalError(format!("pg fx parse cache: {e}")))?;
1233 if cached.expires_at_epoch_ms > now {
1234 return Ok(cached);
1235 }
1236 }
1237 }
1238
1239 let (fetched_price, source_name) = self.fetch_exchange_rate_http(base, quote).await?;
1240 let ttl_s = self
1241 .exchange_rate
1242 .as_ref()
1243 .map(|cfg| cfg.ttl_s)
1244 .unwrap_or(300)
1245 .max(1);
1246 let new_quote = ExchangeRateQuote {
1247 pair: pair.clone(),
1248 source: source_name,
1249 price: fetched_price,
1250 fetched_at_epoch_ms: now,
1251 expires_at_epoch_ms: now.saturating_add(ttl_s.saturating_mul(1000)),
1252 };
1253
1254 #[cfg(feature = "redb")]
1256 if let SpendBackend::Redb { .. } = &self.backend {
1257 let fx_db = self.open_exchange_rate_db()?;
1258 let write_txn = fx_db
1259 .begin_write()
1260 .map_err(|e| PayError::InternalError(format!("fx begin_write: {e}")))?;
1261 let mut encoded_blobs: Vec<String> = Vec::new();
1262 {
1263 let mut table = write_txn
1264 .open_table(FX_QUOTE_BY_PAIR)
1265 .map_err(|e| PayError::InternalError(format!("fx open quote table: {e}")))?;
1266 encoded_blobs.push(encode(&new_quote)?);
1267 let encoded = encoded_blobs
1268 .last()
1269 .ok_or_else(|| PayError::InternalError("missing quote blob".to_string()))?;
1270 table
1271 .insert(pair.as_str(), encoded.as_str())
1272 .map_err(|e| PayError::InternalError(format!("fx insert quote: {e}")))?;
1273 }
1274 write_txn
1275 .commit()
1276 .map_err(|e| PayError::InternalError(format!("fx commit write: {e}")))?;
1277 }
1278
1279 #[cfg(feature = "postgres")]
1281 if let SpendBackend::Postgres { pool } = &self.backend {
1282 let quote_json = serde_json::to_value(&new_quote)
1283 .map_err(|e| PayError::InternalError(format!("serialize fx quote: {e}")))?;
1284 let _ = sqlx::query(
1285 "INSERT INTO exchange_rate_cache (pair, quote) VALUES ($1, $2) \
1286 ON CONFLICT (pair) DO UPDATE SET quote = $2",
1287 )
1288 .bind(&pair)
1289 .bind("e_json)
1290 .execute(pool)
1291 .await;
1292 }
1293
1294 Ok(new_quote)
1295 }
1296
1297 #[cfg(feature = "exchange-rate")]
1298 async fn fetch_exchange_rate_http(
1299 &self,
1300 base: &str,
1301 quote_currency: &str,
1302 ) -> Result<(f64, String), PayError> {
1303 let cfg = self.exchange_rate.as_ref().cloned().unwrap_or_default();
1304
1305 if cfg.sources.is_empty() {
1306 return Err(PayError::InvalidAmount(
1307 "exchange_rate.sources is empty — no exchange-rate API configured".to_string(),
1308 ));
1309 }
1310
1311 let client = reqwest::Client::new();
1312 let mut last_err = String::new();
1313
1314 for source in &cfg.sources {
1315 match fetch_from_source(&client, source, base, quote_currency).await {
1316 Ok(price) => return Ok((price, source.endpoint.clone())),
1317 Err(e) => {
1318 last_err =
1319 format!("{} ({}): {e}", source.endpoint, source.source_type.as_str());
1320 }
1321 }
1322 }
1323
1324 Err(PayError::NetworkError(format!(
1325 "all exchange-rate sources failed; last: {last_err}"
1326 )))
1327 }
1328
1329 #[cfg(not(feature = "exchange-rate"))]
1330 async fn fetch_exchange_rate_http(
1331 &self,
1332 _base: &str,
1333 _quote_currency: &str,
1334 ) -> Result<(f64, String), PayError> {
1335 Err(PayError::NotImplemented(
1336 "exchange-rate HTTP support is not built in this feature set".to_string(),
1337 ))
1338 }
1339}
1340
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reports a time before the epoch.
fn now_epoch_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
1351
/// Maps a network/token pair to the pricing symbol and the number of smallest units per
/// whole unit of that asset.
///
/// When `token` is given, it alone decides the asset (ASCII case-insensitive): `sol`, `eth`,
/// and the USD-pegged `usdc`/`usdt`. Any other token yields `None`. Without a token, networks
/// whose name starts with `ln`, or equals `cashu` or `btc` (case-insensitive), are priced as
/// BTC; every other network yields `None`.
fn token_asset(network: &str, token: Option<&str>) -> Option<(&'static str, f64)> {
    // (token alias, pricing symbol, smallest units per whole unit)
    const KNOWN_TOKENS: [(&str, &str, f64); 4] = [
        ("sol", "SOL", 1e9),
        ("eth", "ETH", 1e18),
        ("usdc", "USD", 1e6),
        ("usdt", "USD", 1e6),
    ];

    let Some(name) = token else {
        // Compare the two-byte prefix on raw bytes so non-ASCII names cannot split a char.
        let has_ln_prefix = network
            .as_bytes()
            .get(..2)
            .map_or(false, |head| head.eq_ignore_ascii_case(b"ln"));
        let is_bitcoin = has_ln_prefix
            || network.eq_ignore_ascii_case("cashu")
            || network.eq_ignore_ascii_case("btc");
        return if is_bitcoin { Some(("BTC", 1e8)) } else { None };
    };

    KNOWN_TOKENS
        .iter()
        .find(|(alias, _, _)| name.eq_ignore_ascii_case(alias))
        .map(|&(_, symbol, divisor)| (symbol, divisor))
}
1368
/// Pulls a numeric price out of a generic JSON response.
///
/// Tries the top-level keys `price`, `rate`, and `usd_per_base` in that order, then the
/// nested `data.price`. Only JSON numbers count; the first match wins.
#[cfg(feature = "exchange-rate")]
fn extract_price_generic(value: &serde_json::Value) -> Option<f64> {
    const TOP_LEVEL_KEYS: [&str; 3] = ["price", "rate", "usd_per_base"];

    let top_level = TOP_LEVEL_KEYS
        .iter()
        .find_map(|key| value.get(*key).and_then(|field| field.as_f64()));

    top_level.or_else(|| value.get("data")?.get("price")?.as_f64())
}
1383
#[cfg(feature = "exchange-rate")]
impl ExchangeRateSourceType {
    /// Lowercase label for this source type, used when reporting fetch failures.
    fn as_str(self) -> &'static str {
        let label: &'static str = match self {
            ExchangeRateSourceType::Kraken => "kraken",
            ExchangeRateSourceType::CoinGecko => "coingecko",
            ExchangeRateSourceType::Generic => "generic",
        };
        label
    }
}
1394
1395#[cfg(feature = "exchange-rate")]
/// Translates an asset ticker (ASCII case-insensitive) into its CoinGecko coin id.
///
/// Returns `None` for tickers without a mapping.
fn coingecko_coin_id(symbol: &str) -> Option<&'static str> {
    // (ticker, CoinGecko coin id)
    const COIN_IDS: [(&str, &str); 3] = [
        ("BTC", "bitcoin"),
        ("SOL", "solana"),
        ("ETH", "ethereum"),
    ];

    COIN_IDS
        .iter()
        .find(|(ticker, _)| symbol.eq_ignore_ascii_case(ticker))
        .map(|&(_, coin_id)| coin_id)
}
1404
1405#[cfg(feature = "exchange-rate")]
/// Translates an asset ticker (ASCII case-insensitive) into the Kraken USD pair name.
///
/// Returns `None` for tickers without a mapping.
fn kraken_pair(symbol: &str) -> Option<&'static str> {
    // (ticker, Kraken pair name)
    const PAIRS: [(&str, &str); 3] = [("BTC", "XBTUSD"), ("SOL", "SOLUSD"), ("ETH", "ETHUSD")];

    for (ticker, pair) in PAIRS {
        if symbol.eq_ignore_ascii_case(ticker) {
            return Some(pair);
        }
    }
    None
}
1414
/// Fetches the price of `base` in `quote_currency` from a single exchange-rate source.
///
/// The request URL and the response parser depend on `source.source_type`:
/// * Kraken: `{endpoint}/0/public/Ticker?pair=…`, price read from the first element of the
///   ticker's `c` array (falling back to the first ticker in `result` when the requested
///   pair name is not a key).
/// * CoinGecko: `{endpoint}/simple/price?ids=…&vs_currencies=…`.
/// * Generic: `{endpoint}?base=…&quote=…`, parsed by `extract_price_generic`.
///
/// When `source.api_key` is set, it is sent both as a bearer token and as `X-Api-Key`.
///
/// # Errors
/// Returns a human-readable reason when the asset is unsupported by the source, the request
/// fails, the status is not successful, the body is not JSON, no price can be extracted, or
/// the extracted price is not a finite positive number.
#[cfg(feature = "exchange-rate")]
async fn fetch_from_source(
    client: &reqwest::Client,
    source: &crate::types::ExchangeRateSource,
    base: &str,
    quote_currency: &str,
) -> Result<f64, String> {
    type PriceExtractor = Box<dyn Fn(&serde_json::Value) -> Option<f64> + Send>;

    // Kraken and CoinGecko append a path to the endpoint; a trailing `/` in the configured
    // endpoint would otherwise produce `//` in the URL.
    let endpoint_root = source.endpoint.trim_end_matches('/');

    let (url, extract_fn): (String, PriceExtractor) = match source.source_type {
        ExchangeRateSourceType::Kraken => {
            let pair = kraken_pair(base)
                .ok_or_else(|| format!("kraken: unsupported base asset '{base}'"))?;
            let url = format!("{endpoint_root}/0/public/Ticker?pair={pair}");
            let pair_owned = pair.to_string();
            (
                url,
                Box::new(move |v: &serde_json::Value| {
                    let result = v.get("result")?;
                    // Kraken may key the ticker by a different pair name than requested.
                    let ticker = result
                        .get(&pair_owned)
                        .or_else(|| result.as_object().and_then(|m| m.values().next()))?;
                    let price_str = ticker.get("c")?.as_array()?.first()?.as_str()?;
                    price_str.parse::<f64>().ok()
                }),
            )
        }
        ExchangeRateSourceType::CoinGecko => {
            let coin_id = coingecko_coin_id(base)
                .ok_or_else(|| format!("coingecko: unsupported base asset '{base}'"))?;
            let vs = quote_currency.to_ascii_lowercase();
            let url = format!("{endpoint_root}/simple/price?ids={coin_id}&vs_currencies={vs}");
            let coin_id_owned = coin_id.to_string();
            (
                url,
                Box::new(move |v: &serde_json::Value| v.get(&coin_id_owned)?.get(&vs)?.as_f64()),
            )
        }
        ExchangeRateSourceType::Generic => {
            // The generic endpoint is used verbatim; only the query separator is chosen.
            let sep = if source.endpoint.contains('?') {
                '&'
            } else {
                '?'
            };
            let url = format!(
                "{}{sep}base={}&quote={}",
                source.endpoint,
                base.to_ascii_uppercase(),
                quote_currency.to_ascii_uppercase()
            );
            (url, Box::new(extract_price_generic))
        }
    };

    let mut req = client.get(&url);
    if let Some(key) = &source.api_key {
        req = req.header("Authorization", format!("Bearer {key}"));
        req = req.header("X-Api-Key", key);
    }

    let resp = req
        .send()
        .await
        .map_err(|e| format!("request failed: {e}"))?;
    if !resp.status().is_success() {
        return Err(format!("status {}", resp.status()));
    }

    let value: serde_json::Value = resp
        .json()
        .await
        .map_err(|e| format!("parse failed: {e}"))?;

    let price =
        extract_fn(&value).ok_or_else(|| "could not extract price from response".to_string())?;

    // A zero, negative, or non-finite price would corrupt every USD conversion made with it;
    // reject it here so the caller can fall through to the next source.
    if !price.is_finite() || price <= 0.0 {
        return Err(format!("invalid price {price} in response"));
    }
    Ok(price)
}
1495
/// Serializes `value` to a JSON string for storage in a redb table.
///
/// # Errors
/// `PayError::InternalError` when serialization fails.
#[cfg(feature = "redb")]
fn encode<T: Serialize>(value: &T) -> Result<String, PayError> {
    match serde_json::to_string(value) {
        Ok(json) => Ok(json),
        Err(e) => Err(PayError::InternalError(format!(
            "spend encode failed: {e}"
        ))),
    }
}
1501
/// Deserializes a JSON string read from a redb table.
///
/// # Errors
/// `PayError::InternalError` carrying the input length and a preview of at most 48 bytes of
/// the input when deserialization fails.
#[cfg(feature = "redb")]
fn decode<T: DeserializeOwned>(encoded: &str) -> Result<T, PayError> {
    serde_json::from_str(encoded).map_err(|e| {
        // Slicing a `str` at a byte offset inside a multi-byte character panics, so back the
        // cut-off up to the nearest char boundary (offset 0 is always a boundary).
        let mut preview_len = encoded.len().min(48);
        while !encoded.is_char_boundary(preview_len) {
            preview_len -= 1;
        }
        let preview = &encoded[..preview_len];
        PayError::InternalError(format!(
            "spend decode failed (len={}, preview={}): {e}",
            encoded.len(),
            preview
        ))
    })
}
1514
/// Prefixes the message of an `InternalError` with `prefix`; every other error variant is
/// returned unchanged.
#[cfg(feature = "redb")]
fn prepend_err(prefix: &str, err: PayError) -> PayError {
    match err {
        PayError::InternalError(detail) => {
            let prefixed = format!("{prefix}: {detail}");
            PayError::InternalError(prefixed)
        }
        untouched => untouched,
    }
}
1522
1523fn generate_rule_identifier() -> Result<String, PayError> {
1524 let mut buf = [0u8; 4];
1525 getrandom::fill(&mut buf).map_err(|e| PayError::InternalError(format!("rng failed: {e}")))?;
1526 Ok(format!("r_{}", hex::encode(buf)))
1527}
1528
1529fn validate_limit(
1530 rule: &SpendLimit,
1531 exchange_rate: Option<&ExchangeRateConfig>,
1532) -> Result<(), PayError> {
1533 if rule.window_s == 0 {
1534 return Err(PayError::InvalidAmount(
1535 "limit rule has zero window_s".to_string(),
1536 ));
1537 }
1538 if rule.max_spend == 0 {
1539 return Err(PayError::InvalidAmount(
1540 "limit rule has zero max_spend".to_string(),
1541 ));
1542 }
1543
1544 match rule.scope {
1545 SpendScope::GlobalUsdCents => {
1546 if rule.network.is_some() || rule.wallet.is_some() {
1547 return Err(PayError::InvalidAmount(
1548 "scope=global-usd-cents cannot set network/wallet".to_string(),
1549 ));
1550 }
1551 if rule.token.is_some() {
1552 return Err(PayError::InvalidAmount(
1553 "scope=global-usd-cents cannot set token".to_string(),
1554 ));
1555 }
1556 }
1557 SpendScope::Network => {
1558 if rule.network.as_deref().unwrap_or("").trim().is_empty() {
1559 return Err(PayError::InvalidAmount(
1560 "scope=network requires network".to_string(),
1561 ));
1562 }
1563 if rule.wallet.is_some() {
1564 return Err(PayError::InvalidAmount(
1565 "scope=network cannot set wallet".to_string(),
1566 ));
1567 }
1568 }
1569 SpendScope::Wallet => {
1570 if rule.network.as_deref().unwrap_or("").trim().is_empty() {
1571 return Err(PayError::InvalidAmount(
1572 "scope=wallet requires network".to_string(),
1573 ));
1574 }
1575 if rule.wallet.as_deref().unwrap_or("").trim().is_empty() {
1576 return Err(PayError::InvalidAmount(
1577 "scope=wallet requires wallet".to_string(),
1578 ));
1579 }
1580 }
1581 }
1582
1583 if rule.scope == SpendScope::GlobalUsdCents && exchange_rate.is_none() {
1584 return Err(PayError::InvalidAmount(
1585 "scope=global-usd-cents requires config.exchange_rate".to_string(),
1586 ));
1587 }
1588 Ok(())
1589}
1590
/// Loads every spend rule stored in the redb rule table.
///
/// A table that does not exist yet yields an empty list. Any other failure to open the table
/// is reported rather than being mistaken for "no rules configured", which would silently
/// disable all limits.
///
/// # Errors
/// `PayError::InternalError` when the table cannot be opened or iterated, or when a stored
/// rule cannot be decoded.
#[cfg(feature = "redb")]
fn load_rules(read_txn: &redb::ReadTransaction) -> Result<Vec<SpendLimit>, PayError> {
    let rule_table = match read_txn.open_table(RULE_BY_ID) {
        Ok(table) => table,
        // Nothing has been written yet.
        Err(redb::TableError::TableDoesNotExist(_)) => return Ok(vec![]),
        Err(e) => {
            return Err(PayError::InternalError(format!(
                "spend open rule table: {e}"
            )))
        }
    };
    rule_table
        .iter()
        .map_err(|e| PayError::InternalError(format!("spend iterate rules: {e}")))?
        .map(|entry| {
            let (_k, v) = entry
                .map_err(|e| PayError::InternalError(format!("spend read rule entry: {e}")))?;
            decode::<SpendLimit>(v.value()).map_err(|e| prepend_err("spend decode rule", e))
        })
        .collect()
}
1606
/// Loads every reservation stored in the redb reservation table.
///
/// A table that does not exist yet yields an empty list. Any other failure to open the table
/// is reported rather than being mistaken for "nothing spent", which would let spends pass
/// against an empty history.
///
/// # Errors
/// `PayError::InternalError` when the table cannot be opened or iterated, or when a stored
/// reservation cannot be decoded.
#[cfg(feature = "redb")]
fn load_reservations(read_txn: &redb::ReadTransaction) -> Result<Vec<SpendReservation>, PayError> {
    let table = match read_txn.open_table(RESERVATION_BY_ID) {
        Ok(table) => table,
        // Nothing has been written yet.
        Err(redb::TableError::TableDoesNotExist(_)) => return Ok(vec![]),
        Err(e) => {
            return Err(PayError::InternalError(format!(
                "spend open reservation table: {e}"
            )))
        }
    };
    table
        .iter()
        .map_err(|e| PayError::InternalError(format!("spend iterate reservations: {e}")))?
        .map(|entry| {
            let (_k, v) = entry
                .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?;
            decode::<SpendReservation>(v.value())
                .map_err(|e| prepend_err("spend decode reservation", e))
        })
        .collect()
}
1623
/// Hook for expiring pending reservations in the redb backend.
///
/// Currently a no-op: both arguments are ignored and `Ok(())` is always returned.
// NOTE(review): the Postgres counterpart `pg_expire_pending` rewrites overdue pending rows as
// `Expired`, while `reservation_active_for_window` already ignores pending reservations whose
// expiry has passed — confirm the redb path intentionally skips the rewrite.
#[cfg(feature = "redb")]
fn expire_pending(_table: &mut redb::Table<u64, &str>, _now_ms: u64) -> Result<(), PayError> {
    Ok(())
}
1628
1629fn amount_for_rule(
1630 _rule: &SpendLimit,
1631 amount_native: u64,
1632 amount_usd_cents: Option<u64>,
1633 use_usd: bool,
1634) -> Result<u64, PayError> {
1635 if use_usd {
1636 amount_usd_cents.ok_or_else(|| {
1637 PayError::InternalError("missing USD amount for non-native unit rule".to_string())
1638 })
1639 } else {
1640 Ok(amount_native)
1641 }
1642}
1643
1644fn reservation_active_for_window(r: &SpendReservation, now_ms: u64) -> bool {
1645 match r.status {
1646 ReservationStatus::Confirmed => true,
1647 ReservationStatus::Pending => r.expires_at_epoch_ms > now_ms,
1648 ReservationStatus::Cancelled | ReservationStatus::Expired => false,
1649 }
1650}
1651
1652fn rule_matches_context(
1653 rule: &SpendLimit,
1654 network: &str,
1655 wallet: Option<&str>,
1656 token: Option<&str>,
1657) -> bool {
1658 if let Some(rule_token) = &rule.token {
1659 match token {
1660 Some(ctx_token) if ctx_token.eq_ignore_ascii_case(rule_token) => {}
1661 _ => return false,
1662 }
1663 }
1664 match rule.scope {
1665 SpendScope::GlobalUsdCents => true,
1666 SpendScope::Network => rule.network.as_deref() == Some(network),
1667 SpendScope::Wallet => {
1668 rule.network.as_deref() == Some(network) && rule.wallet.as_deref() == wallet
1669 }
1670 }
1671}
1672
1673fn scope_key(rule: &SpendLimit) -> String {
1674 match rule.scope {
1675 SpendScope::GlobalUsdCents => "global-usd-cents".to_string(),
1676 SpendScope::Network => rule.network.clone().unwrap_or_default(),
1677 SpendScope::Wallet => format!(
1678 "{}/{}",
1679 rule.network.clone().unwrap_or_default(),
1680 rule.wallet.clone().unwrap_or_default()
1681 ),
1682 }
1683}
1684
1685fn spent_in_window(
1686 rule: &SpendLimit,
1687 reservations: &[SpendReservation],
1688 now_ms: u64,
1689 use_usd: bool,
1690) -> Result<(u64, Option<u64>), PayError> {
1691 let window_ms = rule.window_s.saturating_mul(1000);
1692 let cutoff = now_ms.saturating_sub(window_ms);
1693
1694 let mut spent = 0u64;
1695 let mut oldest: Option<u64> = None;
1696
1697 for r in reservations {
1698 if !reservation_active_for_window(r, now_ms) {
1699 continue;
1700 }
1701 if r.created_at_epoch_ms < cutoff {
1702 continue;
1703 }
1704 if !rule_matches_context(rule, &r.network, r.wallet.as_deref(), r.token.as_deref()) {
1705 continue;
1706 }
1707
1708 let amount = if use_usd {
1709 r.amount_usd_cents.ok_or_else(|| {
1710 PayError::InternalError("reservation missing USD amount".to_string())
1711 })?
1712 } else {
1713 r.amount_native
1714 };
1715 spent = spent.saturating_add(amount);
1716 oldest = Some(oldest.map_or(r.created_at_epoch_ms, |v| v.min(r.created_at_epoch_ms)));
1717 }
1718
1719 Ok((spent, oldest))
1720}
1721
/// Increments the named counter in the meta table and returns the new value.
///
/// A counter that has never been written starts from 0, so the first call returns 1. The
/// increment saturates at `u64::MAX`.
///
/// # Errors
/// `PayError::InternalError` when the meta table cannot be opened, read, or written.
#[cfg(feature = "redb")]
fn next_counter(write_txn: &redb::WriteTransaction, key: &str) -> Result<u64, PayError> {
    let mut counters = write_txn
        .open_table(META_COUNTER)
        .map_err(|e| PayError::InternalError(format!("spend open meta table: {e}")))?;

    let previous = counters
        .get(key)
        .map_err(|e| PayError::InternalError(format!("spend read counter {key}: {e}")))?
        .map(|stored| stored.value())
        .unwrap_or(0);

    let bumped = previous.saturating_add(1);
    counters
        .insert(key, bumped)
        .map_err(|e| PayError::InternalError(format!("spend write counter {key}: {e}")))?;
    Ok(bumped)
}
1739
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a rule for the given scope with a 3600 s window, a `max_spend` of 1000, no token
    /// filter, and no rule id.
    fn make_limit(scope: SpendScope, network: Option<&str>, wallet: Option<&str>) -> SpendLimit {
        SpendLimit {
            rule_id: None,
            scope,
            network: network.map(|s| s.to_string()),
            wallet: wallet.map(|s| s.to_string()),
            window_s: 3600,
            max_spend: 1000,
            token: None,
        }
    }

    /// With a network-scoped limit of 1000 on `cashu`, two reservations of 400 are accepted
    /// and a third is rejected with `LimitExceeded`.
    #[cfg(feature = "redb")]
    #[tokio::test]
    async fn provider_limit_reserve_and_confirm() {
        let tmp = tempfile::tempdir().unwrap();
        let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);

        ledger
            .set_limits(&[make_limit(SpendScope::Network, Some("cashu"), None)])
            .await
            .unwrap();

        let ctx = SpendContext {
            network: "cashu".to_string(),
            wallet: Some("w_01".to_string()),
            amount_native: 400,
            token: None,
        };
        // First 400: reserved and confirmed.
        let r1 = ledger.reserve("op_1", &ctx).await.unwrap();
        ledger.confirm(r1).await.unwrap();

        // Second 400 is left pending; the third 400 is the one expected to be refused.
        let r2 = ledger.reserve("op_2", &ctx).await.unwrap();
        let err = ledger.reserve("op_3", &ctx).await.unwrap_err();
        assert!(matches!(err, PayError::LimitExceeded { .. }));

        ledger.cancel(r2).await.unwrap();
    }

    /// With a wallet-scoped rule configured, reserving without a wallet in the context is
    /// expected to fail with `InvalidAmount`.
    #[cfg(feature = "redb")]
    #[tokio::test]
    async fn wallet_scope_requires_wallet_context() {
        let tmp = tempfile::tempdir().unwrap();
        let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);

        ledger
            .set_limits(&[make_limit(SpendScope::Wallet, Some("cashu"), Some("w_abc"))])
            .await
            .unwrap();

        let ctx = SpendContext {
            network: "cashu".to_string(),
            wallet: None,
            amount_native: 1,
            token: None,
        };
        let err = ledger.reserve("op_1", &ctx).await.unwrap_err();
        assert!(matches!(err, PayError::InvalidAmount(_)));
    }

    /// Setting a `GlobalUsdCents` rule on a ledger created without an exchange-rate config is
    /// expected to fail with `InvalidAmount`.
    // NOTE(review): unlike the other ledger tests this one is not gated on the `redb`
    // feature — confirm `set_limits` rejects the rule before touching any backend.
    #[tokio::test]
    async fn global_usd_cents_scope_requires_exchange_rate_config() {
        let tmp = tempfile::tempdir().unwrap();
        let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);

        let err = ledger
            .set_limits(&[SpendLimit {
                rule_id: None,
                scope: SpendScope::GlobalUsdCents,
                network: None,
                wallet: None,
                window_s: 3600,
                max_spend: 100,
                token: None,
            }])
            .await
            .unwrap_err();

        assert!(matches!(err, PayError::InvalidAmount(_)));
    }

    /// A network-scoped rule without a token filter can be set on a ledger that has no
    /// exchange-rate config.
    #[cfg(feature = "redb")]
    #[tokio::test]
    async fn network_scope_native_token_ok_without_exchange_rate() {
        let tmp = tempfile::tempdir().unwrap();
        let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);

        ledger
            .set_limits(&[SpendLimit {
                rule_id: None,
                scope: SpendScope::Network,
                network: Some("cashu".to_string()),
                wallet: None,
                window_s: 3600,
                max_spend: 100,
                token: None,
            }])
            .await
            .expect("network scope should not require exchange_rate");
    }
}