1pub mod tokens;
2
3use crate::provider::PayError;
4use crate::types::{
5 ExchangeRateConfig, ExchangeRateSourceType, SpendLimit, SpendLimitStatus, SpendScope,
6};
7use serde::de::DeserializeOwned;
8use serde::{Deserialize, Serialize};
9use tokio::sync::Mutex;
10
11#[cfg(feature = "redb")]
12use crate::store::db;
13#[cfg(feature = "redb")]
14use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
15#[cfg(feature = "redb")]
16use std::path::{Path, PathBuf};
17
// Table of named u64 counters (string key -> u64).
// NOTE(review): presumably what `next_counter` reads and bumps for the `NEXT_*_KEY`
// keys below — confirm against that helper.
#[cfg(feature = "redb")]
const META_COUNTER: TableDefinition<&str, u64> = TableDefinition::new("meta_counter");
// Spend rules: rule id -> encoded `SpendLimit` (written by `add_limit_redb` / `set_limits_redb`).
#[cfg(feature = "redb")]
const RULE_BY_ID: TableDefinition<&str, &str> = TableDefinition::new("rule_by_id_v3");
// Reservations: numeric reservation id -> encoded `SpendReservation`.
#[cfg(feature = "redb")]
const RESERVATION_BY_ID: TableDefinition<u64, &str> = TableDefinition::new("reservation_by_id");
// Index from the caller-supplied op_id to the reservation id created for it; `reserve_redb`
// consults it so a repeated op_id returns the existing reservation.
#[cfg(feature = "redb")]
const RESERVATION_ID_BY_OP_ID: TableDefinition<&str, u64> =
    TableDefinition::new("reservation_id_by_op_id");
