1#![cfg_attr(not(any(feature = "redb", feature = "postgres")), allow(dead_code))]
2
3pub mod tokens;
4
5use crate::provider::PayError;
6#[cfg(feature = "exchange-rate")]
7use crate::types::ExchangeRateSourceType;
8use crate::types::{ExchangeRateConfig, SpendLimit, SpendLimitStatus, SpendScope};
9#[cfg(feature = "redb")]
10use serde::de::DeserializeOwned;
11use serde::{Deserialize, Serialize};
12use std::collections::hash_map::DefaultHasher;
13use std::hash::{Hash, Hasher};
14use tokio::sync::Mutex;
15
16#[cfg(feature = "redb")]
17use crate::store::db;
18#[cfg(feature = "redb")]
19use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
20#[cfg(feature = "redb")]
21use std::path::{Path, PathBuf};
22
23#[cfg(feature = "redb")]
24const META_COUNTER: TableDefinition<&str, u64> = TableDefinition::new("meta_counter");
25#[cfg(feature = "redb")]
26const RULE_BY_ID: TableDefinition<&str, &str> = TableDefinition::new("rule_by_id_v3");
27#[cfg(feature = "redb")]
28const RESERVATION_BY_ID: TableDefinition<u64, &str> = TableDefinition::new("reservation_by_id");
29#[cfg(feature = "redb")]
30const RESERVATION_ID_BY_OP_ID: TableDefinition<&str, u64> =
31 TableDefinition::new("reservation_id_by_op_id");
32#[cfg(feature = "redb")]
33const SPEND_EVENT_BY_ID: TableDefinition<u64, &str> = TableDefinition::new("spend_event_by_id");
34#[cfg(feature = "redb")]
35const FX_QUOTE_BY_PAIR: TableDefinition<&str, &str> = TableDefinition::new("quote_by_pair");
36#[cfg(feature = "redb")]
37const NEXT_RESERVATION_ID_KEY: &str = "next_reservation_id";
38#[cfg(feature = "redb")]
39const NEXT_EVENT_ID_KEY: &str = "next_event_id";
40#[cfg(feature = "redb")]
41const SPEND_VERSION: u64 = 1;
42#[cfg(feature = "redb")]
43const FX_CACHE_VERSION: u64 = 1;
44
45#[derive(Debug, Clone, Hash)]
46pub struct SpendContext {
47 pub network: String,
48 pub wallet: Option<String>,
49 pub amount_native: u64,
50 pub token: Option<String>,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
54enum ReservationStatus {
55 Pending,
56 Confirmed,
57 Cancelled,
58 Expired,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
62struct SpendReservation {
63 reservation_id: u64,
64 op_id: String,
65 network: String,
66 wallet: Option<String>,
67 #[serde(default)]
68 token: Option<String>,
69 amount_native: u64,
70 amount_usd_cents: Option<u64>,
71 status: ReservationStatus,
72 created_at_epoch_ms: u64,
73 expires_at_epoch_ms: u64,
74 finalized_at_epoch_ms: Option<u64>,
75 #[serde(default, skip_serializing_if = "Option::is_none")]
76 request_hash: Option<String>,
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
80struct SpendEvent {
81 event_id: u64,
82 reservation_id: u64,
83 op_id: String,
84 network: String,
85 wallet: Option<String>,
86 #[serde(default)]
87 token: Option<String>,
88 amount_native: u64,
89 amount_usd_cents: Option<u64>,
90 created_at_epoch_ms: u64,
91 confirmed_at_epoch_ms: u64,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
95struct ExchangeRateQuote {
96 pair: String,
97 source: String,
98 price: f64,
99 fetched_at_epoch_ms: u64,
100 expires_at_epoch_ms: u64,
101}
102
103#[allow(dead_code)] enum SpendBackend {
109 #[cfg(feature = "redb")]
110 Redb {
111 data_dir: String,
112 },
113 #[cfg(feature = "postgres")]
114 Postgres {
115 pool: sqlx::PgPool,
116 },
117 None,
118}
119
120pub struct SpendLedger {
125 backend: SpendBackend,
126 exchange_rate: Option<ExchangeRateConfig>,
127 mu: Mutex<()>,
128 fx_stale_warned: std::sync::atomic::AtomicBool,
130}
131
132impl SpendLedger {
133 pub fn new(data_dir: &str, exchange_rate: Option<ExchangeRateConfig>) -> Self {
134 #[cfg(feature = "redb")]
135 let backend = SpendBackend::Redb {
136 data_dir: data_dir.to_string(),
137 };
138 #[cfg(not(feature = "redb"))]
139 let backend = {
140 let _ = data_dir;
141 SpendBackend::None
142 };
143 Self {
144 backend,
145 exchange_rate,
146 mu: Mutex::new(()),
147 fx_stale_warned: std::sync::atomic::AtomicBool::new(false),
148 }
149 }
150
151 #[cfg(feature = "postgres")]
152 pub fn new_postgres(pool: sqlx::PgPool, exchange_rate: Option<ExchangeRateConfig>) -> Self {
153 Self {
154 backend: SpendBackend::Postgres { pool },
155 exchange_rate,
156 mu: Mutex::new(()),
157 fx_stale_warned: std::sync::atomic::AtomicBool::new(false),
158 }
159 }
160
161 pub fn take_fx_stale_warning(&self) -> bool {
163 self.fx_stale_warned
164 .swap(false, std::sync::atomic::Ordering::Relaxed)
165 }
166
167 pub async fn add_limit(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
169 normalize_limit(limit);
170 validate_limit(limit, self.exchange_rate.as_ref())?;
171
172 let _guard = self.mu.lock().await;
173
174 match &self.backend {
175 #[cfg(feature = "redb")]
176 SpendBackend::Redb { .. } => self.add_limit_redb(limit),
177 #[cfg(feature = "postgres")]
178 SpendBackend::Postgres { .. } => self.add_limit_postgres(limit).await,
179 SpendBackend::None => Err(PayError::NotImplemented(
180 "no storage backend for spend limits".to_string(),
181 )),
182 }
183 }
184
185 pub async fn remove_limit(&self, _rule_id: &str) -> Result<(), PayError> {
187 let _guard = self.mu.lock().await;
188
189 match &self.backend {
190 #[cfg(feature = "redb")]
191 SpendBackend::Redb { .. } => self.remove_limit_redb(_rule_id),
192 #[cfg(feature = "postgres")]
193 SpendBackend::Postgres { .. } => self.remove_limit_postgres(_rule_id).await,
194 SpendBackend::None => Err(PayError::NotImplemented(
195 "no storage backend for spend limits".to_string(),
196 )),
197 }
198 }
199
200 pub async fn set_limits(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
202 let mut limits = limits.to_vec();
203 for limit in &mut limits {
204 normalize_limit(limit);
205 validate_limit(limit, self.exchange_rate.as_ref())?;
206 }
207
208 let _guard = self.mu.lock().await;
209
210 match &self.backend {
211 #[cfg(feature = "redb")]
212 SpendBackend::Redb { .. } => self.set_limits_redb(&limits),
213 #[cfg(feature = "postgres")]
214 SpendBackend::Postgres { .. } => self.set_limits_postgres(&limits).await,
215 SpendBackend::None => Err(PayError::NotImplemented(
216 "no storage backend for spend limits".to_string(),
217 )),
218 }
219 }
220
221 pub async fn get_status(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
223 let _guard = self.mu.lock().await;
224
225 match &self.backend {
226 #[cfg(feature = "redb")]
227 SpendBackend::Redb { .. } => self.get_status_redb(),
228 #[cfg(feature = "postgres")]
229 SpendBackend::Postgres { .. } => self.get_status_postgres().await,
230 SpendBackend::None => Ok(Vec::new()),
231 }
232 }
233
234 pub async fn reserve(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
236 if op_id.trim().is_empty() {
237 return Err(PayError::InvalidAmount("op_id cannot be empty".to_string()));
238 }
239 if ctx.network.trim().is_empty() {
240 return Err(PayError::InvalidAmount(
241 "network cannot be empty for spend check".to_string(),
242 ));
243 }
244 let request_hash = spend_request_hash(op_id, ctx);
245
246 let _guard = self.mu.lock().await;
247
248 match &self.backend {
249 #[cfg(feature = "redb")]
250 SpendBackend::Redb { .. } => self.reserve_redb(op_id, ctx, &request_hash).await,
251 #[cfg(feature = "postgres")]
252 SpendBackend::Postgres { .. } => self.reserve_postgres(op_id, ctx, &request_hash).await,
253 SpendBackend::None => Err(PayError::NotImplemented(
254 "no storage backend for spend limits".to_string(),
255 )),
256 }
257 }
258
259 pub async fn confirm(&self, _reservation_id: u64) -> Result<(), PayError> {
260 let _guard = self.mu.lock().await;
261
262 match &self.backend {
263 #[cfg(feature = "redb")]
264 SpendBackend::Redb { .. } => self.confirm_redb(_reservation_id),
265 #[cfg(feature = "postgres")]
266 SpendBackend::Postgres { .. } => self.confirm_postgres(_reservation_id).await,
267 SpendBackend::None => Err(PayError::NotImplemented(
268 "no storage backend for spend limits".to_string(),
269 )),
270 }
271 }
272
273 pub async fn cancel(&self, _reservation_id: u64) -> Result<(), PayError> {
274 let _guard = self.mu.lock().await;
275
276 match &self.backend {
277 #[cfg(feature = "redb")]
278 SpendBackend::Redb { .. } => self.cancel_redb(_reservation_id),
279 #[cfg(feature = "postgres")]
280 SpendBackend::Postgres { .. } => self.cancel_postgres(_reservation_id).await,
281 SpendBackend::None => Ok(()),
282 }
283 }
284}
285
286#[cfg(feature = "redb")]
291impl SpendLedger {
292 fn spend_db_path(&self) -> PathBuf {
293 match &self.backend {
294 SpendBackend::Redb { data_dir } => Path::new(data_dir).join("spend").join("spend.redb"),
295 #[allow(unreachable_patterns)]
296 _ => PathBuf::new(),
297 }
298 }
299
300 fn exchange_rate_db_path(&self) -> PathBuf {
301 match &self.backend {
302 SpendBackend::Redb { data_dir } => Path::new(data_dir)
303 .join("spend")
304 .join("exchange-rate-cache.redb"),
305 #[allow(unreachable_patterns)]
306 _ => PathBuf::new(),
307 }
308 }
309
310 fn open_spend_db(&self) -> Result<Database, PayError> {
311 db::open_and_migrate(
312 &self.spend_db_path(),
313 SPEND_VERSION,
314 &[
315 &|_db: &Database| Ok(()),
317 ],
318 )
319 }
320
321 fn open_exchange_rate_db(&self) -> Result<Database, PayError> {
322 db::open_and_migrate(
323 &self.exchange_rate_db_path(),
324 FX_CACHE_VERSION,
325 &[
326 &|_db: &Database| Ok(()),
328 ],
329 )
330 }
331
332 fn add_limit_redb(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
333 let db = self.open_spend_db()?;
334 let rule_id = generate_rule_identifier()?;
335 limit.rule_id = Some(rule_id.clone());
336 let encoded = encode(limit)?;
337 let write_txn = db
338 .begin_write()
339 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
340 {
341 let mut rule_table = write_txn
342 .open_table(RULE_BY_ID)
343 .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
344 rule_table
345 .insert(rule_id.as_str(), encoded.as_str())
346 .map_err(|e| PayError::InternalError(format!("spend insert rule: {e}")))?;
347 }
348 write_txn
349 .commit()
350 .map_err(|e| PayError::InternalError(format!("spend commit add_limit: {e}")))?;
351 Ok(rule_id)
352 }
353
354 fn remove_limit_redb(&self, rule_id: &str) -> Result<(), PayError> {
355 let db = self.open_spend_db()?;
356 let write_txn = db
357 .begin_write()
358 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
359 {
360 let mut rule_table = write_txn
361 .open_table(RULE_BY_ID)
362 .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
363 let existed = rule_table
364 .remove(rule_id)
365 .map_err(|e| PayError::InternalError(format!("spend remove rule: {e}")))?;
366 if existed.is_none() {
367 return Err(PayError::InvalidAmount(format!(
368 "rule_id '{rule_id}' not found"
369 )));
370 }
371 }
372 write_txn
373 .commit()
374 .map_err(|e| PayError::InternalError(format!("spend commit remove_limit: {e}")))
375 }
376
377 fn set_limits_redb(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
378 let db = self.open_spend_db()?;
379 let write_txn = db
380 .begin_write()
381 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
382 {
383 let mut rule_table = write_txn
384 .open_table(RULE_BY_ID)
385 .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
386 let existing_ids = rule_table
388 .iter()
389 .map_err(|e| PayError::InternalError(format!("spend iterate rules: {e}")))?
390 .map(|entry| {
391 entry
392 .map(|(k, _)| k.value().to_string())
393 .map_err(|e| PayError::InternalError(format!("spend read rule key: {e}")))
394 })
395 .collect::<Result<Vec<_>, _>>()?;
396 for rid in existing_ids {
397 rule_table
398 .remove(rid.as_str())
399 .map_err(|e| PayError::InternalError(format!("spend remove rule: {e}")))?;
400 }
401
402 for limit in limits {
404 let mut rule = limit.clone();
405 let rid = generate_rule_identifier()?;
406 rule.rule_id = Some(rid.clone());
407 let encoded = encode(&rule)?;
408 rule_table
409 .insert(rid.as_str(), encoded.as_str())
410 .map_err(|e| PayError::InternalError(format!("spend insert rule: {e}")))?;
411 }
412 }
413 write_txn
414 .commit()
415 .map_err(|e| PayError::InternalError(format!("spend commit set_limits: {e}")))
416 }
417
418 fn get_status_redb(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
419 let db = self.open_spend_db()?;
420 let read_txn = db
421 .begin_read()
422 .map_err(|e| PayError::InternalError(format!("spend begin_read: {e}")))?;
423 let rules = load_rules(&read_txn)?;
424 let reservations = load_reservations(&read_txn)?;
425 let now = now_epoch_ms();
426 let mut out = Vec::with_capacity(rules.len());
427 for rule in rules {
428 let use_usd = rule.scope == SpendScope::GlobalUsdCents;
429 let (spent, oldest_ts) = spent_in_window(&rule, &reservations, now, use_usd)?;
430 let remaining = rule.max_spend.saturating_sub(spent);
431 let window_ms = rule.window_s.saturating_mul(1000);
432 let window_reset_s = oldest_ts
433 .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
434 .unwrap_or(0);
435 out.push(SpendLimitStatus {
436 rule_id: rule.rule_id.clone().unwrap_or_default(),
437 scope: rule.scope,
438 network: rule.network.clone(),
439 wallet: rule.wallet.clone(),
440 window_s: rule.window_s,
441 max_spend: rule.max_spend,
442 spent,
443 remaining,
444 token: rule.token.clone(),
445 window_reset_s,
446 });
447 }
448 Ok(out)
449 }
450
451 async fn reserve_redb(
452 &self,
453 op_id: &str,
454 ctx: &SpendContext,
455 request_hash: &str,
456 ) -> Result<u64, PayError> {
457 let now = now_epoch_ms();
458 let db = self.open_spend_db()?;
459
460 let read_txn = db
461 .begin_read()
462 .map_err(|e| PayError::InternalError(format!("spend begin_read: {e}")))?;
463 let rules = load_rules(&read_txn)?;
464
465 if rules.iter().any(|r| {
466 r.scope == SpendScope::Wallet
467 && r.network.as_deref() == Some(ctx.network.as_str())
468 && ctx.wallet.is_none()
469 }) {
470 return Err(PayError::InvalidAmount(
471 "wallet-scoped limits require an explicit wallet".to_string(),
472 ));
473 }
474
475 let needs_usd = rules.iter().any(|r| r.scope == SpendScope::GlobalUsdCents);
477 let amount_usd_cents = if needs_usd {
478 Some(
479 self.amount_to_usd_cents(&ctx.network, ctx.token.as_deref(), ctx.amount_native)
480 .await?,
481 )
482 } else {
483 None
484 };
485
486 let write_txn = db
487 .begin_write()
488 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
489
490 let mut encoded_blobs: Vec<String> = Vec::new();
491 let reservation_id = {
492 let mut reservation_index =
493 write_txn.open_table(RESERVATION_ID_BY_OP_ID).map_err(|e| {
494 PayError::InternalError(format!("spend open reservation op index: {e}"))
495 })?;
496 if let Some(existing) = reservation_index
497 .get(op_id)
498 .map_err(|e| PayError::InternalError(format!("spend read op index: {e}")))?
499 {
500 let existing_id = existing.value();
501 let reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
502 PayError::InternalError(format!("spend open reservation table: {e}"))
503 })?;
504 let status = reservation_table
505 .get(existing_id)
506 .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?
507 .map(|value| decode::<SpendReservation>(value.value()))
508 .transpose()?
509 .map(|reservation| reservation.status)
510 .unwrap_or(ReservationStatus::Pending);
511 return Err(duplicate_reservation_error(op_id, existing_id, &status));
512 }
513
514 let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
515 PayError::InternalError(format!("spend open reservation table: {e}"))
516 })?;
517
518 expire_pending(&mut reservation_table, now)?;
519
520 let reservations = reservation_table
521 .iter()
522 .map_err(|e| PayError::InternalError(format!("spend iterate reservations: {e}")))?
523 .map(|entry| {
524 let (_k, v) = entry.map_err(|e| {
525 PayError::InternalError(format!("spend read reservation: {e}"))
526 })?;
527 decode::<SpendReservation>(v.value())
528 .map_err(|e| prepend_err("spend decode reservation", e))
529 })
530 .collect::<Result<Vec<_>, _>>()?;
531
532 for rule in rules.iter() {
533 if !rule_matches_context(
534 rule,
535 &ctx.network,
536 ctx.wallet.as_deref(),
537 ctx.token.as_deref(),
538 ) {
539 continue;
540 }
541
542 let use_usd = rule.scope == SpendScope::GlobalUsdCents;
543 let candidate_amount =
544 amount_for_rule(rule, ctx.amount_native, amount_usd_cents, use_usd)?;
545 let (spent, oldest_ts) = spent_in_window(rule, &reservations, now, use_usd)?;
546 if spent.saturating_add(candidate_amount) > rule.max_spend {
547 let window_ms = rule.window_s.saturating_mul(1000);
548 let remaining_s = oldest_ts
549 .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
550 .unwrap_or(0);
551
552 return Err(PayError::LimitExceeded {
553 rule_id: rule.rule_id.clone().unwrap_or_default(),
554 scope: rule.scope,
555 scope_key: scope_key(rule),
556 spent,
557 max_spend: rule.max_spend,
558 token: rule.token.clone(),
559 remaining_s,
560 origin: None,
561 });
562 }
563 }
564
565 let reservation_id = next_counter(&write_txn, NEXT_RESERVATION_ID_KEY)?;
566 let reservation = SpendReservation {
567 reservation_id,
568 op_id: op_id.to_string(),
569 network: ctx.network.clone(),
570 wallet: ctx.wallet.clone(),
571 token: ctx.token.clone(),
572 amount_native: ctx.amount_native,
573 amount_usd_cents,
574 status: ReservationStatus::Pending,
575 created_at_epoch_ms: now,
576 expires_at_epoch_ms: now.saturating_add(300_000),
577 finalized_at_epoch_ms: None,
578 request_hash: Some(request_hash.to_string()),
579 };
580 encoded_blobs.push(encode(&reservation)?);
581 let encoded = encoded_blobs
582 .last()
583 .ok_or_else(|| PayError::InternalError("missing reservation blob".to_string()))?;
584 reservation_table
585 .insert(reservation_id, encoded.as_str())
586 .map_err(|e| PayError::InternalError(format!("spend insert reservation: {e}")))?;
587 reservation_index
588 .insert(op_id, reservation_id)
589 .map_err(|e| PayError::InternalError(format!("spend insert op index: {e}")))?;
590 reservation_id
591 };
592
593 write_txn
594 .commit()
595 .map_err(|e| PayError::InternalError(format!("spend commit reserve: {e}")))?;
596 Ok(reservation_id)
597 }
598
599 fn confirm_redb(&self, reservation_id: u64) -> Result<(), PayError> {
600 let db = self.open_spend_db()?;
601 let now = now_epoch_ms();
602
603 let write_txn = db
604 .begin_write()
605 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
606
607 let mut encoded_blobs: Vec<String> = Vec::new();
608 {
609 let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
610 PayError::InternalError(format!("spend open reservation table: {e}"))
611 })?;
612 let Some(existing_bytes) = reservation_table
613 .get(reservation_id)
614 .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?
615 .map(|g| g.value().to_string())
616 else {
617 return Err(PayError::InternalError(format!(
618 "reservation {reservation_id} not found"
619 )));
620 };
621
622 let mut reservation: SpendReservation = decode(&existing_bytes)?;
623 if !matches!(reservation.status, ReservationStatus::Pending) {
624 return Ok(());
625 }
626
627 reservation.status = ReservationStatus::Confirmed;
628 reservation.finalized_at_epoch_ms = Some(now);
629 encoded_blobs.push(encode(&reservation)?);
630 let encoded = encoded_blobs
631 .last()
632 .ok_or_else(|| PayError::InternalError("missing reservation blob".to_string()))?;
633 reservation_table
634 .insert(reservation_id, encoded.as_str())
635 .map_err(|e| PayError::InternalError(format!("spend update reservation: {e}")))?;
636
637 let mut events = write_txn
638 .open_table(SPEND_EVENT_BY_ID)
639 .map_err(|e| PayError::InternalError(format!("spend open event table: {e}")))?;
640 let event_id = next_counter(&write_txn, NEXT_EVENT_ID_KEY)?;
641 let event = SpendEvent {
642 event_id,
643 reservation_id,
644 op_id: reservation.op_id,
645 network: reservation.network,
646 wallet: reservation.wallet,
647 token: reservation.token,
648 amount_native: reservation.amount_native,
649 amount_usd_cents: reservation.amount_usd_cents,
650 created_at_epoch_ms: reservation.created_at_epoch_ms,
651 confirmed_at_epoch_ms: now,
652 };
653 encoded_blobs.push(encode(&event)?);
654 let encoded_event = encoded_blobs
655 .last()
656 .ok_or_else(|| PayError::InternalError("missing event blob".to_string()))?;
657 events
658 .insert(event_id, encoded_event.as_str())
659 .map_err(|e| PayError::InternalError(format!("spend insert event: {e}")))?;
660 }
661
662 write_txn
663 .commit()
664 .map_err(|e| PayError::InternalError(format!("spend commit confirm: {e}")))
665 }
666
667 fn cancel_redb(&self, reservation_id: u64) -> Result<(), PayError> {
668 let db = self.open_spend_db()?;
669 let now = now_epoch_ms();
670
671 let write_txn = db
672 .begin_write()
673 .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
674
675 let mut encoded_blobs: Vec<String> = Vec::new();
676 {
677 let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
678 PayError::InternalError(format!("spend open reservation table: {e}"))
679 })?;
680 let existing = reservation_table
681 .get(reservation_id)
682 .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?;
683 let existing_bytes = existing.map(|g| g.value().to_string());
684 if let Some(existing_bytes) = existing_bytes {
685 let mut reservation: SpendReservation = decode(&existing_bytes)?;
686 if matches!(reservation.status, ReservationStatus::Pending) {
687 reservation.status = ReservationStatus::Cancelled;
688 reservation.finalized_at_epoch_ms = Some(now);
689 encoded_blobs.push(encode(&reservation)?);
690 let encoded = encoded_blobs.last().ok_or_else(|| {
691 PayError::InternalError("missing reservation blob".to_string())
692 })?;
693 reservation_table
694 .insert(reservation_id, encoded.as_str())
695 .map_err(|e| {
696 PayError::InternalError(format!("spend update reservation: {e}"))
697 })?;
698 }
699 }
700 }
701
702 write_txn
703 .commit()
704 .map_err(|e| PayError::InternalError(format!("spend commit cancel: {e}")))
705 }
706}
707
708#[cfg(feature = "postgres")]
713impl SpendLedger {
714 fn pg_pool(&self) -> Result<&sqlx::PgPool, PayError> {
715 match &self.backend {
716 SpendBackend::Postgres { pool } => Ok(pool),
717 _ => Err(PayError::InternalError(
718 "expected postgres spend backend".to_string(),
719 )),
720 }
721 }
722
723 async fn add_limit_postgres(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
724 let pool = self.pg_pool()?;
725 let rule_id = generate_rule_identifier()?;
726 limit.rule_id = Some(rule_id.clone());
727 let rule_json = serde_json::to_value(limit)
728 .map_err(|e| PayError::InternalError(format!("serialize spend rule: {e}")))?;
729
730 sqlx::query("INSERT INTO spend_rules (rule_id, rule) VALUES ($1, $2)")
731 .bind(&rule_id)
732 .bind(&rule_json)
733 .execute(pool)
734 .await
735 .map_err(|e| PayError::InternalError(format!("pg insert spend rule: {e}")))?;
736
737 Ok(rule_id)
738 }
739
740 async fn remove_limit_postgres(&self, rule_id: &str) -> Result<(), PayError> {
741 let pool = self.pg_pool()?;
742 let result = sqlx::query("DELETE FROM spend_rules WHERE rule_id = $1")
743 .bind(rule_id)
744 .execute(pool)
745 .await
746 .map_err(|e| PayError::InternalError(format!("pg delete spend rule: {e}")))?;
747
748 if result.rows_affected() == 0 {
749 return Err(PayError::InvalidAmount(format!(
750 "rule_id '{rule_id}' not found"
751 )));
752 }
753 Ok(())
754 }
755
756 async fn set_limits_postgres(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
757 let pool = self.pg_pool()?;
758 let mut tx = pool
759 .begin()
760 .await
761 .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
762
763 sqlx::query("DELETE FROM spend_rules")
764 .execute(&mut *tx)
765 .await
766 .map_err(|e| PayError::InternalError(format!("pg clear spend rules: {e}")))?;
767
768 for limit in limits {
769 let mut rule = limit.clone();
770 let rid = generate_rule_identifier()?;
771 rule.rule_id = Some(rid.clone());
772 let rule_json = serde_json::to_value(&rule)
773 .map_err(|e| PayError::InternalError(format!("serialize spend rule: {e}")))?;
774 sqlx::query("INSERT INTO spend_rules (rule_id, rule) VALUES ($1, $2)")
775 .bind(&rid)
776 .bind(&rule_json)
777 .execute(&mut *tx)
778 .await
779 .map_err(|e| PayError::InternalError(format!("pg insert spend rule: {e}")))?;
780 }
781
782 tx.commit()
783 .await
784 .map_err(|e| PayError::InternalError(format!("pg commit set_limits: {e}")))
785 }
786
787 async fn get_status_postgres(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
788 let pool = self.pg_pool()?;
789 let rules = pg_load_rules(pool).await?;
790 let reservations = pg_load_reservations(pool).await?;
791 let now = now_epoch_ms();
792
793 let mut out = Vec::with_capacity(rules.len());
794 for rule in rules {
795 let use_usd = rule.scope == SpendScope::GlobalUsdCents;
796 let (spent, oldest_ts) = spent_in_window(&rule, &reservations, now, use_usd)?;
797 let remaining = rule.max_spend.saturating_sub(spent);
798 let window_ms = rule.window_s.saturating_mul(1000);
799 let window_reset_s = oldest_ts
800 .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
801 .unwrap_or(0);
802 out.push(SpendLimitStatus {
803 rule_id: rule.rule_id.clone().unwrap_or_default(),
804 scope: rule.scope,
805 network: rule.network.clone(),
806 wallet: rule.wallet.clone(),
807 window_s: rule.window_s,
808 max_spend: rule.max_spend,
809 spent,
810 remaining,
811 token: rule.token.clone(),
812 window_reset_s,
813 });
814 }
815 Ok(out)
816 }
817
818 async fn reserve_postgres(
819 &self,
820 op_id: &str,
821 ctx: &SpendContext,
822 request_hash: &str,
823 ) -> Result<u64, PayError> {
824 use crate::store::postgres_store::SPEND_ADVISORY_LOCK_KEY;
825
826 let pool = self.pg_pool()?;
827 let now = now_epoch_ms();
828
829 let rules = pg_load_rules(pool).await?;
831 if rules.iter().any(|r| {
832 r.scope == SpendScope::Wallet
833 && r.network.as_deref() == Some(ctx.network.as_str())
834 && ctx.wallet.is_none()
835 }) {
836 return Err(PayError::InvalidAmount(
837 "wallet-scoped limits require an explicit wallet".to_string(),
838 ));
839 }
840
841 let needs_usd = rules.iter().any(|r| r.scope == SpendScope::GlobalUsdCents);
842 let amount_usd_cents = if needs_usd {
843 Some(
844 self.amount_to_usd_cents(&ctx.network, ctx.token.as_deref(), ctx.amount_native)
845 .await?,
846 )
847 } else {
848 None
849 };
850
851 let mut tx = pool
853 .begin()
854 .await
855 .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
856
857 sqlx::query("SELECT pg_advisory_xact_lock($1)")
858 .bind(SPEND_ADVISORY_LOCK_KEY)
859 .execute(&mut *tx)
860 .await
861 .map_err(|e| PayError::InternalError(format!("pg advisory lock: {e}")))?;
862
863 let existing: Option<(i64, serde_json::Value)> = sqlx::query_as(
865 "SELECT reservation_id, reservation FROM spend_reservations WHERE op_id = $1",
866 )
867 .bind(op_id)
868 .fetch_optional(&mut *tx)
869 .await
870 .map_err(|e| PayError::InternalError(format!("pg check op_id: {e}")))?;
871
872 if let Some((rid, reservation_json)) = existing {
873 let status = serde_json::from_value::<SpendReservation>(reservation_json)
874 .map(|reservation| reservation.status)
875 .unwrap_or(ReservationStatus::Pending);
876 return Err(duplicate_reservation_error(op_id, rid as u64, &status));
877 }
878
879 pg_expire_pending(&mut tx, now).await?;
881
882 let reservations = pg_load_reservations_tx(&mut tx).await?;
884
885 let rules = pg_load_rules_tx(&mut tx).await?;
887
888 for rule in rules.iter() {
890 if !rule_matches_context(
891 rule,
892 &ctx.network,
893 ctx.wallet.as_deref(),
894 ctx.token.as_deref(),
895 ) {
896 continue;
897 }
898
899 let use_usd = rule.scope == SpendScope::GlobalUsdCents;
900 let candidate_amount =
901 amount_for_rule(rule, ctx.amount_native, amount_usd_cents, use_usd)?;
902 let (spent, oldest_ts) = spent_in_window(rule, &reservations, now, use_usd)?;
903 if spent.saturating_add(candidate_amount) > rule.max_spend {
904 let window_ms = rule.window_s.saturating_mul(1000);
905 let remaining_s = oldest_ts
906 .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
907 .unwrap_or(0);
908
909 return Err(PayError::LimitExceeded {
910 rule_id: rule.rule_id.clone().unwrap_or_default(),
911 scope: rule.scope,
912 scope_key: scope_key(rule),
913 spent,
914 max_spend: rule.max_spend,
915 token: rule.token.clone(),
916 remaining_s,
917 origin: None,
918 });
919 }
920 }
921
922 let reservation = SpendReservation {
924 reservation_id: 0, op_id: op_id.to_string(),
926 network: ctx.network.clone(),
927 wallet: ctx.wallet.clone(),
928 token: ctx.token.clone(),
929 amount_native: ctx.amount_native,
930 amount_usd_cents,
931 status: ReservationStatus::Pending,
932 created_at_epoch_ms: now,
933 expires_at_epoch_ms: now.saturating_add(300_000),
934 finalized_at_epoch_ms: None,
935 request_hash: Some(request_hash.to_string()),
936 };
937 let reservation_json = serde_json::to_value(&reservation)
938 .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
939
940 let row: (i64,) = sqlx::query_as(
941 "INSERT INTO spend_reservations (op_id, reservation) \
942 VALUES ($1, $2) RETURNING reservation_id",
943 )
944 .bind(op_id)
945 .bind(&reservation_json)
946 .fetch_one(&mut *tx)
947 .await
948 .map_err(|e| PayError::InternalError(format!("pg insert reservation: {e}")))?;
949
950 let reservation_id = row.0 as u64;
951
952 let mut updated_json = reservation_json;
954 updated_json["reservation_id"] = serde_json::json!(reservation_id);
955 sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
956 .bind(&updated_json)
957 .bind(row.0)
958 .execute(&mut *tx)
959 .await
960 .map_err(|e| PayError::InternalError(format!("pg update reservation id: {e}")))?;
961
962 tx.commit()
963 .await
964 .map_err(|e| PayError::InternalError(format!("pg commit reserve: {e}")))?;
965
966 Ok(reservation_id)
967 }
968
969 async fn confirm_postgres(&self, reservation_id: u64) -> Result<(), PayError> {
970 let pool = self.pg_pool()?;
971 let now = now_epoch_ms();
972 let rid = reservation_id as i64;
973
974 let row: Option<(serde_json::Value,)> =
975 sqlx::query_as("SELECT reservation FROM spend_reservations WHERE reservation_id = $1")
976 .bind(rid)
977 .fetch_optional(pool)
978 .await
979 .map_err(|e| PayError::InternalError(format!("pg read reservation: {e}")))?;
980
981 let Some((res_json,)) = row else {
982 return Err(PayError::InternalError(format!(
983 "reservation {reservation_id} not found"
984 )));
985 };
986
987 let mut reservation: SpendReservation = serde_json::from_value(res_json)
988 .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
989
990 if !matches!(reservation.status, ReservationStatus::Pending) {
991 return Ok(());
992 }
993
994 reservation.status = ReservationStatus::Confirmed;
995 reservation.finalized_at_epoch_ms = Some(now);
996 let updated_json = serde_json::to_value(&reservation)
997 .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
998
999 let event = SpendEvent {
1000 event_id: 0, reservation_id,
1002 op_id: reservation.op_id,
1003 network: reservation.network,
1004 wallet: reservation.wallet,
1005 token: reservation.token,
1006 amount_native: reservation.amount_native,
1007 amount_usd_cents: reservation.amount_usd_cents,
1008 created_at_epoch_ms: reservation.created_at_epoch_ms,
1009 confirmed_at_epoch_ms: now,
1010 };
1011 let event_json = serde_json::to_value(&event)
1012 .map_err(|e| PayError::InternalError(format!("serialize spend event: {e}")))?;
1013
1014 let mut tx = pool
1015 .begin()
1016 .await
1017 .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
1018
1019 sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
1020 .bind(&updated_json)
1021 .bind(rid)
1022 .execute(&mut *tx)
1023 .await
1024 .map_err(|e| PayError::InternalError(format!("pg update reservation: {e}")))?;
1025
1026 sqlx::query("INSERT INTO spend_events (reservation_id, event) VALUES ($1, $2)")
1027 .bind(rid)
1028 .bind(&event_json)
1029 .execute(&mut *tx)
1030 .await
1031 .map_err(|e| PayError::InternalError(format!("pg insert spend event: {e}")))?;
1032
1033 tx.commit()
1034 .await
1035 .map_err(|e| PayError::InternalError(format!("pg commit confirm: {e}")))
1036 }
1037
1038 async fn cancel_postgres(&self, reservation_id: u64) -> Result<(), PayError> {
1039 let pool = self.pg_pool()?;
1040 let now = now_epoch_ms();
1041 let rid = reservation_id as i64;
1042
1043 let row: Option<(serde_json::Value,)> =
1044 sqlx::query_as("SELECT reservation FROM spend_reservations WHERE reservation_id = $1")
1045 .bind(rid)
1046 .fetch_optional(pool)
1047 .await
1048 .map_err(|e| PayError::InternalError(format!("pg read reservation: {e}")))?;
1049
1050 if let Some((res_json,)) = row {
1051 let mut reservation: SpendReservation = serde_json::from_value(res_json)
1052 .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
1053
1054 if matches!(reservation.status, ReservationStatus::Pending) {
1055 reservation.status = ReservationStatus::Cancelled;
1056 reservation.finalized_at_epoch_ms = Some(now);
1057 let updated_json = serde_json::to_value(&reservation)
1058 .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
1059
1060 sqlx::query(
1061 "UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2",
1062 )
1063 .bind(&updated_json)
1064 .bind(rid)
1065 .execute(pool)
1066 .await
1067 .map_err(|e| PayError::InternalError(format!("pg update reservation: {e}")))?;
1068 }
1069 }
1070
1071 Ok(())
1072 }
1073}
1074
1075#[cfg(feature = "postgres")]
1076async fn pg_load_rules(pool: &sqlx::PgPool) -> Result<Vec<SpendLimit>, PayError> {
1077 let rows: Vec<(serde_json::Value,)> =
1078 sqlx::query_as("SELECT rule FROM spend_rules ORDER BY rule_id")
1079 .fetch_all(pool)
1080 .await
1081 .map_err(|e| PayError::InternalError(format!("pg load spend rules: {e}")))?;
1082 rows.into_iter()
1083 .map(|(v,)| {
1084 serde_json::from_value(v)
1085 .map_err(|e| PayError::InternalError(format!("pg parse spend rule: {e}")))
1086 })
1087 .collect()
1088}
1089
1090#[cfg(feature = "postgres")]
1091async fn pg_load_rules_tx(
1092 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1093) -> Result<Vec<SpendLimit>, PayError> {
1094 let rows: Vec<(serde_json::Value,)> =
1095 sqlx::query_as("SELECT rule FROM spend_rules ORDER BY rule_id")
1096 .fetch_all(&mut **tx)
1097 .await
1098 .map_err(|e| PayError::InternalError(format!("pg load spend rules: {e}")))?;
1099 rows.into_iter()
1100 .map(|(v,)| {
1101 serde_json::from_value(v)
1102 .map_err(|e| PayError::InternalError(format!("pg parse spend rule: {e}")))
1103 })
1104 .collect()
1105}
1106
1107#[cfg(feature = "postgres")]
1108async fn pg_load_reservations(pool: &sqlx::PgPool) -> Result<Vec<SpendReservation>, PayError> {
1109 let rows: Vec<(serde_json::Value,)> =
1110 sqlx::query_as("SELECT reservation FROM spend_reservations ORDER BY reservation_id")
1111 .fetch_all(pool)
1112 .await
1113 .map_err(|e| PayError::InternalError(format!("pg load reservations: {e}")))?;
1114 rows.into_iter()
1115 .map(|(v,)| {
1116 serde_json::from_value(v)
1117 .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))
1118 })
1119 .collect()
1120}
1121
1122#[cfg(feature = "postgres")]
1123async fn pg_load_reservations_tx(
1124 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1125) -> Result<Vec<SpendReservation>, PayError> {
1126 let rows: Vec<(serde_json::Value,)> =
1127 sqlx::query_as("SELECT reservation FROM spend_reservations ORDER BY reservation_id")
1128 .fetch_all(&mut **tx)
1129 .await
1130 .map_err(|e| PayError::InternalError(format!("pg load reservations: {e}")))?;
1131 rows.into_iter()
1132 .map(|(v,)| {
1133 serde_json::from_value(v)
1134 .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))
1135 })
1136 .collect()
1137}
1138
1139#[cfg(feature = "postgres")]
1140async fn pg_expire_pending(
1141 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1142 now_ms: u64,
1143) -> Result<(), PayError> {
1144 let rows: Vec<(i64, serde_json::Value)> =
1146 sqlx::query_as("SELECT reservation_id, reservation FROM spend_reservations")
1147 .fetch_all(&mut **tx)
1148 .await
1149 .map_err(|e| {
1150 PayError::InternalError(format!("pg load reservations for expire: {e}"))
1151 })?;
1152
1153 for (rid, res_json) in rows {
1154 let mut reservation: SpendReservation = serde_json::from_value(res_json)
1155 .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
1156 if matches!(reservation.status, ReservationStatus::Pending)
1157 && reservation.expires_at_epoch_ms <= now_ms
1158 {
1159 reservation.status = ReservationStatus::Expired;
1160 reservation.finalized_at_epoch_ms = Some(now_ms);
1161 let updated = serde_json::to_value(&reservation)
1162 .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
1163 sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
1164 .bind(&updated)
1165 .bind(rid)
1166 .execute(&mut **tx)
1167 .await
1168 .map_err(|e| PayError::InternalError(format!("pg expire reservation: {e}")))?;
1169 }
1170 }
1171 Ok(())
1172}
1173
1174impl SpendLedger {
1179 async fn amount_to_usd_cents(
1180 &self,
1181 network: &str,
1182 token: Option<&str>,
1183 amount_native: u64,
1184 ) -> Result<u64, PayError> {
1185 let (symbol, divisor) = token_asset(network, token).ok_or_else(|| {
1186 PayError::InvalidAmount(format!(
1187 "network '{network}' token '{token:?}' is unsupported for global-usd-cents limits"
1188 ))
1189 })?;
1190
1191 let quote = if symbol == "USD" {
1192 let now = now_epoch_ms();
1193 ExchangeRateQuote {
1194 pair: "USD/USD".to_string(),
1195 source: "identity".to_string(),
1196 price: 1.0,
1197 fetched_at_epoch_ms: now,
1198 expires_at_epoch_ms: now.saturating_add(86_400_000),
1199 }
1200 } else {
1201 self.get_or_fetch_quote(symbol, "USD").await?
1202 };
1203
1204 let now = now_epoch_ms();
1207 if quote.expires_at_epoch_ms > 0 && now > quote.expires_at_epoch_ms {
1208 return Err(PayError::NetworkError(
1209 "exchange-rate quote expired — cannot convert to USD; check exchange_rate sources"
1210 .to_string(),
1211 ));
1212 }
1213
1214 let ttl_ms = quote
1217 .expires_at_epoch_ms
1218 .saturating_sub(quote.fetched_at_epoch_ms);
1219 let age_ms = now.saturating_sub(quote.fetched_at_epoch_ms);
1220 if ttl_ms > 0 && age_ms > ttl_ms * 4 / 5 {
1221 self.fx_stale_warned
1222 .store(true, std::sync::atomic::Ordering::Relaxed);
1223 }
1224
1225 let usd = (amount_native as f64 / divisor) * quote.price;
1226 if !usd.is_finite() || usd < 0f64 {
1227 return Err(PayError::InternalError(
1228 "invalid exchange-rate conversion result".to_string(),
1229 ));
1230 }
1231 Ok((usd * 100f64).round() as u64)
1232 }
1233
1234 async fn get_or_fetch_quote(
1235 &self,
1236 base: &str,
1237 quote: &str,
1238 ) -> Result<ExchangeRateQuote, PayError> {
1239 let pair = format!(
1240 "{}/{}",
1241 base.to_ascii_uppercase(),
1242 quote.to_ascii_uppercase()
1243 );
1244 let now = now_epoch_ms();
1245
1246 #[cfg(feature = "redb")]
1248 if let SpendBackend::Redb { .. } = &self.backend {
1249 let fx_db = self.open_exchange_rate_db()?;
1250 let read_txn = fx_db
1251 .begin_read()
1252 .map_err(|e| PayError::InternalError(format!("fx begin_read: {e}")))?;
1253 if let Ok(table) = read_txn.open_table(FX_QUOTE_BY_PAIR) {
1254 if let Some(entry) = table
1255 .get(pair.as_str())
1256 .map_err(|e| PayError::InternalError(format!("fx read quote: {e}")))?
1257 {
1258 let cached: ExchangeRateQuote = decode(entry.value())?;
1259 if cached.expires_at_epoch_ms > now {
1260 return Ok(cached);
1261 }
1262 }
1263 }
1264 }
1265
1266 #[cfg(feature = "postgres")]
1268 if let SpendBackend::Postgres { pool } = &self.backend {
1269 let row: Option<(serde_json::Value,)> =
1270 sqlx::query_as("SELECT quote FROM exchange_rate_cache WHERE pair = $1")
1271 .bind(&pair)
1272 .fetch_optional(pool)
1273 .await
1274 .map_err(|e| PayError::InternalError(format!("pg fx read cache: {e}")))?;
1275 if let Some((quote_json,)) = row {
1276 let cached: ExchangeRateQuote = serde_json::from_value(quote_json)
1277 .map_err(|e| PayError::InternalError(format!("pg fx parse cache: {e}")))?;
1278 if cached.expires_at_epoch_ms > now {
1279 return Ok(cached);
1280 }
1281 }
1282 }
1283
1284 let (fetched_price, source_name) = self.fetch_exchange_rate_http(base, quote).await?;
1285 let ttl_s = self
1286 .exchange_rate
1287 .as_ref()
1288 .map(|cfg| cfg.ttl_s)
1289 .unwrap_or(300)
1290 .max(1);
1291 let new_quote = ExchangeRateQuote {
1292 pair: pair.clone(),
1293 source: source_name,
1294 price: fetched_price,
1295 fetched_at_epoch_ms: now,
1296 expires_at_epoch_ms: now.saturating_add(ttl_s.saturating_mul(1000)),
1297 };
1298
1299 #[cfg(feature = "redb")]
1301 if let SpendBackend::Redb { .. } = &self.backend {
1302 let fx_db = self.open_exchange_rate_db()?;
1303 let write_txn = fx_db
1304 .begin_write()
1305 .map_err(|e| PayError::InternalError(format!("fx begin_write: {e}")))?;
1306 let mut encoded_blobs: Vec<String> = Vec::new();
1307 {
1308 let mut table = write_txn
1309 .open_table(FX_QUOTE_BY_PAIR)
1310 .map_err(|e| PayError::InternalError(format!("fx open quote table: {e}")))?;
1311 encoded_blobs.push(encode(&new_quote)?);
1312 let encoded = encoded_blobs
1313 .last()
1314 .ok_or_else(|| PayError::InternalError("missing quote blob".to_string()))?;
1315 table
1316 .insert(pair.as_str(), encoded.as_str())
1317 .map_err(|e| PayError::InternalError(format!("fx insert quote: {e}")))?;
1318 }
1319 write_txn
1320 .commit()
1321 .map_err(|e| PayError::InternalError(format!("fx commit write: {e}")))?;
1322 }
1323
1324 #[cfg(feature = "postgres")]
1326 if let SpendBackend::Postgres { pool } = &self.backend {
1327 let quote_json = serde_json::to_value(&new_quote)
1328 .map_err(|e| PayError::InternalError(format!("serialize fx quote: {e}")))?;
1329 let _ = sqlx::query(
1330 "INSERT INTO exchange_rate_cache (pair, quote) VALUES ($1, $2) \
1331 ON CONFLICT (pair) DO UPDATE SET quote = $2",
1332 )
1333 .bind(&pair)
1334 .bind("e_json)
1335 .execute(pool)
1336 .await;
1337 }
1338
1339 Ok(new_quote)
1340 }
1341
1342 #[cfg(feature = "exchange-rate")]
1343 async fn fetch_exchange_rate_http(
1344 &self,
1345 base: &str,
1346 quote_currency: &str,
1347 ) -> Result<(f64, String), PayError> {
1348 let cfg = self.exchange_rate.as_ref().cloned().unwrap_or_default();
1349
1350 if cfg.sources.is_empty() {
1351 return Err(PayError::InvalidAmount(
1352 "exchange_rate.sources is empty — no exchange-rate API configured".to_string(),
1353 ));
1354 }
1355
1356 let client = reqwest::Client::new();
1357 let mut last_err = String::new();
1358
1359 for source in &cfg.sources {
1360 match fetch_from_source(&client, source, base, quote_currency).await {
1361 Ok(price) => return Ok((price, source.endpoint.clone())),
1362 Err(e) => {
1363 last_err =
1364 format!("{} ({}): {e}", source.endpoint, source.source_type.as_str());
1365 }
1366 }
1367 }
1368
1369 Err(PayError::NetworkError(format!(
1370 "all exchange-rate sources failed; last: {last_err}"
1371 )))
1372 }
1373
1374 #[cfg(not(feature = "exchange-rate"))]
1375 async fn fetch_exchange_rate_http(
1376 &self,
1377 _base: &str,
1378 _quote_currency: &str,
1379 ) -> Result<(f64, String), PayError> {
1380 Err(PayError::NotImplemented(
1381 "exchange-rate HTTP support is not built in this feature set".to_string(),
1382 ))
1383 }
1384}
1385
1386fn now_epoch_ms() -> u64 {
1391 std::time::SystemTime::now()
1392 .duration_since(std::time::UNIX_EPOCH)
1393 .map(|d| d.as_millis() as u64)
1394 .unwrap_or(0)
1395}
1396
1397fn spend_request_hash(op_id: &str, ctx: &SpendContext) -> String {
1398 let mut hasher = DefaultHasher::new();
1399 op_id.hash(&mut hasher);
1400 ctx.hash(&mut hasher);
1401 format!("{:016x}", hasher.finish())
1402}
1403
1404fn reservation_status_label(status: &ReservationStatus) -> &'static str {
1405 match status {
1406 ReservationStatus::Pending => "pending",
1407 ReservationStatus::Confirmed => "confirmed",
1408 ReservationStatus::Cancelled => "cancelled",
1409 ReservationStatus::Expired => "expired",
1410 }
1411}
1412
1413fn duplicate_reservation_error(
1414 op_id: &str,
1415 reservation_id: u64,
1416 status: &ReservationStatus,
1417) -> PayError {
1418 PayError::InvalidAmount(format!(
1419 "duplicate spend operation id '{op_id}' already has reservation {reservation_id} ({status}); refusing to re-execute payment",
1420 status = reservation_status_label(status)
1421 ))
1422}
1423
1424fn normalize_limit(rule: &mut SpendLimit) {
1425 rule.network = rule
1426 .network
1427 .as_deref()
1428 .map(str::trim)
1429 .filter(|value| !value.is_empty())
1430 .map(|value| value.to_ascii_lowercase());
1431 rule.wallet = rule
1432 .wallet
1433 .as_deref()
1434 .map(str::trim)
1435 .filter(|value| !value.is_empty())
1436 .map(str::to_string);
1437 rule.token = rule
1438 .token
1439 .as_deref()
1440 .map(str::trim)
1441 .filter(|value| !value.is_empty())
1442 .map(|value| canonical_spend_token(rule.network.as_deref().unwrap_or(""), value));
1443
1444 if matches!(rule.scope, SpendScope::Network | SpendScope::Wallet)
1445 && matches!(rule.network.as_deref(), Some("sol" | "evm"))
1446 && rule.token.is_none()
1447 {
1448 rule.token = Some("native".to_string());
1449 }
1450}
1451
1452fn canonical_spend_token(network: &str, token: &str) -> String {
1453 let token = token.trim().to_ascii_lowercase();
1454 match (network, token.as_str()) {
1455 ("sol", "sol" | "native" | "lamports") => "native".to_string(),
1456 ("evm", "eth" | "native" | "wei") => "native".to_string(),
1457 _ => token,
1458 }
1459}
1460
1461fn token_asset(network: &str, token: Option<&str>) -> Option<(&'static str, f64)> {
1462 let network = network.to_ascii_lowercase();
1463 match token.map(|t| canonical_spend_token(&network, t)).as_deref() {
1464 Some("native") => {
1465 if network == "sol" {
1466 Some(("SOL", 1e9))
1467 } else if network == "evm" {
1468 Some(("ETH", 1e18))
1469 } else if network.starts_with("ln") || network == "cashu" || network == "btc" {
1470 Some(("BTC", 1e8))
1471 } else {
1472 None
1473 }
1474 }
1475 Some("btc" | "sat" | "sats") => Some(("BTC", 1e8)),
1476 Some("sol") => Some(("SOL", 1e9)),
1477 Some("eth") => Some(("ETH", 1e18)),
1478 Some("usdc" | "usdt") => Some(("USD", 1e6)),
1479 Some(_) => None,
1480 None => {
1481 if network.starts_with("ln") || network == "cashu" || network == "btc" {
1482 Some(("BTC", 1e8))
1483 } else {
1484 None
1485 }
1486 }
1487 }
1488}
1489
1490#[cfg(feature = "exchange-rate")]
1491fn extract_price_generic(value: &serde_json::Value) -> Option<f64> {
1492 value
1493 .get("price")
1494 .and_then(|v| v.as_f64())
1495 .or_else(|| value.get("rate").and_then(|v| v.as_f64()))
1496 .or_else(|| value.get("usd_per_base").and_then(|v| v.as_f64()))
1497 .or_else(|| {
1498 value
1499 .get("data")
1500 .and_then(|d| d.get("price"))
1501 .and_then(|v| v.as_f64())
1502 })
1503}
1504
1505#[cfg(feature = "exchange-rate")]
1506impl ExchangeRateSourceType {
1507 fn as_str(self) -> &'static str {
1508 match self {
1509 Self::Generic => "generic",
1510 Self::CoinGecko => "coingecko",
1511 Self::Kraken => "kraken",
1512 }
1513 }
1514}
1515
1516#[cfg(feature = "exchange-rate")]
1517fn coingecko_coin_id(symbol: &str) -> Option<&'static str> {
1518 match symbol.to_ascii_uppercase().as_str() {
1519 "BTC" => Some("bitcoin"),
1520 "SOL" => Some("solana"),
1521 "ETH" => Some("ethereum"),
1522 _ => None,
1523 }
1524}
1525
1526#[cfg(feature = "exchange-rate")]
1527fn kraken_pair(symbol: &str) -> Option<&'static str> {
1528 match symbol.to_ascii_uppercase().as_str() {
1529 "BTC" => Some("XBTUSD"),
1530 "SOL" => Some("SOLUSD"),
1531 "ETH" => Some("ETHUSD"),
1532 _ => None,
1533 }
1534}
1535
1536#[cfg(feature = "exchange-rate")]
1537async fn fetch_from_source(
1538 client: &reqwest::Client,
1539 source: &crate::types::ExchangeRateSource,
1540 base: &str,
1541 quote_currency: &str,
1542) -> Result<f64, String> {
1543 type PriceExtractor = Box<dyn Fn(&serde_json::Value) -> Option<f64> + Send>;
1544 let (url, extract_fn): (String, PriceExtractor) = match source.source_type {
1545 ExchangeRateSourceType::Kraken => {
1546 let pair = kraken_pair(base)
1547 .ok_or_else(|| format!("kraken: unsupported base asset '{base}'"))?;
1548 let url = format!("{}/0/public/Ticker?pair={pair}", source.endpoint);
1549 let pair_owned = pair.to_string();
1550 (
1551 url,
1552 Box::new(move |v: &serde_json::Value| {
1553 let result = v.get("result")?;
1554 let ticker = result
1555 .get(&pair_owned)
1556 .or_else(|| result.as_object().and_then(|m| m.values().next()))?;
1557 let price_str = ticker.get("c")?.as_array()?.first()?.as_str()?;
1558 price_str.parse::<f64>().ok()
1559 }),
1560 )
1561 }
1562 ExchangeRateSourceType::CoinGecko => {
1563 let coin_id = coingecko_coin_id(base)
1564 .ok_or_else(|| format!("coingecko: unsupported base asset '{base}'"))?;
1565 let vs = quote_currency.to_ascii_lowercase();
1566 let url = format!(
1567 "{}/simple/price?ids={coin_id}&vs_currencies={vs}",
1568 source.endpoint
1569 );
1570 let coin_id_owned = coin_id.to_string();
1571 let vs_owned = vs.clone();
1572 (
1573 url,
1574 Box::new(move |v: &serde_json::Value| {
1575 v.get(&coin_id_owned)?.get(&vs_owned)?.as_f64()
1576 }),
1577 )
1578 }
1579 ExchangeRateSourceType::Generic => {
1580 let sep = if source.endpoint.contains('?') {
1581 '&'
1582 } else {
1583 '?'
1584 };
1585 let url = format!(
1586 "{}{sep}base={}"e={}",
1587 source.endpoint,
1588 base.to_ascii_uppercase(),
1589 quote_currency.to_ascii_uppercase()
1590 );
1591 (url, Box::new(extract_price_generic))
1592 }
1593 };
1594
1595 let mut req = client.get(&url);
1596 if let Some(key) = &source.api_key_secret {
1597 req = req.header("Authorization", format!("Bearer {key}"));
1598 req = req.header("X-Api-Key", key);
1599 }
1600
1601 let resp = req
1602 .send()
1603 .await
1604 .map_err(|e| format!("request failed: {e}"))?;
1605 if !resp.status().is_success() {
1606 return Err(format!("status {}", resp.status()));
1607 }
1608
1609 let value: serde_json::Value = resp
1610 .json()
1611 .await
1612 .map_err(|e| format!("parse failed: {e}"))?;
1613
1614 extract_fn(&value).ok_or_else(|| "could not extract price from response".to_string())
1615}
1616
1617#[cfg(feature = "redb")]
1618fn encode<T: Serialize>(value: &T) -> Result<String, PayError> {
1619 serde_json::to_string(value)
1620 .map_err(|e| PayError::InternalError(format!("spend encode failed: {e}")))
1621}
1622
1623#[cfg(feature = "redb")]
1624fn decode<T: DeserializeOwned>(encoded: &str) -> Result<T, PayError> {
1625 serde_json::from_str(encoded).map_err(|e| {
1626 let preview_len = encoded.len().min(48);
1627 let preview = &encoded[..preview_len];
1628 PayError::InternalError(format!(
1629 "spend decode failed (len={}, preview={}): {e}",
1630 encoded.len(),
1631 preview
1632 ))
1633 })
1634}
1635
1636#[cfg(feature = "redb")]
1637fn prepend_err(prefix: &str, err: PayError) -> PayError {
1638 match err {
1639 PayError::InternalError(msg) => PayError::InternalError(format!("{prefix}: {msg}")),
1640 other => other,
1641 }
1642}
1643
1644fn generate_rule_identifier() -> Result<String, PayError> {
1645 let mut buf = [0u8; 4];
1646 getrandom::fill(&mut buf).map_err(|e| PayError::InternalError(format!("rng failed: {e}")))?;
1647 Ok(format!("r_{}", hex::encode(buf)))
1648}
1649
1650fn validate_limit(
1651 rule: &SpendLimit,
1652 exchange_rate: Option<&ExchangeRateConfig>,
1653) -> Result<(), PayError> {
1654 if rule.window_s == 0 {
1655 return Err(PayError::InvalidAmount(
1656 "limit rule has zero window_s".to_string(),
1657 ));
1658 }
1659 if rule.max_spend == 0 {
1660 return Err(PayError::InvalidAmount(
1661 "limit rule has zero max_spend".to_string(),
1662 ));
1663 }
1664
1665 match rule.scope {
1666 SpendScope::GlobalUsdCents => {
1667 if rule.network.is_some() || rule.wallet.is_some() {
1668 return Err(PayError::InvalidAmount(
1669 "scope=global-usd-cents cannot set network/wallet".to_string(),
1670 ));
1671 }
1672 if rule.token.is_some() {
1673 return Err(PayError::InvalidAmount(
1674 "scope=global-usd-cents cannot set token".to_string(),
1675 ));
1676 }
1677 }
1678 SpendScope::Network => {
1679 if rule.network.as_deref().unwrap_or("").trim().is_empty() {
1680 return Err(PayError::InvalidAmount(
1681 "scope=network requires network".to_string(),
1682 ));
1683 }
1684 if rule.wallet.is_some() {
1685 return Err(PayError::InvalidAmount(
1686 "scope=network cannot set wallet".to_string(),
1687 ));
1688 }
1689 }
1690 SpendScope::Wallet => {
1691 if rule.network.as_deref().unwrap_or("").trim().is_empty() {
1692 return Err(PayError::InvalidAmount(
1693 "scope=wallet requires network".to_string(),
1694 ));
1695 }
1696 if rule.wallet.as_deref().unwrap_or("").trim().is_empty() {
1697 return Err(PayError::InvalidAmount(
1698 "scope=wallet requires wallet".to_string(),
1699 ));
1700 }
1701 }
1702 }
1703
1704 if rule.scope == SpendScope::GlobalUsdCents && exchange_rate.is_none() {
1705 return Err(PayError::InvalidAmount(
1706 "scope=global-usd-cents requires config.exchange_rate".to_string(),
1707 ));
1708 }
1709 Ok(())
1710}
1711
1712#[cfg(feature = "redb")]
1713fn load_rules(read_txn: &redb::ReadTransaction) -> Result<Vec<SpendLimit>, PayError> {
1714 let Ok(rule_table) = read_txn.open_table(RULE_BY_ID) else {
1715 return Ok(vec![]);
1716 };
1717 rule_table
1718 .iter()
1719 .map_err(|e| PayError::InternalError(format!("spend iterate rules: {e}")))?
1720 .map(|entry| {
1721 let (_k, v) = entry
1722 .map_err(|e| PayError::InternalError(format!("spend read rule entry: {e}")))?;
1723 decode::<SpendLimit>(v.value()).map_err(|e| prepend_err("spend decode rule", e))
1724 })
1725 .collect()
1726}
1727
1728#[cfg(feature = "redb")]
1729fn load_reservations(read_txn: &redb::ReadTransaction) -> Result<Vec<SpendReservation>, PayError> {
1730 let Ok(table) = read_txn.open_table(RESERVATION_BY_ID) else {
1731 return Ok(vec![]);
1732 };
1733 table
1734 .iter()
1735 .map_err(|e| PayError::InternalError(format!("spend iterate reservations: {e}")))?
1736 .map(|entry| {
1737 let (_k, v) = entry
1738 .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?;
1739 decode::<SpendReservation>(v.value())
1740 .map_err(|e| prepend_err("spend decode reservation", e))
1741 })
1742 .collect()
1743}
1744
1745#[cfg(feature = "redb")]
1746fn expire_pending(_table: &mut redb::Table<u64, &str>, _now_ms: u64) -> Result<(), PayError> {
1747 Ok(())
1748}
1749
1750fn amount_for_rule(
1751 _rule: &SpendLimit,
1752 amount_native: u64,
1753 amount_usd_cents: Option<u64>,
1754 use_usd: bool,
1755) -> Result<u64, PayError> {
1756 if use_usd {
1757 amount_usd_cents.ok_or_else(|| {
1758 PayError::InternalError("missing USD amount for non-native unit rule".to_string())
1759 })
1760 } else {
1761 Ok(amount_native)
1762 }
1763}
1764
1765fn reservation_active_for_window(r: &SpendReservation, now_ms: u64) -> bool {
1766 match r.status {
1767 ReservationStatus::Confirmed => true,
1768 ReservationStatus::Pending => r.expires_at_epoch_ms > now_ms,
1769 ReservationStatus::Cancelled | ReservationStatus::Expired => false,
1770 }
1771}
1772
1773fn rule_matches_context(
1774 rule: &SpendLimit,
1775 network: &str,
1776 wallet: Option<&str>,
1777 token: Option<&str>,
1778) -> bool {
1779 if let Some(rule_token) = &rule.token {
1780 let normalized_rule_token = canonical_spend_token(network, rule_token);
1781 match token.map(|ctx_token| canonical_spend_token(network, ctx_token)) {
1782 Some(ctx_token) if ctx_token == normalized_rule_token => {}
1783 _ => return false,
1784 }
1785 }
1786 match rule.scope {
1787 SpendScope::GlobalUsdCents => true,
1788 SpendScope::Network => rule.network.as_deref() == Some(network),
1789 SpendScope::Wallet => {
1790 rule.network.as_deref() == Some(network) && rule.wallet.as_deref() == wallet
1791 }
1792 }
1793}
1794
1795fn scope_key(rule: &SpendLimit) -> String {
1796 match rule.scope {
1797 SpendScope::GlobalUsdCents => "global-usd-cents".to_string(),
1798 SpendScope::Network => rule.network.clone().unwrap_or_default(),
1799 SpendScope::Wallet => format!(
1800 "{}/{}",
1801 rule.network.clone().unwrap_or_default(),
1802 rule.wallet.clone().unwrap_or_default()
1803 ),
1804 }
1805}
1806
1807fn spent_in_window(
1808 rule: &SpendLimit,
1809 reservations: &[SpendReservation],
1810 now_ms: u64,
1811 use_usd: bool,
1812) -> Result<(u64, Option<u64>), PayError> {
1813 let window_ms = rule.window_s.saturating_mul(1000);
1814 let cutoff = now_ms.saturating_sub(window_ms);
1815
1816 let mut spent = 0u64;
1817 let mut oldest: Option<u64> = None;
1818
1819 for r in reservations {
1820 if !reservation_active_for_window(r, now_ms) {
1821 continue;
1822 }
1823 if r.created_at_epoch_ms < cutoff {
1824 continue;
1825 }
1826 if !rule_matches_context(rule, &r.network, r.wallet.as_deref(), r.token.as_deref()) {
1827 continue;
1828 }
1829
1830 let amount = if use_usd {
1831 r.amount_usd_cents.ok_or_else(|| {
1832 PayError::InternalError("reservation missing USD amount".to_string())
1833 })?
1834 } else {
1835 r.amount_native
1836 };
1837 spent = spent.saturating_add(amount);
1838 oldest = Some(oldest.map_or(r.created_at_epoch_ms, |v| v.min(r.created_at_epoch_ms)));
1839 }
1840
1841 Ok((spent, oldest))
1842}
1843
1844#[cfg(feature = "redb")]
1845fn next_counter(write_txn: &redb::WriteTransaction, key: &str) -> Result<u64, PayError> {
1846 let mut meta = write_txn
1847 .open_table(META_COUNTER)
1848 .map_err(|e| PayError::InternalError(format!("spend open meta table: {e}")))?;
1849 let current = match meta
1850 .get(key)
1851 .map_err(|e| PayError::InternalError(format!("spend read counter {key}: {e}")))?
1852 {
1853 Some(v) => v.value(),
1854 None => 0,
1855 };
1856 let next = current.saturating_add(1);
1857 meta.insert(key, next)
1858 .map_err(|e| PayError::InternalError(format!("spend write counter {key}: {e}")))?;
1859 Ok(next)
1860}
1861
1862#[cfg(test)]
1863#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1864mod tests {
1865 use super::*;
1866
1867 fn make_limit(scope: SpendScope, network: Option<&str>, wallet: Option<&str>) -> SpendLimit {
1868 SpendLimit {
1869 rule_id: None,
1870 scope,
1871 network: network.map(|s| s.to_string()),
1872 wallet: wallet.map(|s| s.to_string()),
1873 window_s: 3600,
1874 max_spend: 1000,
1875 token: None,
1876 }
1877 }
1878
1879 #[cfg(feature = "redb")]
1880 #[tokio::test]
1881 async fn provider_limit_reserve_and_confirm() {
1882 let tmp = tempfile::tempdir().unwrap();
1883 let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);
1884
1885 ledger
1886 .set_limits(&[make_limit(SpendScope::Network, Some("cashu"), None)])
1887 .await
1888 .unwrap();
1889
1890 let ctx = SpendContext {
1891 network: "cashu".to_string(),
1892 wallet: Some("w_01".to_string()),
1893 amount_native: 400,
1894 token: None,
1895 };
1896 let r1 = ledger.reserve("op_1", &ctx).await.unwrap();
1897 ledger.confirm(r1).await.unwrap();
1898
1899 let r2 = ledger.reserve("op_2", &ctx).await.unwrap();
1900 let err = ledger.reserve("op_3", &ctx).await.unwrap_err();
1901 assert!(matches!(err, PayError::LimitExceeded { .. }));
1902
1903 ledger.cancel(r2).await.unwrap();
1904 }
1905
1906 #[cfg(feature = "redb")]
1907 #[tokio::test]
1908 async fn duplicate_operation_id_is_rejected_after_confirm() {
1909 let tmp = tempfile::tempdir().unwrap();
1910 let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);
1911
1912 ledger
1913 .set_limits(&[make_limit(SpendScope::Network, Some("cashu"), None)])
1914 .await
1915 .unwrap();
1916
1917 let ctx = SpendContext {
1918 network: "cashu".to_string(),
1919 wallet: Some("w_01".to_string()),
1920 amount_native: 100,
1921 token: None,
1922 };
1923 let rid = ledger.reserve("op_duplicate", &ctx).await.unwrap();
1924 ledger.confirm(rid).await.unwrap();
1925
1926 let err = ledger.reserve("op_duplicate", &ctx).await.unwrap_err();
1927 assert!(matches!(err, PayError::InvalidAmount(_)));
1928 assert!(err.to_string().contains("refusing to re-execute"));
1929 }
1930
1931 #[cfg(feature = "redb")]
1932 #[tokio::test]
1933 async fn wallet_scope_requires_wallet_context() {
1934 let tmp = tempfile::tempdir().unwrap();
1935 let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);
1936
1937 ledger
1938 .set_limits(&[make_limit(SpendScope::Wallet, Some("cashu"), Some("w_abc"))])
1939 .await
1940 .unwrap();
1941
1942 let ctx = SpendContext {
1943 network: "cashu".to_string(),
1944 wallet: None,
1945 amount_native: 1,
1946 token: None,
1947 };
1948 let err = ledger.reserve("op_1", &ctx).await.unwrap_err();
1949 assert!(matches!(err, PayError::InvalidAmount(_)));
1950 }
1951
1952 #[tokio::test]
1953 async fn global_usd_cents_scope_requires_exchange_rate_config() {
1954 let tmp = tempfile::tempdir().unwrap();
1955 let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);
1956
1957 let err = ledger
1958 .set_limits(&[SpendLimit {
1959 rule_id: None,
1960 scope: SpendScope::GlobalUsdCents,
1961 network: None,
1962 wallet: None,
1963 window_s: 3600,
1964 max_spend: 100,
1965 token: None,
1966 }])
1967 .await
1968 .unwrap_err();
1969
1970 assert!(matches!(err, PayError::InvalidAmount(_)));
1971 }
1972
1973 #[cfg(feature = "redb")]
1974 #[tokio::test]
1975 async fn network_scope_native_token_ok_without_exchange_rate() {
1976 let tmp = tempfile::tempdir().unwrap();
1977 let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);
1978
1979 ledger
1980 .set_limits(&[SpendLimit {
1981 rule_id: None,
1982 scope: SpendScope::Network,
1983 network: Some("cashu".to_string()),
1984 wallet: None,
1985 window_s: 3600,
1986 max_spend: 100,
1987 token: None,
1988 }])
1989 .await
1990 .expect("network scope should not require exchange_rate");
1991 }
1992
1993 #[test]
1994 fn native_sol_and_evm_assets_can_be_priced_for_global_usd_limits() {
1995 assert_eq!(token_asset("sol", Some("native")), Some(("SOL", 1e9)));
1996 assert_eq!(token_asset("sol", Some("lamports")), Some(("SOL", 1e9)));
1997 assert_eq!(token_asset("evm", Some("native")), Some(("ETH", 1e18)));
1998 assert_eq!(token_asset("evm", Some("wei")), Some(("ETH", 1e18)));
1999 }
2000
2001 #[test]
2002 fn sol_and_evm_limits_without_token_normalize_to_native() {
2003 let mut sol_limit = make_limit(SpendScope::Network, Some("SOL"), None);
2004 normalize_limit(&mut sol_limit);
2005 assert_eq!(sol_limit.network.as_deref(), Some("sol"));
2006 assert_eq!(sol_limit.token.as_deref(), Some("native"));
2007
2008 let mut evm_limit = make_limit(SpendScope::Wallet, Some("evm"), Some("w_evm"));
2009 normalize_limit(&mut evm_limit);
2010 assert_eq!(evm_limit.token.as_deref(), Some("native"));
2011 }
2012
2013 #[test]
2014 fn token_limit_does_not_match_native_gas_debit() {
2015 let mut sol_usdc = make_limit(SpendScope::Network, Some("sol"), None);
2016 sol_usdc.token = Some("usdc".to_string());
2017 normalize_limit(&mut sol_usdc);
2018 assert!(rule_matches_context(&sol_usdc, "sol", None, Some("usdc")));
2019 assert!(!rule_matches_context(
2020 &sol_usdc,
2021 "sol",
2022 None,
2023 Some("native")
2024 ));
2025
2026 let mut evm_usdc = make_limit(SpendScope::Network, Some("evm"), None);
2027 evm_usdc.token = Some("usdc".to_string());
2028 normalize_limit(&mut evm_usdc);
2029 assert!(rule_matches_context(&evm_usdc, "evm", None, Some("usdc")));
2030 assert!(!rule_matches_context(
2031 &evm_usdc,
2032 "evm",
2033 None,
2034 Some("native")
2035 ));
2036 }
2037}