// Spend events: event id -> encoded `SpendEvent` (written by `confirm_redb`).
#[cfg(feature = "redb")]
const SPEND_EVENT_BY_ID: TableDefinition<u64, &str> = TableDefinition::new("spend_event_by_id");
// String-keyed quote table; not referenced in the visible part of this file.
// NOTE(review): presumably pair -> encoded `ExchangeRateQuote` — confirm at the use site.
#[cfg(feature = "redb")]
const FX_QUOTE_BY_PAIR: TableDefinition<&str, &str> = TableDefinition::new("quote_by_pair");
// Counter key passed to `next_counter` when `reserve_redb` allocates a reservation id.
#[cfg(feature = "redb")]
const NEXT_RESERVATION_ID_KEY: &str = "next_reservation_id";
// Counter key passed to `next_counter` when `confirm_redb` allocates an event id.
#[cfg(feature = "redb")]
const NEXT_EVENT_ID_KEY: &str = "next_event_id";
// Version number handed to `db::open_and_migrate` for the spend database.
#[cfg(feature = "redb")]
const SPEND_VERSION: u64 = 1;
// Version number handed to `db::open_and_migrate` for the exchange-rate cache database.
#[cfg(feature = "redb")]
const FX_CACHE_VERSION: u64 = 1;
39
/// Describes one prospective spend that `SpendLedger::reserve` checks against the
/// configured limits.
#[derive(Debug, Clone)]
pub struct SpendContext {
    /// Network the spend happens on; `reserve` rejects a blank value.
    pub network: String,
    /// Wallet making the spend. The backends reject `None` when a wallet-scoped rule
    /// exists for `network`.
    pub wallet: Option<String>,
    /// Amount of the spend. NOTE(review): presumably in the smallest native unit of the
    /// network/token — confirm with callers.
    pub amount_native: u64,
    /// Optional token identifier, forwarded to rule matching and USD conversion.
    pub token: Option<String>,
}
47
/// Lifecycle state stored inside a `SpendReservation`.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum ReservationStatus {
    /// Initial state assigned when a reservation is created.
    Pending,
    /// Set by the confirm paths; only a `Pending` reservation is moved here.
    Confirmed,
    /// Set by the cancel paths; only a `Pending` reservation is moved here.
    Cancelled,
    /// Never assigned in the visible code.
    /// NOTE(review): presumably set by `expire_pending` / `pg_expire_pending` — confirm.
    Expired,
}
55
/// Persisted record of a spend reserved against the limits.
///
/// Stored encoded in the redb reservation table or as JSON in the Postgres
/// `spend_reservations.reservation` column.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SpendReservation {
    /// Backend-assigned id (redb counter, or the id returned by the Postgres insert).
    reservation_id: u64,
    /// Caller-supplied operation id; a repeated op_id returns the existing reservation.
    op_id: String,
    /// Copied from `SpendContext::network`.
    network: String,
    /// Copied from `SpendContext::wallet`.
    wallet: Option<String>,
    /// Copied from `SpendContext::token`; defaults to `None` when absent from stored data.
    #[serde(default)]
    token: Option<String>,
    /// Copied from `SpendContext::amount_native`.
    amount_native: u64,
    /// USD-cent value of the spend; only computed when a `GlobalUsdCents` rule existed
    /// at reservation time, otherwise `None`.
    amount_usd_cents: Option<u64>,
    /// Current lifecycle state.
    status: ReservationStatus,
    /// Creation time in epoch milliseconds.
    created_at_epoch_ms: u64,
    /// Creation time plus 300_000 ms, as set by the reserve paths.
    expires_at_epoch_ms: u64,
    /// Time the reservation was confirmed or cancelled; `None` while pending.
    finalized_at_epoch_ms: Option<u64>,
}
71
/// Record written when a reservation is confirmed; its fields are copied from the
/// confirmed `SpendReservation`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SpendEvent {
    /// Event id: allocated from a counter in the redb path; the Postgres path serializes 0
    /// here and inserts into `spend_events` without supplying an id.
    event_id: u64,
    /// Id of the reservation this event confirms.
    reservation_id: u64,
    /// Operation id copied from the reservation.
    op_id: String,
    /// Network copied from the reservation.
    network: String,
    /// Wallet copied from the reservation.
    wallet: Option<String>,
    /// Token copied from the reservation; defaults to `None` when absent from stored data.
    #[serde(default)]
    token: Option<String>,
    /// Native amount copied from the reservation.
    amount_native: u64,
    /// USD-cent amount copied from the reservation, if it had one.
    amount_usd_cents: Option<u64>,
    /// Creation time of the underlying reservation (epoch milliseconds).
    created_at_epoch_ms: u64,
    /// Time the confirmation was recorded (epoch milliseconds).
    confirmed_at_epoch_ms: u64,
}
86
/// Serializable exchange-rate quote.
///
/// Not constructed or read in the visible part of this file.
/// NOTE(review): presumably the value cached in `FX_QUOTE_BY_PAIR` — confirm at the use site.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ExchangeRateQuote {
    /// Identifier of the quoted pair. NOTE(review): exact format not visible here.
    pair: String,
    /// Name of the source the quote came from.
    source: String,
    /// Quoted price. NOTE(review): direction and units of the quote not visible here.
    price: f64,
    /// Fetch time in epoch milliseconds.
    fetched_at_epoch_ms: u64,
    /// Expiry time in epoch milliseconds.
    expires_at_epoch_ms: u64,
}
95
/// Storage backend a `SpendLedger` dispatches to.
///
/// The `Redb` and `Postgres` variants exist only when the matching Cargo feature is
/// enabled; `None` is always compiled in.
#[allow(dead_code)] enum SpendBackend {
    /// redb storage; database files live under `<data_dir>/spend/`.
    #[cfg(feature = "redb")]
    Redb {
        data_dir: String,
    },
    /// Postgres storage reached through a sqlx connection pool.
    #[cfg(feature = "postgres")]
    Postgres {
        pool: sqlx::PgPool,
    },
    /// No storage: most ledger operations return `PayError::NotImplemented`, while
    /// `get_status` returns an empty list and `cancel` succeeds as a no-op.
    None,
}
112
/// Spend-limit ledger: stores limit rules and tracks reservations against them through
/// the selected storage backend.
pub struct SpendLedger {
    /// Backend every public operation dispatches to.
    backend: SpendBackend,
    /// Optional exchange-rate configuration, passed to `validate_limit` when rules are
    /// added or replaced.
    exchange_rate: Option<ExchangeRateConfig>,
    /// Async mutex taken by every public operation before it calls into the backend.
    mu: Mutex<()>,
}
122
123impl SpendLedger {
124 pub fn new(data_dir: &str, exchange_rate: Option<ExchangeRateConfig>) -> Self {
125 #[cfg(feature = "redb")]
126 let backend = SpendBackend::Redb {
127 data_dir: data_dir.to_string(),
128 };
129 #[cfg(not(feature = "redb"))]
130 let backend = {
131 let _ = data_dir;
132 SpendBackend::None
133 };
134 Self {
135 backend,
136 exchange_rate,
137 mu: Mutex::new(()),
138 }
139 }
140
141 #[cfg(feature = "postgres")]
142 pub fn new_postgres(pool: sqlx::PgPool, exchange_rate: Option<ExchangeRateConfig>) -> Self {
143 Self {
144 backend: SpendBackend::Postgres { pool },
145 exchange_rate,
146 mu: Mutex::new(()),
147 }
148 }
149
150 pub async fn add_limit(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
152 validate_limit(limit, self.exchange_rate.as_ref())?;
153
154 let _guard = self.mu.lock().await;
155
156 match &self.backend {
157 #[cfg(feature = "redb")]
158 SpendBackend::Redb { .. } => self.add_limit_redb(limit),
159 #[cfg(feature = "postgres")]
160 SpendBackend::Postgres { .. } => self.add_limit_postgres(limit).await,
161 SpendBackend::None => Err(PayError::NotImplemented(
162 "no storage backend for spend limits".to_string(),
163 )),
164 }
165 }
166
167 pub async fn remove_limit(&self, rule_id: &str) -> Result<(), PayError> {
169 let _guard = self.mu.lock().await;
170
171 match &self.backend {
172 #[cfg(feature = "redb")]
173 SpendBackend::Redb { .. } => self.remove_limit_redb(rule_id),
174 #[cfg(feature = "postgres")]
175 SpendBackend::Postgres { .. } => self.remove_limit_postgres(rule_id).await,
176 SpendBackend::None => Err(PayError::NotImplemented(
177 "no storage backend for spend limits".to_string(),
178 )),
179 }
180 }
181
182 pub async fn set_limits(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
184 for limit in limits {
185 validate_limit(limit, self.exchange_rate.as_ref())?;
186 }
187
188 let _guard = self.mu.lock().await;
189
190 match &self.backend {
191 #[cfg(feature = "redb")]
192 SpendBackend::Redb { .. } => self.set_limits_redb(limits),
193 #[cfg(feature = "postgres")]
194 SpendBackend::Postgres { .. } => self.set_limits_postgres(limits).await,
195 SpendBackend::None => Err(PayError::NotImplemented(
196 "no storage backend for spend limits".to_string(),
197 )),
198 }
199 }
200
201 pub async fn get_status(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
203 let _guard = self.mu.lock().await;
204
205 match &self.backend {
206 #[cfg(feature = "redb")]
207 SpendBackend::Redb { .. } => self.get_status_redb(),
208 #[cfg(feature = "postgres")]
209 SpendBackend::Postgres { .. } => self.get_status_postgres().await,
210 SpendBackend::None => Ok(Vec::new()),
211 }
212 }
213
214 pub async fn reserve(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
216 if op_id.trim().is_empty() {
217 return Err(PayError::InvalidAmount("op_id cannot be empty".to_string()));
218 }
219 if ctx.network.trim().is_empty() {
220 return Err(PayError::InvalidAmount(
221 "network cannot be empty for spend check".to_string(),
222 ));
223 }
224
225 let _guard = self.mu.lock().await;
226
227 match &self.backend {
228 #[cfg(feature = "redb")]
229 SpendBackend::Redb { .. } => self.reserve_redb(op_id, ctx).await,
230 #[cfg(feature = "postgres")]
231 SpendBackend::Postgres { .. } => self.reserve_postgres(op_id, ctx).await,
232 SpendBackend::None => Err(PayError::NotImplemented(
233 "no storage backend for spend limits".to_string(),
234 )),
235 }
236 }
237
238 pub async fn confirm(&self, reservation_id: u64) -> Result<(), PayError> {
239 let _guard = self.mu.lock().await;
240
241 match &self.backend {
242 #[cfg(feature = "redb")]
243 SpendBackend::Redb { .. } => self.confirm_redb(reservation_id),
244 #[cfg(feature = "postgres")]
245 SpendBackend::Postgres { .. } => self.confirm_postgres(reservation_id).await,
246 SpendBackend::None => Err(PayError::NotImplemented(
247 "no storage backend for spend limits".to_string(),
248 )),
249 }
250 }
251
252 pub async fn cancel(&self, reservation_id: u64) -> Result<(), PayError> {
253 let _guard = self.mu.lock().await;
254
255 match &self.backend {
256 #[cfg(feature = "redb")]
257 SpendBackend::Redb { .. } => self.cancel_redb(reservation_id),
258 #[cfg(feature = "postgres")]
259 SpendBackend::Postgres { .. } => self.cancel_postgres(reservation_id).await,
260 SpendBackend::None => Ok(()),
261 }
262 }
263}
264
265#[cfg(feature = "redb")]
270impl SpendLedger {
271 fn spend_db_path(&self) -> PathBuf {
272 match &self.backend {
273 SpendBackend::Redb { data_dir } => Path::new(data_dir).join("spend").join("spend.redb"),
274 #[allow(unreachable_patterns)]
275 _ => PathBuf::new(),
276 }
277 }
278
279 fn exchange_rate_db_path(&self) -> PathBuf {
280 match &self.backend {
281 SpendBackend::Redb { data_dir } => Path::new(data_dir)
282 .join("spend")
283 .join("exchange-rate-cache.redb"),
284 #[allow(unreachable_patterns)]
285 _ => PathBuf::new(),
286 }
287 }
288
289 fn open_spend_db(&self) -> Result<Database, PayError> {
290 db::open_and_migrate(
291 &self.spend_db_path(),
292 SPEND_VERSION,
293 &[
294 &|_db: &Database| Ok(()),
296 ],
297 )
298 }
299
300 fn open_exchange_rate_db(&self) -> Result<Database, PayError> {
301 db::open_and_migrate(
302 &self.exchange_rate_db_path(),
303 FX_CACHE_VERSION,
304 &[
305 &|_db: &Database| Ok(()),
307 ],
308 )
309 }
310
311 fn add_limit_redb(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
312 let db = self.open_spend_db()?;
313 let rule_id = generate_rule_identifier()?;
314 limit.rule_id = Some(rule_id.clone());
315 let encoded = encode(limit)?;
316 let write_txn = db
317 .begin_write()
318 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
319 {
320 let mut rule_table = write_txn
321 .open_table(RULE_BY_ID)
322 .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
323 rule_table
324 .insert(rule_id.as_str(), encoded.as_str())
325 .map_err(|e| PayError::InternalError(format!("spend insert rule: {e}")))?;
326 }
327 write_txn
328 .commit()
329 .map_err(|e| PayError::InternalError(format!("spend commit add_limit: {e}")))?;
330 Ok(rule_id)
331 }
332
333 fn remove_limit_redb(&self, rule_id: &str) -> Result<(), PayError> {
334 let db = self.open_spend_db()?;
335 let write_txn = db
336 .begin_write()
337 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
338 {
339 let mut rule_table = write_txn
340 .open_table(RULE_BY_ID)
341 .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
342 let existed = rule_table
343 .remove(rule_id)
344 .map_err(|e| PayError::InternalError(format!("spend remove rule: {e}")))?;
345 if existed.is_none() {
346 return Err(PayError::InvalidAmount(format!(
347 "rule_id '{rule_id}' not found"
348 )));
349 }
350 }
351 write_txn
352 .commit()
353 .map_err(|e| PayError::InternalError(format!("spend commit remove_limit: {e}")))
354 }
355
356 fn set_limits_redb(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
357 let db = self.open_spend_db()?;
358 let write_txn = db
359 .begin_write()
360 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
361 {
362 let mut rule_table = write_txn
363 .open_table(RULE_BY_ID)
364 .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
365 let existing_ids = rule_table
367 .iter()
368 .map_err(|e| PayError::InternalError(format!("spend iterate rules: {e}")))?
369 .map(|entry| {
370 entry
371 .map(|(k, _)| k.value().to_string())
372 .map_err(|e| PayError::InternalError(format!("spend read rule key: {e}")))
373 })
374 .collect::<Result<Vec<_>, _>>()?;
375 for rid in existing_ids {
376 rule_table
377 .remove(rid.as_str())
378 .map_err(|e| PayError::InternalError(format!("spend remove rule: {e}")))?;
379 }
380
381 for limit in limits {
383 let mut rule = limit.clone();
384 let rid = generate_rule_identifier()?;
385 rule.rule_id = Some(rid.clone());
386 let encoded = encode(&rule)?;
387 rule_table
388 .insert(rid.as_str(), encoded.as_str())
389 .map_err(|e| PayError::InternalError(format!("spend insert rule: {e}")))?;
390 }
391 }
392 write_txn
393 .commit()
394 .map_err(|e| PayError::InternalError(format!("spend commit set_limits: {e}")))
395 }
396
397 fn get_status_redb(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
398 let db = self.open_spend_db()?;
399 let read_txn = db
400 .begin_read()
401 .map_err(|e| PayError::InternalError(format!("spend begin_read: {e}")))?;
402 let rules = load_rules(&read_txn)?;
403 let reservations = load_reservations(&read_txn)?;
404 let now = now_epoch_ms();
405 let mut out = Vec::with_capacity(rules.len());
406 for rule in rules {
407 let use_usd = rule.scope == SpendScope::GlobalUsdCents;
408 let (spent, oldest_ts) = spent_in_window(&rule, &reservations, now, use_usd)?;
409 let remaining = rule.max_spend.saturating_sub(spent);
410 let window_ms = rule.window_s.saturating_mul(1000);
411 let window_reset_s = oldest_ts
412 .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
413 .unwrap_or(0);
414 out.push(SpendLimitStatus {
415 rule_id: rule.rule_id.clone().unwrap_or_default(),
416 scope: rule.scope,
417 network: rule.network.clone(),
418 wallet: rule.wallet.clone(),
419 window_s: rule.window_s,
420 max_spend: rule.max_spend,
421 spent,
422 remaining,
423 token: rule.token.clone(),
424 window_reset_s,
425 });
426 }
427 Ok(out)
428 }
429
    /// Checks the spend described by `ctx` against every matching rule and, if none is
    /// exceeded, records a `Pending` reservation in redb and returns its id.
    ///
    /// Steps, in order:
    /// 1. Load the rules from a read transaction.
    /// 2. Reject with `PayError::InvalidAmount` when a wallet-scoped rule exists for
    ///    `ctx.network` but `ctx.wallet` is `None`.
    /// 3. Convert the amount to USD cents, only if some rule has `GlobalUsdCents` scope.
    /// 4. In a write transaction: return the existing reservation id if `op_id` is already
    ///    indexed; otherwise run `expire_pending`, evaluate every matching rule, and insert
    ///    the reservation plus its op_id index entry.
    ///
    /// # Errors
    /// `PayError::LimitExceeded` for the first matching rule whose window total plus this
    /// spend would exceed `max_spend`; `PayError::InternalError` for storage failures.
    async fn reserve_redb(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
        let now = now_epoch_ms();
        let db = self.open_spend_db()?;

        let read_txn = db
            .begin_read()
            .map_err(|e| PayError::InternalError(format!("spend begin_read: {e}")))?;
        let rules = load_rules(&read_txn)?;

        // A wallet-scoped rule on this network cannot be evaluated without a wallet.
        if rules.iter().any(|r| {
            r.scope == SpendScope::Wallet
                && r.network.as_deref() == Some(ctx.network.as_str())
                && ctx.wallet.is_none()
        }) {
            return Err(PayError::InvalidAmount(
                "wallet-scoped limits require an explicit wallet".to_string(),
            ));
        }

        // The USD conversion is only performed when at least one USD-denominated rule exists.
        // NOTE(review): this runs before the op_id lookup below, so a retry of an
        // already-reserved op_id can still fail here if the conversion fails.
        let needs_usd = rules.iter().any(|r| r.scope == SpendScope::GlobalUsdCents);
        let amount_usd_cents = if needs_usd {
            Some(
                self.amount_to_usd_cents(&ctx.network, ctx.token.as_deref(), ctx.amount_native)
                    .await?,
            )
        } else {
            None
        };

        let write_txn = db
            .begin_write()
            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;

        // Declared outside the table scope so the encoded string outlives the table handles.
        let mut encoded_blobs: Vec<String> = Vec::new();
        let reservation_id = {
            let mut reservation_index =
                write_txn.open_table(RESERVATION_ID_BY_OP_ID).map_err(|e| {
                    PayError::InternalError(format!("spend open reservation op index: {e}"))
                })?;
            // Same op_id seen before: hand back the earlier reservation. Returning here
            // leaves the write transaction uncommitted.
            if let Some(existing) = reservation_index
                .get(op_id)
                .map_err(|e| PayError::InternalError(format!("spend read op index: {e}")))?
            {
                let existing_id = existing.value();
                return Ok(existing_id);
            }

            let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
                PayError::InternalError(format!("spend open reservation table: {e}"))
            })?;

            // NOTE(review): presumably marks pending reservations past their expiry as
            // expired so they stop counting — confirm against `expire_pending`.
            expire_pending(&mut reservation_table, now)?;

            // Decode every stored reservation (after the expiry pass) for the window sums.
            let reservations = reservation_table
                .iter()
                .map_err(|e| PayError::InternalError(format!("spend iterate reservations: {e}")))?
                .map(|entry| {
                    let (_k, v) = entry.map_err(|e| {
                        PayError::InternalError(format!("spend read reservation: {e}"))
                    })?;
                    decode::<SpendReservation>(v.value())
                        .map_err(|e| prepend_err("spend decode reservation", e))
                })
                .collect::<Result<Vec<_>, _>>()?;

            for rule in rules.iter() {
                if !rule_matches_context(
                    rule,
                    &ctx.network,
                    ctx.wallet.as_deref(),
                    ctx.token.as_deref(),
                ) {
                    continue;
                }

                let use_usd = rule.scope == SpendScope::GlobalUsdCents;
                let candidate_amount =
                    amount_for_rule(rule, ctx.amount_native, amount_usd_cents, use_usd)?;
                let (spent, oldest_ts) = spent_in_window(rule, &reservations, now, use_usd)?;
                if spent.saturating_add(candidate_amount) > rule.max_spend {
                    // Whole seconds until the oldest counted timestamp leaves the window;
                    // 0 when `spent_in_window` reported no timestamp.
                    let window_ms = rule.window_s.saturating_mul(1000);
                    let remaining_s = oldest_ts
                        .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
                        .unwrap_or(0);

                    return Err(PayError::LimitExceeded {
                        rule_id: rule.rule_id.clone().unwrap_or_default(),
                        scope: rule.scope,
                        scope_key: scope_key(rule),
                        spent,
                        max_spend: rule.max_spend,
                        token: rule.token.clone(),
                        remaining_s,
                        origin: None,
                    });
                }
            }

            let reservation_id = next_counter(&write_txn, NEXT_RESERVATION_ID_KEY)?;
            let reservation = SpendReservation {
                reservation_id,
                op_id: op_id.to_string(),
                network: ctx.network.clone(),
                wallet: ctx.wallet.clone(),
                token: ctx.token.clone(),
                amount_native: ctx.amount_native,
                amount_usd_cents,
                status: ReservationStatus::Pending,
                created_at_epoch_ms: now,
                // Expiry is fixed at 300_000 ms (five minutes) after creation.
                expires_at_epoch_ms: now.saturating_add(300_000),
                finalized_at_epoch_ms: None,
            };
            encoded_blobs.push(encode(&reservation)?);
            let encoded = encoded_blobs
                .last()
                .ok_or_else(|| PayError::InternalError("missing reservation blob".to_string()))?;
            reservation_table
                .insert(reservation_id, encoded.as_str())
                .map_err(|e| PayError::InternalError(format!("spend insert reservation: {e}")))?;
            // Index the op_id so a repeat of this call returns the same reservation.
            reservation_index
                .insert(op_id, reservation_id)
                .map_err(|e| PayError::InternalError(format!("spend insert op index: {e}")))?;
            reservation_id
        };

        write_txn
            .commit()
            .map_err(|e| PayError::InternalError(format!("spend commit reserve: {e}")))?;
        Ok(reservation_id)
    }
561
    /// Marks a pending reservation as confirmed and records a matching `SpendEvent`, both
    /// in one redb write transaction.
    ///
    /// A reservation that is no longer `Pending` is left untouched and the call returns
    /// `Ok(())` without committing.
    ///
    /// # Errors
    /// `PayError::InternalError` when the reservation does not exist or storage fails.
    fn confirm_redb(&self, reservation_id: u64) -> Result<(), PayError> {
        let db = self.open_spend_db()?;
        let now = now_epoch_ms();

        let write_txn = db
            .begin_write()
            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;

        // Declared outside the table scope so the encoded strings outlive the table handles.
        let mut encoded_blobs: Vec<String> = Vec::new();
        {
            let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
                PayError::InternalError(format!("spend open reservation table: {e}"))
            })?;
            // Copy the stored value out so the table is free for the update below.
            let Some(existing_bytes) = reservation_table
                .get(reservation_id)
                .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?
                .map(|g| g.value().to_string())
            else {
                return Err(PayError::InternalError(format!(
                    "reservation {reservation_id} not found"
                )));
            };

            let mut reservation: SpendReservation = decode(&existing_bytes)?;
            // Already confirmed, cancelled or expired: nothing to do.
            if !matches!(reservation.status, ReservationStatus::Pending) {
                return Ok(());
            }

            reservation.status = ReservationStatus::Confirmed;
            reservation.finalized_at_epoch_ms = Some(now);
            encoded_blobs.push(encode(&reservation)?);
            let encoded = encoded_blobs
                .last()
                .ok_or_else(|| PayError::InternalError("missing reservation blob".to_string()))?;
            reservation_table
                .insert(reservation_id, encoded.as_str())
                .map_err(|e| PayError::InternalError(format!("spend update reservation: {e}")))?;

            let mut events = write_txn
                .open_table(SPEND_EVENT_BY_ID)
                .map_err(|e| PayError::InternalError(format!("spend open event table: {e}")))?;
            let event_id = next_counter(&write_txn, NEXT_EVENT_ID_KEY)?;
            // The event takes over the reservation's fields; the reservation was already
            // encoded above, so moving out of it here is fine.
            let event = SpendEvent {
                event_id,
                reservation_id,
                op_id: reservation.op_id,
                network: reservation.network,
                wallet: reservation.wallet,
                token: reservation.token,
                amount_native: reservation.amount_native,
                amount_usd_cents: reservation.amount_usd_cents,
                created_at_epoch_ms: reservation.created_at_epoch_ms,
                confirmed_at_epoch_ms: now,
            };
            encoded_blobs.push(encode(&event)?);
            let encoded_event = encoded_blobs
                .last()
                .ok_or_else(|| PayError::InternalError("missing event blob".to_string()))?;
            events
                .insert(event_id, encoded_event.as_str())
                .map_err(|e| PayError::InternalError(format!("spend insert event: {e}")))?;
        }

        write_txn
            .commit()
            .map_err(|e| PayError::InternalError(format!("spend commit confirm: {e}")))
    }
629
630 fn cancel_redb(&self, reservation_id: u64) -> Result<(), PayError> {
631 let db = self.open_spend_db()?;
632 let now = now_epoch_ms();
633
634 let write_txn = db
635 .begin_write()
636 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
637
638 let mut encoded_blobs: Vec<String> = Vec::new();
639 {
640 let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
641 PayError::InternalError(format!("spend open reservation table: {e}"))
642 })?;
643 let existing = reservation_table
644 .get(reservation_id)
645 .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?;
646 let existing_bytes = existing.map(|g| g.value().to_string());
647 if let Some(existing_bytes) = existing_bytes {
648 let mut reservation: SpendReservation = decode(&existing_bytes)?;
649 if matches!(reservation.status, ReservationStatus::Pending) {
650 reservation.status = ReservationStatus::Cancelled;
651 reservation.finalized_at_epoch_ms = Some(now);
652 encoded_blobs.push(encode(&reservation)?);
653 let encoded = encoded_blobs.last().ok_or_else(|| {
654 PayError::InternalError("missing reservation blob".to_string())
655 })?;
656 reservation_table
657 .insert(reservation_id, encoded.as_str())
658 .map_err(|e| {
659 PayError::InternalError(format!("spend update reservation: {e}"))
660 })?;
661 }
662 }
663 }
664
665 write_txn
666 .commit()
667 .map_err(|e| PayError::InternalError(format!("spend commit cancel: {e}")))
668 }
669}
670
671#[cfg(feature = "postgres")]
676impl SpendLedger {
677 fn pg_pool(&self) -> Result<&sqlx::PgPool, PayError> {
678 match &self.backend {
679 SpendBackend::Postgres { pool } => Ok(pool),
680 _ => Err(PayError::InternalError(
681 "expected postgres spend backend".to_string(),
682 )),
683 }
684 }
685
686 async fn add_limit_postgres(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
687 let pool = self.pg_pool()?;
688 let rule_id = generate_rule_identifier()?;
689 limit.rule_id = Some(rule_id.clone());
690 let rule_json = serde_json::to_value(limit)
691 .map_err(|e| PayError::InternalError(format!("serialize spend rule: {e}")))?;
692
693 sqlx::query("INSERT INTO spend_rules (rule_id, rule) VALUES ($1, $2)")
694 .bind(&rule_id)
695 .bind(&rule_json)
696 .execute(pool)
697 .await
698 .map_err(|e| PayError::InternalError(format!("pg insert spend rule: {e}")))?;
699
700 Ok(rule_id)
701 }
702
703 async fn remove_limit_postgres(&self, rule_id: &str) -> Result<(), PayError> {
704 let pool = self.pg_pool()?;
705 let result = sqlx::query("DELETE FROM spend_rules WHERE rule_id = $1")
706 .bind(rule_id)
707 .execute(pool)
708 .await
709 .map_err(|e| PayError::InternalError(format!("pg delete spend rule: {e}")))?;
710
711 if result.rows_affected() == 0 {
712 return Err(PayError::InvalidAmount(format!(
713 "rule_id '{rule_id}' not found"
714 )));
715 }
716 Ok(())
717 }
718
719 async fn set_limits_postgres(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
720 let pool = self.pg_pool()?;
721 let mut tx = pool
722 .begin()
723 .await
724 .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
725
726 sqlx::query("DELETE FROM spend_rules")
727 .execute(&mut *tx)
728 .await
729 .map_err(|e| PayError::InternalError(format!("pg clear spend rules: {e}")))?;
730
731 for limit in limits {
732 let mut rule = limit.clone();
733 let rid = generate_rule_identifier()?;
734 rule.rule_id = Some(rid.clone());
735 let rule_json = serde_json::to_value(&rule)
736 .map_err(|e| PayError::InternalError(format!("serialize spend rule: {e}")))?;
737 sqlx::query("INSERT INTO spend_rules (rule_id, rule) VALUES ($1, $2)")
738 .bind(&rid)
739 .bind(&rule_json)
740 .execute(&mut *tx)
741 .await
742 .map_err(|e| PayError::InternalError(format!("pg insert spend rule: {e}")))?;
743 }
744
745 tx.commit()
746 .await
747 .map_err(|e| PayError::InternalError(format!("pg commit set_limits: {e}")))
748 }
749
750 async fn get_status_postgres(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
751 let pool = self.pg_pool()?;
752 let rules = pg_load_rules(pool).await?;
753 let reservations = pg_load_reservations(pool).await?;
754 let now = now_epoch_ms();
755
756 let mut out = Vec::with_capacity(rules.len());
757 for rule in rules {
758 let use_usd = rule.scope == SpendScope::GlobalUsdCents;
759 let (spent, oldest_ts) = spent_in_window(&rule, &reservations, now, use_usd)?;
760 let remaining = rule.max_spend.saturating_sub(spent);
761 let window_ms = rule.window_s.saturating_mul(1000);
762 let window_reset_s = oldest_ts
763 .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
764 .unwrap_or(0);
765 out.push(SpendLimitStatus {
766 rule_id: rule.rule_id.clone().unwrap_or_default(),
767 scope: rule.scope,
768 network: rule.network.clone(),
769 wallet: rule.wallet.clone(),
770 window_s: rule.window_s,
771 max_spend: rule.max_spend,
772 spent,
773 remaining,
774 token: rule.token.clone(),
775 window_reset_s,
776 });
777 }
778 Ok(out)
779 }
780
    /// Checks the spend described by `ctx` against every matching rule and, if none is
    /// exceeded, inserts a `Pending` reservation into Postgres and returns its id.
    ///
    /// Steps, in order:
    /// 1. Load the rules through the pool (outside any transaction).
    /// 2. Reject with `PayError::InvalidAmount` when a wallet-scoped rule exists for
    ///    `ctx.network` but `ctx.wallet` is `None`.
    /// 3. Convert the amount to USD cents, only if some rule has `GlobalUsdCents` scope.
    /// 4. In a transaction holding `pg_advisory_xact_lock(SPEND_ADVISORY_LOCK_KEY)`:
    ///    return the existing reservation id if `op_id` is already stored; otherwise run
    ///    `pg_expire_pending`, reload reservations and rules, evaluate every matching
    ///    rule, and insert the reservation.
    ///
    /// # Errors
    /// `PayError::LimitExceeded` for the first matching rule whose window total plus this
    /// spend would exceed `max_spend`; `PayError::InternalError` for database failures.
    async fn reserve_postgres(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
        use crate::store::postgres_store::SPEND_ADVISORY_LOCK_KEY;

        let pool = self.pg_pool()?;
        let now = now_epoch_ms();

        // First rule load, used only for the wallet check and the USD decision below.
        let rules = pg_load_rules(pool).await?;
        // A wallet-scoped rule on this network cannot be evaluated without a wallet.
        if rules.iter().any(|r| {
            r.scope == SpendScope::Wallet
                && r.network.as_deref() == Some(ctx.network.as_str())
                && ctx.wallet.is_none()
        }) {
            return Err(PayError::InvalidAmount(
                "wallet-scoped limits require an explicit wallet".to_string(),
            ));
        }

        // The USD conversion happens before the transaction is opened.
        let needs_usd = rules.iter().any(|r| r.scope == SpendScope::GlobalUsdCents);
        let amount_usd_cents = if needs_usd {
            Some(
                self.amount_to_usd_cents(&ctx.network, ctx.token.as_deref(), ctx.amount_native)
                    .await?,
            )
        } else {
            None
        };

        let mut tx = pool
            .begin()
            .await
            .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;

        // Transaction-scoped advisory lock; it is released when the transaction ends.
        sqlx::query("SELECT pg_advisory_xact_lock($1)")
            .bind(SPEND_ADVISORY_LOCK_KEY)
            .execute(&mut *tx)
            .await
            .map_err(|e| PayError::InternalError(format!("pg advisory lock: {e}")))?;

        // Same op_id seen before: hand back the earlier reservation. Returning here drops
        // the transaction without committing.
        let existing: Option<(i64,)> =
            sqlx::query_as("SELECT reservation_id FROM spend_reservations WHERE op_id = $1")
                .bind(op_id)
                .fetch_optional(&mut *tx)
                .await
                .map_err(|e| PayError::InternalError(format!("pg check op_id: {e}")))?;

        if let Some((rid,)) = existing {
            return Ok(rid as u64);
        }

        // NOTE(review): presumably marks pending reservations past their expiry as expired
        // so they stop counting — confirm against `pg_expire_pending`.
        pg_expire_pending(&mut tx, now).await?;

        let reservations = pg_load_reservations_tx(&mut tx).await?;

        // Second rule load, inside the transaction; it shadows the earlier list and is the
        // one the limit checks run against.
        // NOTE(review): `amount_usd_cents` was decided from the first load — confirm
        // `amount_for_rule` copes with a USD rule added between the two loads.
        let rules = pg_load_rules_tx(&mut tx).await?;

        for rule in rules.iter() {
            if !rule_matches_context(
                rule,
                &ctx.network,
                ctx.wallet.as_deref(),
                ctx.token.as_deref(),
            ) {
                continue;
            }

            let use_usd = rule.scope == SpendScope::GlobalUsdCents;
            let candidate_amount =
                amount_for_rule(rule, ctx.amount_native, amount_usd_cents, use_usd)?;
            let (spent, oldest_ts) = spent_in_window(rule, &reservations, now, use_usd)?;
            if spent.saturating_add(candidate_amount) > rule.max_spend {
                // Whole seconds until the oldest counted timestamp leaves the window;
                // 0 when `spent_in_window` reported no timestamp.
                let window_ms = rule.window_s.saturating_mul(1000);
                let remaining_s = oldest_ts
                    .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
                    .unwrap_or(0);

                return Err(PayError::LimitExceeded {
                    rule_id: rule.rule_id.clone().unwrap_or_default(),
                    scope: rule.scope,
                    scope_key: scope_key(rule),
                    spent,
                    max_spend: rule.max_spend,
                    token: rule.token.clone(),
                    remaining_s,
                    origin: None,
                });
            }
        }

        let reservation = SpendReservation {
            // Placeholder: the real id is only known after the INSERT returns it; the
            // stored JSON is patched further down.
            reservation_id: 0, op_id: op_id.to_string(),
            network: ctx.network.clone(),
            wallet: ctx.wallet.clone(),
            token: ctx.token.clone(),
            amount_native: ctx.amount_native,
            amount_usd_cents,
            status: ReservationStatus::Pending,
            created_at_epoch_ms: now,
            // Expiry is fixed at 300_000 ms (five minutes) after creation.
            expires_at_epoch_ms: now.saturating_add(300_000),
            finalized_at_epoch_ms: None,
        };
        let reservation_json = serde_json::to_value(&reservation)
            .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;

        let row: (i64,) = sqlx::query_as(
            "INSERT INTO spend_reservations (op_id, reservation) \
             VALUES ($1, $2) RETURNING reservation_id",
        )
        .bind(op_id)
        .bind(&reservation_json)
        .fetch_one(&mut *tx)
        .await
        .map_err(|e| PayError::InternalError(format!("pg insert reservation: {e}")))?;

        let reservation_id = row.0 as u64;

        // Write the database-assigned id back into the stored JSON so the document and
        // the row key agree.
        let mut updated_json = reservation_json;
        updated_json["reservation_id"] = serde_json::json!(reservation_id);
        sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
            .bind(&updated_json)
            .bind(row.0)
            .execute(&mut *tx)
            .await
            .map_err(|e| PayError::InternalError(format!("pg update reservation id: {e}")))?;

        tx.commit()
            .await
            .map_err(|e| PayError::InternalError(format!("pg commit reserve: {e}")))?;

        Ok(reservation_id)
    }
921
922 async fn confirm_postgres(&self, reservation_id: u64) -> Result<(), PayError> {
923 let pool = self.pg_pool()?;
924 let now = now_epoch_ms();
925 let rid = reservation_id as i64;
926
927 let row: Option<(serde_json::Value,)> =
928 sqlx::query_as("SELECT reservation FROM spend_reservations WHERE reservation_id = $1")
929 .bind(rid)
930 .fetch_optional(pool)
931 .await
932 .map_err(|e| PayError::InternalError(format!("pg read reservation: {e}")))?;
933
934 let Some((res_json,)) = row else {
935 return Err(PayError::InternalError(format!(
936 "reservation {reservation_id} not found"
937 )));
938 };
939
940 let mut reservation: SpendReservation = serde_json::from_value(res_json)
941 .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
942
943 if !matches!(reservation.status, ReservationStatus::Pending) {
944 return Ok(());
945 }
946
947 reservation.status = ReservationStatus::Confirmed;
948 reservation.finalized_at_epoch_ms = Some(now);
949 let updated_json = serde_json::to_value(&reservation)
950 .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
951
952 let event = SpendEvent {
953 event_id: 0, reservation_id,
955 op_id: reservation.op_id,
956 network: reservation.network,
957 wallet: reservation.wallet,
958 token: reservation.token,
959 amount_native: reservation.amount_native,
960 amount_usd_cents: reservation.amount_usd_cents,
961 created_at_epoch_ms: reservation.created_at_epoch_ms,
962 confirmed_at_epoch_ms: now,
963 };
964 let event_json = serde_json::to_value(&event)
965 .map_err(|e| PayError::InternalError(format!("serialize spend event: {e}")))?;
966
967 let mut tx = pool
968 .begin()
969 .await
970 .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
971
972 sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
973 .bind(&updated_json)
974 .bind(rid)
975 .execute(&mut *tx)
976 .await
977 .map_err(|e| PayError::InternalError(format!("pg update reservation: {e}")))?;
978
979 sqlx::query("INSERT INTO spend_events (reservation_id, event) VALUES ($1, $2)")
980 .bind(rid)
981 .bind(&event_json)
982 .execute(&mut *tx)
983 .await
984 .map_err(|e| PayError::InternalError(format!("pg insert spend event: {e}")))?;
985
986 tx.commit()
987 .await
988 .map_err(|e| PayError::InternalError(format!("pg commit confirm: {e}")))
989 }
990
991 async fn cancel_postgres(&self, reservation_id: u64) -> Result<(), PayError> {
992 let pool = self.pg_pool()?;
993 let now = now_epoch_ms();
994 let rid = reservation_id as i64;
995
996 let row: Option<(serde_json::Value,)> =
997 sqlx::query_as("SELECT reservation FROM spend_reservations WHERE reservation_id = $1")
998 .bind(rid)
999 .fetch_optional(pool)
1000 .await
1001 .map_err(|e| PayError::InternalError(format!("pg read reservation: {e}")))?;
1002
1003 if let Some((res_json,)) = row {
1004 let mut reservation: SpendReservation = serde_json::from_value(res_json)
1005 .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
1006
1007 if matches!(reservation.status, ReservationStatus::Pending) {
1008 reservation.status = ReservationStatus::Cancelled;
1009 reservation.finalized_at_epoch_ms = Some(now);
1010 let updated_json = serde_json::to_value(&reservation)
1011 .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
1012
1013 sqlx::query(
1014 "UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2",
1015 )
1016 .bind(&updated_json)
1017 .bind(rid)
1018 .execute(pool)
1019 .await
1020 .map_err(|e| PayError::InternalError(format!("pg update reservation: {e}")))?;
1021 }
1022 }
1023
1024 Ok(())
1025 }
1026}
1027
/// Loads every rule from `spend_rules`, ordered by `rule_id`, through the pool.
///
/// Fails with `PayError::InternalError` when the query fails or a stored rule cannot be
/// deserialized.
#[cfg(feature = "postgres")]
async fn pg_load_rules(pool: &sqlx::PgPool) -> Result<Vec<SpendLimit>, PayError> {
    let rows: Vec<(serde_json::Value,)> =
        sqlx::query_as("SELECT rule FROM spend_rules ORDER BY rule_id")
            .fetch_all(pool)
            .await
            .map_err(|e| PayError::InternalError(format!("pg load spend rules: {e}")))?;

    let mut rules: Vec<SpendLimit> = Vec::with_capacity(rows.len());
    for (raw,) in rows {
        let rule = serde_json::from_value(raw)
            .map_err(|e| PayError::InternalError(format!("pg parse spend rule: {e}")))?;
        rules.push(rule);
    }
    Ok(rules)
}
1042
/// Loads every stored spend rule through an open transaction, ordered by `rule_id`.
///
/// # Errors
///
/// Returns `PayError::InternalError` if the query fails or a stored rule cannot
/// be deserialized into a `SpendLimit`.
#[cfg(feature = "postgres")]
async fn pg_load_rules_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Vec<SpendLimit>, PayError> {
    let select_rules = sqlx::query_as::<_, (serde_json::Value,)>(
        "SELECT rule FROM spend_rules ORDER BY rule_id",
    );
    let stored = select_rules
        .fetch_all(&mut **tx)
        .await
        .map_err(|e| PayError::InternalError(format!("pg load spend rules: {e}")))?;

    // Decode row by row; the first malformed rule aborts the whole load.
    let mut rules = Vec::with_capacity(stored.len());
    for (rule_json,) in stored {
        let parsed = serde_json::from_value::<SpendLimit>(rule_json)
            .map_err(|e| PayError::InternalError(format!("pg parse spend rule: {e}")))?;
        rules.push(parsed);
    }
    Ok(rules)
}
1059
/// Loads every stored reservation from Postgres, ordered by `reservation_id`.
///
/// # Errors
///
/// Returns `PayError::InternalError` if the query fails or a stored reservation
/// cannot be deserialized.
#[cfg(feature = "postgres")]
async fn pg_load_reservations(pool: &sqlx::PgPool) -> Result<Vec<SpendReservation>, PayError> {
    let stored: Vec<(serde_json::Value,)> =
        match sqlx::query_as("SELECT reservation FROM spend_reservations ORDER BY reservation_id")
            .fetch_all(pool)
            .await
        {
            Ok(rows) => rows,
            Err(e) => {
                return Err(PayError::InternalError(format!(
                    "pg load reservations: {e}"
                )))
            }
        };

    // Decode row by row; the first malformed reservation aborts the whole load.
    let mut reservations = Vec::with_capacity(stored.len());
    for (reservation_json,) in stored {
        let reservation: SpendReservation = serde_json::from_value(reservation_json)
            .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
        reservations.push(reservation);
    }
    Ok(reservations)
}
1074
/// Loads every stored reservation through an open transaction, ordered by
/// `reservation_id`.
///
/// # Errors
///
/// Returns `PayError::InternalError` if the query fails or a stored reservation
/// cannot be deserialized.
#[cfg(feature = "postgres")]
async fn pg_load_reservations_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Vec<SpendReservation>, PayError> {
    let select_reservations = sqlx::query_as::<_, (serde_json::Value,)>(
        "SELECT reservation FROM spend_reservations ORDER BY reservation_id",
    );
    let stored = select_reservations
        .fetch_all(&mut **tx)
        .await
        .map_err(|e| PayError::InternalError(format!("pg load reservations: {e}")))?;

    // Decode row by row; the first malformed reservation aborts the whole load.
    let mut reservations = Vec::with_capacity(stored.len());
    for (reservation_json,) in stored {
        let parsed = serde_json::from_value::<SpendReservation>(reservation_json)
            .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
        reservations.push(parsed);
    }
    Ok(reservations)
}
1091
/// Marks every pending reservation whose deadline has passed as `Expired`.
///
/// All reservations are read through `tx`; each one that is still `Pending` with
/// `expires_at_epoch_ms <= now_ms` is rewritten with status `Expired` and
/// `finalized_at_epoch_ms = now_ms`. Other rows are not touched.
///
/// # Errors
///
/// Returns `PayError::InternalError` if a query fails or a reservation cannot be
/// parsed or re-serialized.
#[cfg(feature = "postgres")]
async fn pg_expire_pending(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    now_ms: u64,
) -> Result<(), PayError> {
    let select_all = sqlx::query_as::<_, (i64, serde_json::Value)>(
        "SELECT reservation_id, reservation FROM spend_reservations",
    );
    let stored = select_all.fetch_all(&mut **tx).await.map_err(|e| {
        PayError::InternalError(format!("pg load reservations for expire: {e}"))
    })?;

    for (rid, res_json) in stored {
        let mut reservation = serde_json::from_value::<SpendReservation>(res_json)
            .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;

        // Skip anything already finalized or not yet past its deadline.
        let still_pending = matches!(reservation.status, ReservationStatus::Pending);
        if !still_pending || reservation.expires_at_epoch_ms > now_ms {
            continue;
        }

        reservation.status = ReservationStatus::Expired;
        reservation.finalized_at_epoch_ms = Some(now_ms);
        let updated = match serde_json::to_value(&reservation) {
            Ok(json) => json,
            Err(e) => {
                return Err(PayError::InternalError(format!(
                    "serialize reservation: {e}"
                )))
            }
        };

        sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
            .bind(&updated)
            .bind(rid)
            .execute(&mut **tx)
            .await
            .map_err(|e| PayError::InternalError(format!("pg expire reservation: {e}")))?;
    }

    Ok(())
}
1126
1127impl SpendLedger {
1132 async fn amount_to_usd_cents(
1133 &self,
1134 network: &str,
1135 token: Option<&str>,
1136 amount_native: u64,
1137 ) -> Result<u64, PayError> {
1138 let (symbol, divisor) = token_asset(network, token).ok_or_else(|| {
1139 PayError::InvalidAmount(format!(
1140 "network '{network}' token '{token:?}' is unsupported for global-usd-cents limits"
1141 ))
1142 })?;
1143
1144 let quote = self.get_or_fetch_quote(symbol, "USD").await?;
1145 let usd = (amount_native as f64 / divisor) * quote.price;
1146 if !usd.is_finite() || usd < 0f64 {
1147 return Err(PayError::InternalError(
1148 "invalid exchange-rate conversion result".to_string(),
1149 ));
1150 }
1151 Ok((usd * 100f64).round() as u64)
1152 }
1153
1154 async fn get_or_fetch_quote(
1155 &self,
1156 base: &str,
1157 quote: &str,
1158 ) -> Result<ExchangeRateQuote, PayError> {
1159 let pair = format!(
1160 "{}/{}",
1161 base.to_ascii_uppercase(),
1162 quote.to_ascii_uppercase()
1163 );
1164 let now = now_epoch_ms();
1165
1166 #[cfg(feature = "redb")]
1168 if let SpendBackend::Redb { .. } = &self.backend {
1169 let fx_db = self.open_exchange_rate_db()?;
1170 let read_txn = fx_db
1171 .begin_read()
1172 .map_err(|e| PayError::InternalError(format!("fx begin_read: {e}")))?;
1173 if let Ok(table) = read_txn.open_table(FX_QUOTE_BY_PAIR) {
1174 if let Some(entry) = table
1175 .get(pair.as_str())
1176 .map_err(|e| PayError::InternalError(format!("fx read quote: {e}")))?
1177 {
1178 let cached: ExchangeRateQuote = decode(entry.value())?;
1179 if cached.expires_at_epoch_ms > now {
1180 return Ok(cached);
1181 }
1182 }
1183 }
1184 }
1185
1186 #[cfg(feature = "postgres")]
1188 if let SpendBackend::Postgres { pool } = &self.backend {
1189 let row: Option<(serde_json::Value,)> =
1190 sqlx::query_as("SELECT quote FROM exchange_rate_cache WHERE pair = $1")
1191 .bind(&pair)
1192 .fetch_optional(pool)
1193 .await
1194 .map_err(|e| PayError::InternalError(format!("pg fx read cache: {e}")))?;
1195 if let Some((quote_json,)) = row {
1196 let cached: ExchangeRateQuote = serde_json::from_value(quote_json)
1197 .map_err(|e| PayError::InternalError(format!("pg fx parse cache: {e}")))?;
1198 if cached.expires_at_epoch_ms > now {
1199 return Ok(cached);
1200 }
1201 }
1202 }
1203
1204 let (fetched_price, source_name) = self.fetch_exchange_rate_http(base, quote).await?;
1205 let ttl_s = self
1206 .exchange_rate
1207 .as_ref()
1208 .map(|cfg| cfg.ttl_s)
1209 .unwrap_or(300)
1210 .max(1);
1211 let new_quote = ExchangeRateQuote {
1212 pair: pair.clone(),
1213 source: source_name,
1214 price: fetched_price,
1215 fetched_at_epoch_ms: now,
1216 expires_at_epoch_ms: now.saturating_add(ttl_s.saturating_mul(1000)),
1217 };
1218
1219 #[cfg(feature = "redb")]
1221 if let SpendBackend::Redb { .. } = &self.backend {
1222 let fx_db = self.open_exchange_rate_db()?;
1223 let write_txn = fx_db
1224 .begin_write()
1225 .map_err(|e| PayError::InternalError(format!("fx begin_write: {e}")))?;
1226 let mut encoded_blobs: Vec<String> = Vec::new();
1227 {
1228 let mut table = write_txn
1229 .open_table(FX_QUOTE_BY_PAIR)
1230 .map_err(|e| PayError::InternalError(format!("fx open quote table: {e}")))?;
1231 encoded_blobs.push(encode(&new_quote)?);
1232 let encoded = encoded_blobs
1233 .last()
1234 .ok_or_else(|| PayError::InternalError("missing quote blob".to_string()))?;
1235 table
1236 .insert(pair.as_str(), encoded.as_str())
1237 .map_err(|e| PayError::InternalError(format!("fx insert quote: {e}")))?;
1238 }
1239 write_txn
1240 .commit()
1241 .map_err(|e| PayError::InternalError(format!("fx commit write: {e}")))?;
1242 }
1243
1244 #[cfg(feature = "postgres")]
1246 if let SpendBackend::Postgres { pool } = &self.backend {
1247 let quote_json = serde_json::to_value(&new_quote)
1248 .map_err(|e| PayError::InternalError(format!("serialize fx quote: {e}")))?;
1249 let _ = sqlx::query(
1250 "INSERT INTO exchange_rate_cache (pair, quote) VALUES ($1, $2) \
1251 ON CONFLICT (pair) DO UPDATE SET quote = $2",
1252 )
1253 .bind(&pair)
1254 .bind("e_json)
1255 .execute(pool)
1256 .await;
1257 }
1258
1259 Ok(new_quote)
1260 }
1261
1262 #[cfg(feature = "exchange-rate")]
1263 async fn fetch_exchange_rate_http(
1264 &self,
1265 base: &str,
1266 quote_currency: &str,
1267 ) -> Result<(f64, String), PayError> {
1268 let cfg = self.exchange_rate.as_ref().cloned().unwrap_or_default();
1269
1270 if cfg.sources.is_empty() {
1271 return Err(PayError::InvalidAmount(
1272 "exchange_rate.sources is empty — no exchange-rate API configured".to_string(),
1273 ));
1274 }
1275
1276 let client = reqwest::Client::new();
1277 let mut last_err = String::new();
1278
1279 for source in &cfg.sources {
1280 match fetch_from_source(&client, source, base, quote_currency).await {
1281 Ok(price) => return Ok((price, source.endpoint.clone())),
1282 Err(e) => {
1283 last_err =
1284 format!("{} ({}): {e}", source.endpoint, source.source_type.as_str());
1285 }
1286 }
1287 }
1288
1289 Err(PayError::NetworkError(format!(
1290 "all exchange-rate sources failed; last: {last_err}"
1291 )))
1292 }
1293
1294 #[cfg(not(feature = "exchange-rate"))]
1295 async fn fetch_exchange_rate_http(
1296 &self,
1297 _base: &str,
1298 _quote_currency: &str,
1299 ) -> Result<(f64, String), PayError> {
1300 Err(PayError::NotImplemented(
1301 "exchange-rate HTTP support is not built in this feature set".to_string(),
1302 ))
1303 }
1304}
1305
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_epoch_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
1316
/// Maps a network/token pair to its price symbol and native-unit divisor.
///
/// With an explicit token the match is case-insensitive and the network is
/// ignored: `sol`, `eth`, `usdc` and `usdt` are known, anything else is `None`.
/// Without a token, networks named `btc`, `cashu` or starting with `ln`
/// (case-insensitive) resolve to BTC; every other network is `None`.
fn token_asset(network: &str, token: Option<&str>) -> Option<(&'static str, f64)> {
    // (token name, price symbol, native units per whole coin)
    const KNOWN_TOKENS: [(&str, &str, f64); 4] = [
        ("sol", "SOL", 1e9),
        ("eth", "ETH", 1e18),
        ("usdc", "USD", 1e6),
        ("usdt", "USD", 1e6),
    ];

    let Some(ticker) = token else {
        let net = network.to_ascii_lowercase();
        let is_bitcoin = net == "btc" || net == "cashu" || net.starts_with("ln");
        return is_bitcoin.then_some(("BTC", 1e8));
    };

    KNOWN_TOKENS
        .iter()
        .find(|(name, _, _)| ticker.eq_ignore_ascii_case(name))
        .map(|&(_, symbol, divisor)| (symbol, divisor))
}
1333
1334fn extract_price_generic(value: &serde_json::Value) -> Option<f64> {
1335 value
1336 .get("price")
1337 .and_then(|v| v.as_f64())
1338 .or_else(|| value.get("rate").and_then(|v| v.as_f64()))
1339 .or_else(|| value.get("usd_per_base").and_then(|v| v.as_f64()))
1340 .or_else(|| {
1341 value
1342 .get("data")
1343 .and_then(|d| d.get("price"))
1344 .and_then(|v| v.as_f64())
1345 })
1346}
1347
1348impl ExchangeRateSourceType {
1349 fn as_str(self) -> &'static str {
1350 match self {
1351 Self::Generic => "generic",
1352 Self::CoinGecko => "coingecko",
1353 Self::Kraken => "kraken",
1354 }
1355 }
1356}
1357
/// Maps an asset symbol (case-insensitive) to the coin id used in CoinGecko
/// request URLs, or `None` for symbols without a mapping.
fn coingecko_coin_id(symbol: &str) -> Option<&'static str> {
    const COIN_IDS: [(&str, &str); 3] = [
        ("BTC", "bitcoin"),
        ("SOL", "solana"),
        ("ETH", "ethereum"),
    ];

    COIN_IDS
        .iter()
        .find(|(ticker, _)| symbol.eq_ignore_ascii_case(ticker))
        .map(|&(_, coin_id)| coin_id)
}
1366
/// Maps an asset symbol (case-insensitive) to the pair name used in Kraken
/// ticker requests, or `None` for symbols without a mapping.
fn kraken_pair(symbol: &str) -> Option<&'static str> {
    let is = |ticker: &str| symbol.eq_ignore_ascii_case(ticker);

    if is("BTC") {
        Some("XBTUSD")
    } else if is("SOL") {
        Some("SOLUSD")
    } else if is("ETH") {
        Some("ETHUSD")
    } else {
        None
    }
}
1375
/// Fetches the price of `base` in `quote_currency` from one configured source.
///
/// The request URL and the response parser depend on `source.source_type`:
/// * Kraken: `<endpoint>/0/public/Ticker?pair=<pair>`; the price is the first
///   element of the ticker's `c` array, parsed from its string form.
/// * CoinGecko: `<endpoint>/simple/price?ids=<id>&vs_currencies=<vs>`; the price
///   is read at `<id>.<vs>`.
/// * Generic: `base` and `quote` query parameters are appended to the endpoint
///   and the price is located by `extract_price_generic`.
///
/// When the source has an API key it is sent both as a bearer token and as
/// `X-Api-Key`. Each request is bounded by a timeout so that an unresponsive
/// source reports an error instead of stalling the caller.
///
/// # Errors
///
/// Returns a human-readable message when the asset is unsupported by the source,
/// the request fails or times out, the status is not a success, the body is not
/// JSON, or no price can be extracted.
#[cfg(feature = "exchange-rate")]
async fn fetch_from_source(
    client: &reqwest::Client,
    source: &crate::types::ExchangeRateSource,
    base: &str,
    quote_currency: &str,
) -> Result<f64, String> {
    // Upper bound for a single source request.
    const REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

    type PriceExtractor = Box<dyn Fn(&serde_json::Value) -> Option<f64> + Send>;

    let (url, extract_fn): (String, PriceExtractor) = match source.source_type {
        ExchangeRateSourceType::Kraken => {
            let pair = kraken_pair(base)
                .ok_or_else(|| format!("kraken: unsupported base asset '{base}'"))?;
            // Tolerate a trailing slash on the configured endpoint.
            let url = format!(
                "{}/0/public/Ticker?pair={pair}",
                source.endpoint.trim_end_matches('/')
            );
            (
                url,
                Box::new(move |v: &serde_json::Value| {
                    let result = v.get("result")?;
                    // Fall back to the first entry when the response is keyed by a
                    // different pair name than the one requested.
                    let ticker = result
                        .get(pair)
                        .or_else(|| result.as_object().and_then(|m| m.values().next()))?;
                    let price_str = ticker.get("c")?.as_array()?.first()?.as_str()?;
                    price_str.parse::<f64>().ok()
                }),
            )
        }
        ExchangeRateSourceType::CoinGecko => {
            let coin_id = coingecko_coin_id(base)
                .ok_or_else(|| format!("coingecko: unsupported base asset '{base}'"))?;
            let vs = quote_currency.to_ascii_lowercase();
            // Tolerate a trailing slash on the configured endpoint.
            let url = format!(
                "{}/simple/price?ids={coin_id}&vs_currencies={vs}",
                source.endpoint.trim_end_matches('/')
            );
            (
                url,
                Box::new(move |v: &serde_json::Value| v.get(coin_id)?.get(&vs)?.as_f64()),
            )
        }
        ExchangeRateSourceType::Generic => {
            // Append to an existing query string if the endpoint already has one.
            let sep = if source.endpoint.contains('?') {
                '&'
            } else {
                '?'
            };
            let url = format!(
                "{}{sep}base={}&quote={}",
                source.endpoint,
                base.to_ascii_uppercase(),
                quote_currency.to_ascii_uppercase()
            );
            (url, Box::new(extract_price_generic))
        }
    };

    let mut req = client.get(&url).timeout(REQUEST_TIMEOUT);
    if let Some(key) = &source.api_key {
        req = req.header("Authorization", format!("Bearer {key}"));
        req = req.header("X-Api-Key", key);
    }

    let resp = req
        .send()
        .await
        .map_err(|e| format!("request failed: {e}"))?;
    if !resp.status().is_success() {
        return Err(format!("status {}", resp.status()));
    }

    let value: serde_json::Value = resp
        .json()
        .await
        .map_err(|e| format!("parse failed: {e}"))?;

    extract_fn(&value).ok_or_else(|| "could not extract price from response".to_string())
}
1456
1457fn encode<T: Serialize>(value: &T) -> Result<String, PayError> {
1458 serde_json::to_string(value)
1459 .map_err(|e| PayError::InternalError(format!("spend encode failed: {e}")))
1460}
1461
1462fn decode<T: DeserializeOwned>(encoded: &str) -> Result<T, PayError> {
1463 serde_json::from_str(encoded).map_err(|e| {
1464 let preview_len = encoded.len().min(48);
1465 let preview = &encoded[..preview_len];
1466 PayError::InternalError(format!(
1467 "spend decode failed (len={}, preview={}): {e}",
1468 encoded.len(),
1469 preview
1470 ))
1471 })
1472}
1473
1474fn prepend_err(prefix: &str, err: PayError) -> PayError {
1475 match err {
1476 PayError::InternalError(msg) => PayError::InternalError(format!("{prefix}: {msg}")),
1477 other => other,
1478 }
1479}
1480
1481fn generate_rule_identifier() -> Result<String, PayError> {
1482 let mut buf = [0u8; 4];
1483 getrandom::fill(&mut buf).map_err(|e| PayError::InternalError(format!("rng failed: {e}")))?;
1484 Ok(format!("r_{}", hex::encode(buf)))
1485}
1486
1487fn validate_limit(
1488 rule: &SpendLimit,
1489 exchange_rate: Option<&ExchangeRateConfig>,
1490) -> Result<(), PayError> {
1491 if rule.window_s == 0 {
1492 return Err(PayError::InvalidAmount(
1493 "limit rule has zero window_s".to_string(),
1494 ));
1495 }
1496 if rule.max_spend == 0 {
1497 return Err(PayError::InvalidAmount(
1498 "limit rule has zero max_spend".to_string(),
1499 ));
1500 }
1501
1502 match rule.scope {
1503 SpendScope::GlobalUsdCents => {
1504 if rule.network.is_some() || rule.wallet.is_some() {
1505 return Err(PayError::InvalidAmount(
1506 "scope=global-usd-cents cannot set network/wallet".to_string(),
1507 ));
1508 }
1509 if rule.token.is_some() {
1510 return Err(PayError::InvalidAmount(
1511 "scope=global-usd-cents cannot set token".to_string(),
1512 ));
1513 }
1514 }
1515 SpendScope::Network => {
1516 if rule.network.as_deref().unwrap_or("").trim().is_empty() {
1517 return Err(PayError::InvalidAmount(
1518 "scope=network requires network".to_string(),
1519 ));
1520 }
1521 if rule.wallet.is_some() {
1522 return Err(PayError::InvalidAmount(
1523 "scope=network cannot set wallet".to_string(),
1524 ));
1525 }
1526 }
1527 SpendScope::Wallet => {
1528 if rule.network.as_deref().unwrap_or("").trim().is_empty() {
1529 return Err(PayError::InvalidAmount(
1530 "scope=wallet requires network".to_string(),
1531 ));
1532 }
1533 if rule.wallet.as_deref().unwrap_or("").trim().is_empty() {
1534 return Err(PayError::InvalidAmount(
1535 "scope=wallet requires wallet".to_string(),
1536 ));
1537 }
1538 }
1539 }
1540
1541 if rule.scope == SpendScope::GlobalUsdCents && exchange_rate.is_none() {
1542 return Err(PayError::InvalidAmount(
1543 "scope=global-usd-cents requires config.exchange_rate".to_string(),
1544 ));
1545 }
1546 Ok(())
1547}
1548
/// Reads every spend rule from the redb rule table.
///
/// A rule table that cannot be opened is treated as "no rules".
///
/// # Errors
///
/// Returns `PayError::InternalError` if iteration fails or an entry cannot be
/// read or decoded.
#[cfg(feature = "redb")]
fn load_rules(read_txn: &redb::ReadTransaction) -> Result<Vec<SpendLimit>, PayError> {
    let rule_table = match read_txn.open_table(RULE_BY_ID) {
        Ok(table) => table,
        Err(_) => return Ok(Vec::new()),
    };

    let entries = rule_table
        .iter()
        .map_err(|e| PayError::InternalError(format!("spend iterate rules: {e}")))?;

    let mut rules = Vec::new();
    for entry in entries {
        let (_rule_id, blob) =
            entry.map_err(|e| PayError::InternalError(format!("spend read rule entry: {e}")))?;
        let rule = decode::<SpendLimit>(blob.value())
            .map_err(|e| prepend_err("spend decode rule", e))?;
        rules.push(rule);
    }
    Ok(rules)
}
1564
/// Reads every reservation from the redb reservation table.
///
/// A reservation table that cannot be opened is treated as "no reservations".
///
/// # Errors
///
/// Returns `PayError::InternalError` if iteration fails or an entry cannot be
/// read or decoded.
#[cfg(feature = "redb")]
fn load_reservations(read_txn: &redb::ReadTransaction) -> Result<Vec<SpendReservation>, PayError> {
    let Ok(table) = read_txn.open_table(RESERVATION_BY_ID) else {
        return Ok(Vec::new());
    };

    let entries = match table.iter() {
        Ok(range) => range,
        Err(e) => {
            return Err(PayError::InternalError(format!(
                "spend iterate reservations: {e}"
            )))
        }
    };

    let mut reservations = Vec::new();
    for entry in entries {
        let (_reservation_id, blob) =
            entry.map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?;
        let reservation = decode::<SpendReservation>(blob.value())
            .map_err(|e| prepend_err("spend decode reservation", e))?;
        reservations.push(reservation);
    }
    Ok(reservations)
}
1581
/// Expiry hook for the redb reservation table.
///
/// Currently a no-op: it neither reads nor modifies the table and always
/// returns `Ok(())`; both parameters are unused.
///
/// NOTE(review): the Postgres counterpart `pg_expire_pending` rewrites overdue
/// pending reservations as `Expired`. Here expiry presumably relies on
/// `reservation_active_for_window` ignoring pending reservations past their
/// deadline — confirm that leaving their stored status as `Pending` is intended.
#[cfg(feature = "redb")]
fn expire_pending(_table: &mut redb::Table<u64, &str>, _now_ms: u64) -> Result<(), PayError> {
    Ok(())
}
1586
1587fn amount_for_rule(
1588 _rule: &SpendLimit,
1589 amount_native: u64,
1590 amount_usd_cents: Option<u64>,
1591 use_usd: bool,
1592) -> Result<u64, PayError> {
1593 if use_usd {
1594 amount_usd_cents.ok_or_else(|| {
1595 PayError::InternalError("missing USD amount for non-native unit rule".to_string())
1596 })
1597 } else {
1598 Ok(amount_native)
1599 }
1600}
1601
1602fn reservation_active_for_window(r: &SpendReservation, now_ms: u64) -> bool {
1603 match r.status {
1604 ReservationStatus::Confirmed => true,
1605 ReservationStatus::Pending => r.expires_at_epoch_ms > now_ms,
1606 ReservationStatus::Cancelled | ReservationStatus::Expired => false,
1607 }
1608}
1609
1610fn rule_matches_context(
1611 rule: &SpendLimit,
1612 network: &str,
1613 wallet: Option<&str>,
1614 token: Option<&str>,
1615) -> bool {
1616 if let Some(rule_token) = &rule.token {
1617 match token {
1618 Some(ctx_token) if ctx_token.eq_ignore_ascii_case(rule_token) => {}
1619 _ => return false,
1620 }
1621 }
1622 match rule.scope {
1623 SpendScope::GlobalUsdCents => true,
1624 SpendScope::Network => rule.network.as_deref() == Some(network),
1625 SpendScope::Wallet => {
1626 rule.network.as_deref() == Some(network) && rule.wallet.as_deref() == wallet
1627 }
1628 }
1629}
1630
1631fn scope_key(rule: &SpendLimit) -> String {
1632 match rule.scope {
1633 SpendScope::GlobalUsdCents => "global-usd-cents".to_string(),
1634 SpendScope::Network => rule.network.clone().unwrap_or_default(),
1635 SpendScope::Wallet => format!(
1636 "{}/{}",
1637 rule.network.clone().unwrap_or_default(),
1638 rule.wallet.clone().unwrap_or_default()
1639 ),
1640 }
1641}
1642
1643fn spent_in_window(
1644 rule: &SpendLimit,
1645 reservations: &[SpendReservation],
1646 now_ms: u64,
1647 use_usd: bool,
1648) -> Result<(u64, Option<u64>), PayError> {
1649 let window_ms = rule.window_s.saturating_mul(1000);
1650 let cutoff = now_ms.saturating_sub(window_ms);
1651
1652 let mut spent = 0u64;
1653 let mut oldest: Option<u64> = None;
1654
1655 for r in reservations {
1656 if !reservation_active_for_window(r, now_ms) {
1657 continue;
1658 }
1659 if r.created_at_epoch_ms < cutoff {
1660 continue;
1661 }
1662 if !rule_matches_context(rule, &r.network, r.wallet.as_deref(), r.token.as_deref()) {
1663 continue;
1664 }
1665
1666 let amount = if use_usd {
1667 r.amount_usd_cents.ok_or_else(|| {
1668 PayError::InternalError("reservation missing USD amount".to_string())
1669 })?
1670 } else {
1671 r.amount_native
1672 };
1673 spent = spent.saturating_add(amount);
1674 oldest = Some(oldest.map_or(r.created_at_epoch_ms, |v| v.min(r.created_at_epoch_ms)));
1675 }
1676
1677 Ok((spent, oldest))
1678}
1679
/// Increments the counter stored under `key` in the meta table and returns the
/// new value.
///
/// A counter that does not exist yet starts from 0, so the first call yields 1.
/// The increment saturates at `u64::MAX`.
///
/// # Errors
///
/// Returns `PayError::InternalError` if the meta table cannot be opened, read
/// or written.
#[cfg(feature = "redb")]
fn next_counter(write_txn: &redb::WriteTransaction, key: &str) -> Result<u64, PayError> {
    let mut counters = write_txn
        .open_table(META_COUNTER)
        .map_err(|e| PayError::InternalError(format!("spend open meta table: {e}")))?;

    let previous = counters
        .get(key)
        .map_err(|e| PayError::InternalError(format!("spend read counter {key}: {e}")))?
        .map(|stored| stored.value())
        .unwrap_or(0);

    let bumped = previous.saturating_add(1);
    if let Err(e) = counters.insert(key, bumped) {
        return Err(PayError::InternalError(format!(
            "spend write counter {key}: {e}"
        )));
    }
    Ok(bumped)
}
1697
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a one-hour limit of 1000 units for the given scope, without a
    /// token filter or rule id.
    fn make_limit(scope: SpendScope, network: Option<&str>, wallet: Option<&str>) -> SpendLimit {
        SpendLimit {
            rule_id: None,
            scope,
            network: network.map(str::to_owned),
            wallet: wallet.map(str::to_owned),
            window_s: 3600,
            max_spend: 1000,
            token: None,
        }
    }

    /// Creates a ledger rooted in a fresh temporary directory, with no
    /// exchange-rate config. The directory guard is returned so it stays alive
    /// for the duration of the test.
    fn temp_ledger() -> (tempfile::TempDir, SpendLedger) {
        let dir = tempfile::tempdir().unwrap();
        let ledger = SpendLedger::new(dir.path().to_str().unwrap(), None);
        (dir, ledger)
    }

    /// Builds a token-less spend on the `cashu` network.
    #[cfg(feature = "redb")]
    fn cashu_spend(wallet: Option<&str>, amount_native: u64) -> SpendContext {
        SpendContext {
            network: "cashu".to_string(),
            wallet: wallet.map(str::to_owned),
            amount_native,
            token: None,
        }
    }

    #[cfg(feature = "redb")]
    #[tokio::test]
    async fn provider_limit_reserve_and_confirm() {
        let (_tmp, ledger) = temp_ledger();
        let limits = [make_limit(SpendScope::Network, Some("cashu"), None)];
        ledger.set_limits(&limits).await.unwrap();

        let ctx = cashu_spend(Some("w_01"), 400);

        // 400 confirmed + 400 pending leaves 200 of the 1000 budget, so a third
        // reservation of 400 must be rejected.
        let confirmed = ledger.reserve("op_1", &ctx).await.unwrap();
        ledger.confirm(confirmed).await.unwrap();

        let pending = ledger.reserve("op_2", &ctx).await.unwrap();
        let rejected = ledger.reserve("op_3", &ctx).await.unwrap_err();
        assert!(matches!(rejected, PayError::LimitExceeded { .. }));

        ledger.cancel(pending).await.unwrap();
    }

    #[cfg(feature = "redb")]
    #[tokio::test]
    async fn wallet_scope_requires_wallet_context() {
        let (_tmp, ledger) = temp_ledger();
        let limits = [make_limit(SpendScope::Wallet, Some("cashu"), Some("w_abc"))];
        ledger.set_limits(&limits).await.unwrap();

        // The spend names no wallet although a wallet-scoped rule exists.
        let ctx = cashu_spend(None, 1);
        let err = ledger.reserve("op_1", &ctx).await.unwrap_err();
        assert!(matches!(err, PayError::InvalidAmount(_)));
    }

    #[tokio::test]
    async fn global_usd_cents_scope_requires_exchange_rate_config() {
        let (_tmp, ledger) = temp_ledger();

        let usd_limit = SpendLimit {
            max_spend: 100,
            ..make_limit(SpendScope::GlobalUsdCents, None, None)
        };
        let err = ledger.set_limits(&[usd_limit]).await.unwrap_err();

        assert!(matches!(err, PayError::InvalidAmount(_)));
    }

    #[cfg(feature = "redb")]
    #[tokio::test]
    async fn network_scope_native_token_ok_without_exchange_rate() {
        let (_tmp, ledger) = temp_ledger();

        let network_limit = SpendLimit {
            max_spend: 100,
            ..make_limit(SpendScope::Network, Some("cashu"), None)
        };
        ledger
            .set_limits(&[network_limit])
            .await
            .expect("network scope should not require exchange_rate");
    }
}