Skip to main content

agent_first_pay/spend/
mod.rs

1#![cfg_attr(not(any(feature = "redb", feature = "postgres")), allow(dead_code))]
2
3pub mod tokens;
4
5use crate::provider::PayError;
6#[cfg(feature = "exchange-rate")]
7use crate::types::ExchangeRateSourceType;
8use crate::types::{ExchangeRateConfig, SpendLimit, SpendLimitStatus, SpendScope};
9#[cfg(feature = "redb")]
10use serde::de::DeserializeOwned;
11use serde::{Deserialize, Serialize};
12use std::collections::hash_map::DefaultHasher;
13use std::hash::{Hash, Hasher};
14use tokio::sync::Mutex;
15
16#[cfg(feature = "redb")]
17use crate::store::db;
18#[cfg(feature = "redb")]
19use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
20#[cfg(feature = "redb")]
21use std::path::{Path, PathBuf};
22
23#[cfg(feature = "redb")]
24const META_COUNTER: TableDefinition<&str, u64> = TableDefinition::new("meta_counter");
25#[cfg(feature = "redb")]
26const RULE_BY_ID: TableDefinition<&str, &str> = TableDefinition::new("rule_by_id_v3");
27#[cfg(feature = "redb")]
28const RESERVATION_BY_ID: TableDefinition<u64, &str> = TableDefinition::new("reservation_by_id");
29#[cfg(feature = "redb")]
30const RESERVATION_ID_BY_OP_ID: TableDefinition<&str, u64> =
31    TableDefinition::new("reservation_id_by_op_id");
32#[cfg(feature = "redb")]
33const SPEND_EVENT_BY_ID: TableDefinition<u64, &str> = TableDefinition::new("spend_event_by_id");
34#[cfg(feature = "redb")]
35const FX_QUOTE_BY_PAIR: TableDefinition<&str, &str> = TableDefinition::new("quote_by_pair");
36#[cfg(feature = "redb")]
37const NEXT_RESERVATION_ID_KEY: &str = "next_reservation_id";
38#[cfg(feature = "redb")]
39const NEXT_EVENT_ID_KEY: &str = "next_event_id";
40#[cfg(feature = "redb")]
41const SPEND_VERSION: u64 = 1;
42#[cfg(feature = "redb")]
43const FX_CACHE_VERSION: u64 = 1;
44
45#[derive(Debug, Clone, Hash)]
46pub struct SpendContext {
47    pub network: String,
48    pub wallet: Option<String>,
49    pub amount_native: u64,
50    pub token: Option<String>,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
54enum ReservationStatus {
55    Pending,
56    Confirmed,
57    Cancelled,
58    Expired,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
62struct SpendReservation {
63    reservation_id: u64,
64    op_id: String,
65    network: String,
66    wallet: Option<String>,
67    #[serde(default)]
68    token: Option<String>,
69    amount_native: u64,
70    amount_usd_cents: Option<u64>,
71    status: ReservationStatus,
72    created_at_epoch_ms: u64,
73    expires_at_epoch_ms: u64,
74    finalized_at_epoch_ms: Option<u64>,
75    #[serde(default, skip_serializing_if = "Option::is_none")]
76    request_hash: Option<String>,
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
80struct SpendEvent {
81    event_id: u64,
82    reservation_id: u64,
83    op_id: String,
84    network: String,
85    wallet: Option<String>,
86    #[serde(default)]
87    token: Option<String>,
88    amount_native: u64,
89    amount_usd_cents: Option<u64>,
90    created_at_epoch_ms: u64,
91    confirmed_at_epoch_ms: u64,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
95struct ExchangeRateQuote {
96    pair: String,
97    source: String,
98    price: f64,
99    fetched_at_epoch_ms: u64,
100    expires_at_epoch_ms: u64,
101}
102
103// ═══════════════════════════════════════════
104// SpendBackend
105// ═══════════════════════════════════════════
106
107#[allow(dead_code)] // None variant used when neither redb nor postgres features are enabled
108enum SpendBackend {
109    #[cfg(feature = "redb")]
110    Redb {
111        data_dir: String,
112    },
113    #[cfg(feature = "postgres")]
114    Postgres {
115        pool: sqlx::PgPool,
116    },
117    None,
118}
119
120// ═══════════════════════════════════════════
121// SpendLedger
122// ═══════════════════════════════════════════
123
124pub struct SpendLedger {
125    backend: SpendBackend,
126    exchange_rate: Option<ExchangeRateConfig>,
127    mu: Mutex<()>,
128    /// Set to true when a cached FX quote's age exceeds 80% of its TTL.
129    fx_stale_warned: std::sync::atomic::AtomicBool,
130}
131
132impl SpendLedger {
133    pub fn new(data_dir: &str, exchange_rate: Option<ExchangeRateConfig>) -> Self {
134        #[cfg(feature = "redb")]
135        let backend = SpendBackend::Redb {
136            data_dir: data_dir.to_string(),
137        };
138        #[cfg(not(feature = "redb"))]
139        let backend = {
140            let _ = data_dir;
141            SpendBackend::None
142        };
143        Self {
144            backend,
145            exchange_rate,
146            mu: Mutex::new(()),
147            fx_stale_warned: std::sync::atomic::AtomicBool::new(false),
148        }
149    }
150
151    #[cfg(feature = "postgres")]
152    pub fn new_postgres(pool: sqlx::PgPool, exchange_rate: Option<ExchangeRateConfig>) -> Self {
153        Self {
154            backend: SpendBackend::Postgres { pool },
155            exchange_rate,
156            mu: Mutex::new(()),
157            fx_stale_warned: std::sync::atomic::AtomicBool::new(false),
158        }
159    }
160
161    /// Returns true (once) if a stale FX quote was used since last check.
162    pub fn take_fx_stale_warning(&self) -> bool {
163        self.fx_stale_warned
164            .swap(false, std::sync::atomic::Ordering::Relaxed)
165    }
166
167    /// Add a single spend limit rule. Generates and assigns a rule_id, returns it.
168    pub async fn add_limit(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
169        normalize_limit(limit);
170        validate_limit(limit, self.exchange_rate.as_ref())?;
171
172        let _guard = self.mu.lock().await;
173
174        match &self.backend {
175            #[cfg(feature = "redb")]
176            SpendBackend::Redb { .. } => self.add_limit_redb(limit),
177            #[cfg(feature = "postgres")]
178            SpendBackend::Postgres { .. } => self.add_limit_postgres(limit).await,
179            SpendBackend::None => Err(PayError::NotImplemented(
180                "no storage backend for spend limits".to_string(),
181            )),
182        }
183    }
184
185    /// Remove a spend limit rule by its rule_id.
186    pub async fn remove_limit(&self, _rule_id: &str) -> Result<(), PayError> {
187        let _guard = self.mu.lock().await;
188
189        match &self.backend {
190            #[cfg(feature = "redb")]
191            SpendBackend::Redb { .. } => self.remove_limit_redb(_rule_id),
192            #[cfg(feature = "postgres")]
193            SpendBackend::Postgres { .. } => self.remove_limit_postgres(_rule_id).await,
194            SpendBackend::None => Err(PayError::NotImplemented(
195                "no storage backend for spend limits".to_string(),
196            )),
197        }
198    }
199
200    /// Replace all spend limits (used by config patch / pipe mode).
201    pub async fn set_limits(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
202        let mut limits = limits.to_vec();
203        for limit in &mut limits {
204            normalize_limit(limit);
205            validate_limit(limit, self.exchange_rate.as_ref())?;
206        }
207
208        let _guard = self.mu.lock().await;
209
210        match &self.backend {
211            #[cfg(feature = "redb")]
212            SpendBackend::Redb { .. } => self.set_limits_redb(&limits),
213            #[cfg(feature = "postgres")]
214            SpendBackend::Postgres { .. } => self.set_limits_postgres(&limits).await,
215            SpendBackend::None => Err(PayError::NotImplemented(
216                "no storage backend for spend limits".to_string(),
217            )),
218        }
219    }
220
221    /// Compute current status for all limits.
222    pub async fn get_status(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
223        let _guard = self.mu.lock().await;
224
225        match &self.backend {
226            #[cfg(feature = "redb")]
227            SpendBackend::Redb { .. } => self.get_status_redb(),
228            #[cfg(feature = "postgres")]
229            SpendBackend::Postgres { .. } => self.get_status_postgres().await,
230            SpendBackend::None => Ok(Vec::new()),
231        }
232    }
233
234    /// Reserve spend against all matching limits, returns reservation id.
235    pub async fn reserve(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
236        if op_id.trim().is_empty() {
237            return Err(PayError::InvalidAmount("op_id cannot be empty".to_string()));
238        }
239        if ctx.network.trim().is_empty() {
240            return Err(PayError::InvalidAmount(
241                "network cannot be empty for spend check".to_string(),
242            ));
243        }
244        let request_hash = spend_request_hash(op_id, ctx);
245
246        let _guard = self.mu.lock().await;
247
248        match &self.backend {
249            #[cfg(feature = "redb")]
250            SpendBackend::Redb { .. } => self.reserve_redb(op_id, ctx, &request_hash).await,
251            #[cfg(feature = "postgres")]
252            SpendBackend::Postgres { .. } => self.reserve_postgres(op_id, ctx, &request_hash).await,
253            SpendBackend::None => Err(PayError::NotImplemented(
254                "no storage backend for spend limits".to_string(),
255            )),
256        }
257    }
258
259    pub async fn confirm(&self, _reservation_id: u64) -> Result<(), PayError> {
260        let _guard = self.mu.lock().await;
261
262        match &self.backend {
263            #[cfg(feature = "redb")]
264            SpendBackend::Redb { .. } => self.confirm_redb(_reservation_id),
265            #[cfg(feature = "postgres")]
266            SpendBackend::Postgres { .. } => self.confirm_postgres(_reservation_id).await,
267            SpendBackend::None => Err(PayError::NotImplemented(
268                "no storage backend for spend limits".to_string(),
269            )),
270        }
271    }
272
273    pub async fn cancel(&self, _reservation_id: u64) -> Result<(), PayError> {
274        let _guard = self.mu.lock().await;
275
276        match &self.backend {
277            #[cfg(feature = "redb")]
278            SpendBackend::Redb { .. } => self.cancel_redb(_reservation_id),
279            #[cfg(feature = "postgres")]
280            SpendBackend::Postgres { .. } => self.cancel_postgres(_reservation_id).await,
281            SpendBackend::None => Ok(()),
282        }
283    }
284}
285
286// ═══════════════════════════════════════════
287// Redb backend implementation
288// ═══════════════════════════════════════════
289
290#[cfg(feature = "redb")]
291impl SpendLedger {
292    fn spend_db_path(&self) -> PathBuf {
293        match &self.backend {
294            SpendBackend::Redb { data_dir } => Path::new(data_dir).join("spend").join("spend.redb"),
295            #[allow(unreachable_patterns)]
296            _ => PathBuf::new(),
297        }
298    }
299
300    fn exchange_rate_db_path(&self) -> PathBuf {
301        match &self.backend {
302            SpendBackend::Redb { data_dir } => Path::new(data_dir)
303                .join("spend")
304                .join("exchange-rate-cache.redb"),
305            #[allow(unreachable_patterns)]
306            _ => PathBuf::new(),
307        }
308    }
309
310    fn open_spend_db(&self) -> Result<Database, PayError> {
311        db::open_and_migrate(
312            &self.spend_db_path(),
313            SPEND_VERSION,
314            &[
315                // v0 → v1: no data migration, just stamp version
316                &|_db: &Database| Ok(()),
317            ],
318        )
319    }
320
321    fn open_exchange_rate_db(&self) -> Result<Database, PayError> {
322        db::open_and_migrate(
323            &self.exchange_rate_db_path(),
324            FX_CACHE_VERSION,
325            &[
326                // v0 → v1: no data migration, just stamp version
327                &|_db: &Database| Ok(()),
328            ],
329        )
330    }
331
332    fn add_limit_redb(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
333        let db = self.open_spend_db()?;
334        let rule_id = generate_rule_identifier()?;
335        limit.rule_id = Some(rule_id.clone());
336        let encoded = encode(limit)?;
337        let write_txn = db
338            .begin_write()
339            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
340        {
341            let mut rule_table = write_txn
342                .open_table(RULE_BY_ID)
343                .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
344            rule_table
345                .insert(rule_id.as_str(), encoded.as_str())
346                .map_err(|e| PayError::InternalError(format!("spend insert rule: {e}")))?;
347        }
348        write_txn
349            .commit()
350            .map_err(|e| PayError::InternalError(format!("spend commit add_limit: {e}")))?;
351        Ok(rule_id)
352    }
353
354    fn remove_limit_redb(&self, rule_id: &str) -> Result<(), PayError> {
355        let db = self.open_spend_db()?;
356        let write_txn = db
357            .begin_write()
358            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
359        {
360            let mut rule_table = write_txn
361                .open_table(RULE_BY_ID)
362                .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
363            let existed = rule_table
364                .remove(rule_id)
365                .map_err(|e| PayError::InternalError(format!("spend remove rule: {e}")))?;
366            if existed.is_none() {
367                return Err(PayError::InvalidAmount(format!(
368                    "rule_id '{rule_id}' not found"
369                )));
370            }
371        }
372        write_txn
373            .commit()
374            .map_err(|e| PayError::InternalError(format!("spend commit remove_limit: {e}")))
375    }
376
377    fn set_limits_redb(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
378        let db = self.open_spend_db()?;
379        let write_txn = db
380            .begin_write()
381            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
382        {
383            let mut rule_table = write_txn
384                .open_table(RULE_BY_ID)
385                .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
386            // Clear existing rules
387            let existing_ids = rule_table
388                .iter()
389                .map_err(|e| PayError::InternalError(format!("spend iterate rules: {e}")))?
390                .map(|entry| {
391                    entry
392                        .map(|(k, _)| k.value().to_string())
393                        .map_err(|e| PayError::InternalError(format!("spend read rule key: {e}")))
394                })
395                .collect::<Result<Vec<_>, _>>()?;
396            for rid in existing_ids {
397                rule_table
398                    .remove(rid.as_str())
399                    .map_err(|e| PayError::InternalError(format!("spend remove rule: {e}")))?;
400            }
401
402            // Insert new rules with generated IDs
403            for limit in limits {
404                let mut rule = limit.clone();
405                let rid = generate_rule_identifier()?;
406                rule.rule_id = Some(rid.clone());
407                let encoded = encode(&rule)?;
408                rule_table
409                    .insert(rid.as_str(), encoded.as_str())
410                    .map_err(|e| PayError::InternalError(format!("spend insert rule: {e}")))?;
411            }
412        }
413        write_txn
414            .commit()
415            .map_err(|e| PayError::InternalError(format!("spend commit set_limits: {e}")))
416    }
417
418    fn get_status_redb(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
419        let db = self.open_spend_db()?;
420        let read_txn = db
421            .begin_read()
422            .map_err(|e| PayError::InternalError(format!("spend begin_read: {e}")))?;
423        let rules = load_rules(&read_txn)?;
424        let reservations = load_reservations(&read_txn)?;
425        let now = now_epoch_ms();
426        let mut out = Vec::with_capacity(rules.len());
427        for rule in rules {
428            let use_usd = rule.scope == SpendScope::GlobalUsdCents;
429            let (spent, oldest_ts) = spent_in_window(&rule, &reservations, now, use_usd)?;
430            let remaining = rule.max_spend.saturating_sub(spent);
431            let window_ms = rule.window_s.saturating_mul(1000);
432            let window_reset_s = oldest_ts
433                .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
434                .unwrap_or(0);
435            out.push(SpendLimitStatus {
436                rule_id: rule.rule_id.clone().unwrap_or_default(),
437                scope: rule.scope,
438                network: rule.network.clone(),
439                wallet: rule.wallet.clone(),
440                window_s: rule.window_s,
441                max_spend: rule.max_spend,
442                spent,
443                remaining,
444                token: rule.token.clone(),
445                window_reset_s,
446            });
447        }
448        Ok(out)
449    }
450
451    async fn reserve_redb(
452        &self,
453        op_id: &str,
454        ctx: &SpendContext,
455        request_hash: &str,
456    ) -> Result<u64, PayError> {
457        let now = now_epoch_ms();
458        let db = self.open_spend_db()?;
459
460        let read_txn = db
461            .begin_read()
462            .map_err(|e| PayError::InternalError(format!("spend begin_read: {e}")))?;
463        let rules = load_rules(&read_txn)?;
464
465        if rules.iter().any(|r| {
466            r.scope == SpendScope::Wallet
467                && r.network.as_deref() == Some(ctx.network.as_str())
468                && ctx.wallet.is_none()
469        }) {
470            return Err(PayError::InvalidAmount(
471                "wallet-scoped limits require an explicit wallet".to_string(),
472            ));
473        }
474
475        // GlobalUsdCents scope needs USD conversion
476        let needs_usd = rules.iter().any(|r| r.scope == SpendScope::GlobalUsdCents);
477        let amount_usd_cents = if needs_usd {
478            Some(
479                self.amount_to_usd_cents(&ctx.network, ctx.token.as_deref(), ctx.amount_native)
480                    .await?,
481            )
482        } else {
483            None
484        };
485
486        let write_txn = db
487            .begin_write()
488            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
489
490        let mut encoded_blobs: Vec<String> = Vec::new();
491        let reservation_id = {
492            let mut reservation_index =
493                write_txn.open_table(RESERVATION_ID_BY_OP_ID).map_err(|e| {
494                    PayError::InternalError(format!("spend open reservation op index: {e}"))
495                })?;
496            if let Some(existing) = reservation_index
497                .get(op_id)
498                .map_err(|e| PayError::InternalError(format!("spend read op index: {e}")))?
499            {
500                let existing_id = existing.value();
501                let reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
502                    PayError::InternalError(format!("spend open reservation table: {e}"))
503                })?;
504                let status = reservation_table
505                    .get(existing_id)
506                    .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?
507                    .map(|value| decode::<SpendReservation>(value.value()))
508                    .transpose()?
509                    .map(|reservation| reservation.status)
510                    .unwrap_or(ReservationStatus::Pending);
511                return Err(duplicate_reservation_error(op_id, existing_id, &status));
512            }
513
514            let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
515                PayError::InternalError(format!("spend open reservation table: {e}"))
516            })?;
517
518            expire_pending(&mut reservation_table, now)?;
519
520            let reservations = reservation_table
521                .iter()
522                .map_err(|e| PayError::InternalError(format!("spend iterate reservations: {e}")))?
523                .map(|entry| {
524                    let (_k, v) = entry.map_err(|e| {
525                        PayError::InternalError(format!("spend read reservation: {e}"))
526                    })?;
527                    decode::<SpendReservation>(v.value())
528                        .map_err(|e| prepend_err("spend decode reservation", e))
529                })
530                .collect::<Result<Vec<_>, _>>()?;
531
532            for rule in rules.iter() {
533                if !rule_matches_context(
534                    rule,
535                    &ctx.network,
536                    ctx.wallet.as_deref(),
537                    ctx.token.as_deref(),
538                ) {
539                    continue;
540                }
541
542                let use_usd = rule.scope == SpendScope::GlobalUsdCents;
543                let candidate_amount =
544                    amount_for_rule(rule, ctx.amount_native, amount_usd_cents, use_usd)?;
545                let (spent, oldest_ts) = spent_in_window(rule, &reservations, now, use_usd)?;
546                if spent.saturating_add(candidate_amount) > rule.max_spend {
547                    let window_ms = rule.window_s.saturating_mul(1000);
548                    let remaining_s = oldest_ts
549                        .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
550                        .unwrap_or(0);
551
552                    return Err(PayError::LimitExceeded {
553                        rule_id: rule.rule_id.clone().unwrap_or_default(),
554                        scope: rule.scope,
555                        scope_key: scope_key(rule),
556                        spent,
557                        max_spend: rule.max_spend,
558                        token: rule.token.clone(),
559                        remaining_s,
560                        origin: None,
561                    });
562                }
563            }
564
565            let reservation_id = next_counter(&write_txn, NEXT_RESERVATION_ID_KEY)?;
566            let reservation = SpendReservation {
567                reservation_id,
568                op_id: op_id.to_string(),
569                network: ctx.network.clone(),
570                wallet: ctx.wallet.clone(),
571                token: ctx.token.clone(),
572                amount_native: ctx.amount_native,
573                amount_usd_cents,
574                status: ReservationStatus::Pending,
575                created_at_epoch_ms: now,
576                expires_at_epoch_ms: now.saturating_add(300_000),
577                finalized_at_epoch_ms: None,
578                request_hash: Some(request_hash.to_string()),
579            };
580            encoded_blobs.push(encode(&reservation)?);
581            let encoded = encoded_blobs
582                .last()
583                .ok_or_else(|| PayError::InternalError("missing reservation blob".to_string()))?;
584            reservation_table
585                .insert(reservation_id, encoded.as_str())
586                .map_err(|e| PayError::InternalError(format!("spend insert reservation: {e}")))?;
587            reservation_index
588                .insert(op_id, reservation_id)
589                .map_err(|e| PayError::InternalError(format!("spend insert op index: {e}")))?;
590            reservation_id
591        };
592
593        write_txn
594            .commit()
595            .map_err(|e| PayError::InternalError(format!("spend commit reserve: {e}")))?;
596        Ok(reservation_id)
597    }
598
599    fn confirm_redb(&self, reservation_id: u64) -> Result<(), PayError> {
600        let db = self.open_spend_db()?;
601        let now = now_epoch_ms();
602
603        let write_txn = db
604            .begin_write()
605            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
606
607        let mut encoded_blobs: Vec<String> = Vec::new();
608        {
609            let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
610                PayError::InternalError(format!("spend open reservation table: {e}"))
611            })?;
612            let Some(existing_bytes) = reservation_table
613                .get(reservation_id)
614                .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?
615                .map(|g| g.value().to_string())
616            else {
617                return Err(PayError::InternalError(format!(
618                    "reservation {reservation_id} not found"
619                )));
620            };
621
622            let mut reservation: SpendReservation = decode(&existing_bytes)?;
623            if !matches!(reservation.status, ReservationStatus::Pending) {
624                return Ok(());
625            }
626
627            reservation.status = ReservationStatus::Confirmed;
628            reservation.finalized_at_epoch_ms = Some(now);
629            encoded_blobs.push(encode(&reservation)?);
630            let encoded = encoded_blobs
631                .last()
632                .ok_or_else(|| PayError::InternalError("missing reservation blob".to_string()))?;
633            reservation_table
634                .insert(reservation_id, encoded.as_str())
635                .map_err(|e| PayError::InternalError(format!("spend update reservation: {e}")))?;
636
637            let mut events = write_txn
638                .open_table(SPEND_EVENT_BY_ID)
639                .map_err(|e| PayError::InternalError(format!("spend open event table: {e}")))?;
640            let event_id = next_counter(&write_txn, NEXT_EVENT_ID_KEY)?;
641            let event = SpendEvent {
642                event_id,
643                reservation_id,
644                op_id: reservation.op_id,
645                network: reservation.network,
646                wallet: reservation.wallet,
647                token: reservation.token,
648                amount_native: reservation.amount_native,
649                amount_usd_cents: reservation.amount_usd_cents,
650                created_at_epoch_ms: reservation.created_at_epoch_ms,
651                confirmed_at_epoch_ms: now,
652            };
653            encoded_blobs.push(encode(&event)?);
654            let encoded_event = encoded_blobs
655                .last()
656                .ok_or_else(|| PayError::InternalError("missing event blob".to_string()))?;
657            events
658                .insert(event_id, encoded_event.as_str())
659                .map_err(|e| PayError::InternalError(format!("spend insert event: {e}")))?;
660        }
661
662        write_txn
663            .commit()
664            .map_err(|e| PayError::InternalError(format!("spend commit confirm: {e}")))
665    }
666
667    fn cancel_redb(&self, reservation_id: u64) -> Result<(), PayError> {
668        let db = self.open_spend_db()?;
669        let now = now_epoch_ms();
670
671        let write_txn = db
672            .begin_write()
673            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
674
675        let mut encoded_blobs: Vec<String> = Vec::new();
676        {
677            let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
678                PayError::InternalError(format!("spend open reservation table: {e}"))
679            })?;
680            let existing = reservation_table
681                .get(reservation_id)
682                .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?;
683            let existing_bytes = existing.map(|g| g.value().to_string());
684            if let Some(existing_bytes) = existing_bytes {
685                let mut reservation: SpendReservation = decode(&existing_bytes)?;
686                if matches!(reservation.status, ReservationStatus::Pending) {
687                    reservation.status = ReservationStatus::Cancelled;
688                    reservation.finalized_at_epoch_ms = Some(now);
689                    encoded_blobs.push(encode(&reservation)?);
690                    let encoded = encoded_blobs.last().ok_or_else(|| {
691                        PayError::InternalError("missing reservation blob".to_string())
692                    })?;
693                    reservation_table
694                        .insert(reservation_id, encoded.as_str())
695                        .map_err(|e| {
696                            PayError::InternalError(format!("spend update reservation: {e}"))
697                        })?;
698                }
699            }
700        }
701
702        write_txn
703            .commit()
704            .map_err(|e| PayError::InternalError(format!("spend commit cancel: {e}")))
705    }
706}
707
708// ═══════════════════════════════════════════
709// Postgres backend implementation
710// ═══════════════════════════════════════════
711
712#[cfg(feature = "postgres")]
713impl SpendLedger {
714    fn pg_pool(&self) -> Result<&sqlx::PgPool, PayError> {
715        match &self.backend {
716            SpendBackend::Postgres { pool } => Ok(pool),
717            _ => Err(PayError::InternalError(
718                "expected postgres spend backend".to_string(),
719            )),
720        }
721    }
722
723    async fn add_limit_postgres(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
724        let pool = self.pg_pool()?;
725        let rule_id = generate_rule_identifier()?;
726        limit.rule_id = Some(rule_id.clone());
727        let rule_json = serde_json::to_value(limit)
728            .map_err(|e| PayError::InternalError(format!("serialize spend rule: {e}")))?;
729
730        sqlx::query("INSERT INTO spend_rules (rule_id, rule) VALUES ($1, $2)")
731            .bind(&rule_id)
732            .bind(&rule_json)
733            .execute(pool)
734            .await
735            .map_err(|e| PayError::InternalError(format!("pg insert spend rule: {e}")))?;
736
737        Ok(rule_id)
738    }
739
740    async fn remove_limit_postgres(&self, rule_id: &str) -> Result<(), PayError> {
741        let pool = self.pg_pool()?;
742        let result = sqlx::query("DELETE FROM spend_rules WHERE rule_id = $1")
743            .bind(rule_id)
744            .execute(pool)
745            .await
746            .map_err(|e| PayError::InternalError(format!("pg delete spend rule: {e}")))?;
747
748        if result.rows_affected() == 0 {
749            return Err(PayError::InvalidAmount(format!(
750                "rule_id '{rule_id}' not found"
751            )));
752        }
753        Ok(())
754    }
755
756    async fn set_limits_postgres(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
757        let pool = self.pg_pool()?;
758        let mut tx = pool
759            .begin()
760            .await
761            .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
762
763        sqlx::query("DELETE FROM spend_rules")
764            .execute(&mut *tx)
765            .await
766            .map_err(|e| PayError::InternalError(format!("pg clear spend rules: {e}")))?;
767
768        for limit in limits {
769            let mut rule = limit.clone();
770            let rid = generate_rule_identifier()?;
771            rule.rule_id = Some(rid.clone());
772            let rule_json = serde_json::to_value(&rule)
773                .map_err(|e| PayError::InternalError(format!("serialize spend rule: {e}")))?;
774            sqlx::query("INSERT INTO spend_rules (rule_id, rule) VALUES ($1, $2)")
775                .bind(&rid)
776                .bind(&rule_json)
777                .execute(&mut *tx)
778                .await
779                .map_err(|e| PayError::InternalError(format!("pg insert spend rule: {e}")))?;
780        }
781
782        tx.commit()
783            .await
784            .map_err(|e| PayError::InternalError(format!("pg commit set_limits: {e}")))
785    }
786
787    async fn get_status_postgres(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
788        let pool = self.pg_pool()?;
789        let rules = pg_load_rules(pool).await?;
790        let reservations = pg_load_reservations(pool).await?;
791        let now = now_epoch_ms();
792
793        let mut out = Vec::with_capacity(rules.len());
794        for rule in rules {
795            let use_usd = rule.scope == SpendScope::GlobalUsdCents;
796            let (spent, oldest_ts) = spent_in_window(&rule, &reservations, now, use_usd)?;
797            let remaining = rule.max_spend.saturating_sub(spent);
798            let window_ms = rule.window_s.saturating_mul(1000);
799            let window_reset_s = oldest_ts
800                .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
801                .unwrap_or(0);
802            out.push(SpendLimitStatus {
803                rule_id: rule.rule_id.clone().unwrap_or_default(),
804                scope: rule.scope,
805                network: rule.network.clone(),
806                wallet: rule.wallet.clone(),
807                window_s: rule.window_s,
808                max_spend: rule.max_spend,
809                spent,
810                remaining,
811                token: rule.token.clone(),
812                window_reset_s,
813            });
814        }
815        Ok(out)
816    }
817
818    async fn reserve_postgres(
819        &self,
820        op_id: &str,
821        ctx: &SpendContext,
822        request_hash: &str,
823    ) -> Result<u64, PayError> {
824        use crate::store::postgres_store::SPEND_ADVISORY_LOCK_KEY;
825
826        let pool = self.pg_pool()?;
827        let now = now_epoch_ms();
828
829        // Pre-flight: load rules outside the transaction for USD conversion
830        let rules = pg_load_rules(pool).await?;
831        if rules.iter().any(|r| {
832            r.scope == SpendScope::Wallet
833                && r.network.as_deref() == Some(ctx.network.as_str())
834                && ctx.wallet.is_none()
835        }) {
836            return Err(PayError::InvalidAmount(
837                "wallet-scoped limits require an explicit wallet".to_string(),
838            ));
839        }
840
841        let needs_usd = rules.iter().any(|r| r.scope == SpendScope::GlobalUsdCents);
842        let amount_usd_cents = if needs_usd {
843            Some(
844                self.amount_to_usd_cents(&ctx.network, ctx.token.as_deref(), ctx.amount_native)
845                    .await?,
846            )
847        } else {
848            None
849        };
850
851        // Begin serializable transaction with advisory lock
852        let mut tx = pool
853            .begin()
854            .await
855            .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
856
857        sqlx::query("SELECT pg_advisory_xact_lock($1)")
858            .bind(SPEND_ADVISORY_LOCK_KEY)
859            .execute(&mut *tx)
860            .await
861            .map_err(|e| PayError::InternalError(format!("pg advisory lock: {e}")))?;
862
863        // Check for existing reservation with same op_id (idempotency)
864        let existing: Option<(i64, serde_json::Value)> = sqlx::query_as(
865            "SELECT reservation_id, reservation FROM spend_reservations WHERE op_id = $1",
866        )
867        .bind(op_id)
868        .fetch_optional(&mut *tx)
869        .await
870        .map_err(|e| PayError::InternalError(format!("pg check op_id: {e}")))?;
871
872        if let Some((rid, reservation_json)) = existing {
873            let status = serde_json::from_value::<SpendReservation>(reservation_json)
874                .map(|reservation| reservation.status)
875                .unwrap_or(ReservationStatus::Pending);
876            return Err(duplicate_reservation_error(op_id, rid as u64, &status));
877        }
878
879        // Expire pending reservations
880        pg_expire_pending(&mut tx, now).await?;
881
882        // Load all reservations within the lock
883        let reservations = pg_load_reservations_tx(&mut tx).await?;
884
885        // Re-load rules within the lock (could have changed)
886        let rules = pg_load_rules_tx(&mut tx).await?;
887
888        // Check limits
889        for rule in rules.iter() {
890            if !rule_matches_context(
891                rule,
892                &ctx.network,
893                ctx.wallet.as_deref(),
894                ctx.token.as_deref(),
895            ) {
896                continue;
897            }
898
899            let use_usd = rule.scope == SpendScope::GlobalUsdCents;
900            let candidate_amount =
901                amount_for_rule(rule, ctx.amount_native, amount_usd_cents, use_usd)?;
902            let (spent, oldest_ts) = spent_in_window(rule, &reservations, now, use_usd)?;
903            if spent.saturating_add(candidate_amount) > rule.max_spend {
904                let window_ms = rule.window_s.saturating_mul(1000);
905                let remaining_s = oldest_ts
906                    .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
907                    .unwrap_or(0);
908
909                return Err(PayError::LimitExceeded {
910                    rule_id: rule.rule_id.clone().unwrap_or_default(),
911                    scope: rule.scope,
912                    scope_key: scope_key(rule),
913                    spent,
914                    max_spend: rule.max_spend,
915                    token: rule.token.clone(),
916                    remaining_s,
917                    origin: None,
918                });
919            }
920        }
921
922        // Insert reservation
923        let reservation = SpendReservation {
924            reservation_id: 0, // will be assigned by BIGSERIAL
925            op_id: op_id.to_string(),
926            network: ctx.network.clone(),
927            wallet: ctx.wallet.clone(),
928            token: ctx.token.clone(),
929            amount_native: ctx.amount_native,
930            amount_usd_cents,
931            status: ReservationStatus::Pending,
932            created_at_epoch_ms: now,
933            expires_at_epoch_ms: now.saturating_add(300_000),
934            finalized_at_epoch_ms: None,
935            request_hash: Some(request_hash.to_string()),
936        };
937        let reservation_json = serde_json::to_value(&reservation)
938            .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
939
940        let row: (i64,) = sqlx::query_as(
941            "INSERT INTO spend_reservations (op_id, reservation) \
942             VALUES ($1, $2) RETURNING reservation_id",
943        )
944        .bind(op_id)
945        .bind(&reservation_json)
946        .fetch_one(&mut *tx)
947        .await
948        .map_err(|e| PayError::InternalError(format!("pg insert reservation: {e}")))?;
949
950        let reservation_id = row.0 as u64;
951
952        // Update the reservation JSON with the assigned ID
953        let mut updated_json = reservation_json;
954        updated_json["reservation_id"] = serde_json::json!(reservation_id);
955        sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
956            .bind(&updated_json)
957            .bind(row.0)
958            .execute(&mut *tx)
959            .await
960            .map_err(|e| PayError::InternalError(format!("pg update reservation id: {e}")))?;
961
962        tx.commit()
963            .await
964            .map_err(|e| PayError::InternalError(format!("pg commit reserve: {e}")))?;
965
966        Ok(reservation_id)
967    }
968
969    async fn confirm_postgres(&self, reservation_id: u64) -> Result<(), PayError> {
970        let pool = self.pg_pool()?;
971        let now = now_epoch_ms();
972        let rid = reservation_id as i64;
973
974        let row: Option<(serde_json::Value,)> =
975            sqlx::query_as("SELECT reservation FROM spend_reservations WHERE reservation_id = $1")
976                .bind(rid)
977                .fetch_optional(pool)
978                .await
979                .map_err(|e| PayError::InternalError(format!("pg read reservation: {e}")))?;
980
981        let Some((res_json,)) = row else {
982            return Err(PayError::InternalError(format!(
983                "reservation {reservation_id} not found"
984            )));
985        };
986
987        let mut reservation: SpendReservation = serde_json::from_value(res_json)
988            .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
989
990        if !matches!(reservation.status, ReservationStatus::Pending) {
991            return Ok(());
992        }
993
994        reservation.status = ReservationStatus::Confirmed;
995        reservation.finalized_at_epoch_ms = Some(now);
996        let updated_json = serde_json::to_value(&reservation)
997            .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
998
999        let event = SpendEvent {
1000            event_id: 0, // assigned by BIGSERIAL
1001            reservation_id,
1002            op_id: reservation.op_id,
1003            network: reservation.network,
1004            wallet: reservation.wallet,
1005            token: reservation.token,
1006            amount_native: reservation.amount_native,
1007            amount_usd_cents: reservation.amount_usd_cents,
1008            created_at_epoch_ms: reservation.created_at_epoch_ms,
1009            confirmed_at_epoch_ms: now,
1010        };
1011        let event_json = serde_json::to_value(&event)
1012            .map_err(|e| PayError::InternalError(format!("serialize spend event: {e}")))?;
1013
1014        let mut tx = pool
1015            .begin()
1016            .await
1017            .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
1018
1019        sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
1020            .bind(&updated_json)
1021            .bind(rid)
1022            .execute(&mut *tx)
1023            .await
1024            .map_err(|e| PayError::InternalError(format!("pg update reservation: {e}")))?;
1025
1026        sqlx::query("INSERT INTO spend_events (reservation_id, event) VALUES ($1, $2)")
1027            .bind(rid)
1028            .bind(&event_json)
1029            .execute(&mut *tx)
1030            .await
1031            .map_err(|e| PayError::InternalError(format!("pg insert spend event: {e}")))?;
1032
1033        tx.commit()
1034            .await
1035            .map_err(|e| PayError::InternalError(format!("pg commit confirm: {e}")))
1036    }
1037
1038    async fn cancel_postgres(&self, reservation_id: u64) -> Result<(), PayError> {
1039        let pool = self.pg_pool()?;
1040        let now = now_epoch_ms();
1041        let rid = reservation_id as i64;
1042
1043        let row: Option<(serde_json::Value,)> =
1044            sqlx::query_as("SELECT reservation FROM spend_reservations WHERE reservation_id = $1")
1045                .bind(rid)
1046                .fetch_optional(pool)
1047                .await
1048                .map_err(|e| PayError::InternalError(format!("pg read reservation: {e}")))?;
1049
1050        if let Some((res_json,)) = row {
1051            let mut reservation: SpendReservation = serde_json::from_value(res_json)
1052                .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
1053
1054            if matches!(reservation.status, ReservationStatus::Pending) {
1055                reservation.status = ReservationStatus::Cancelled;
1056                reservation.finalized_at_epoch_ms = Some(now);
1057                let updated_json = serde_json::to_value(&reservation)
1058                    .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
1059
1060                sqlx::query(
1061                    "UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2",
1062                )
1063                .bind(&updated_json)
1064                .bind(rid)
1065                .execute(pool)
1066                .await
1067                .map_err(|e| PayError::InternalError(format!("pg update reservation: {e}")))?;
1068            }
1069        }
1070
1071        Ok(())
1072    }
1073}
1074
1075#[cfg(feature = "postgres")]
1076async fn pg_load_rules(pool: &sqlx::PgPool) -> Result<Vec<SpendLimit>, PayError> {
1077    let rows: Vec<(serde_json::Value,)> =
1078        sqlx::query_as("SELECT rule FROM spend_rules ORDER BY rule_id")
1079            .fetch_all(pool)
1080            .await
1081            .map_err(|e| PayError::InternalError(format!("pg load spend rules: {e}")))?;
1082    rows.into_iter()
1083        .map(|(v,)| {
1084            serde_json::from_value(v)
1085                .map_err(|e| PayError::InternalError(format!("pg parse spend rule: {e}")))
1086        })
1087        .collect()
1088}
1089
1090#[cfg(feature = "postgres")]
1091async fn pg_load_rules_tx(
1092    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1093) -> Result<Vec<SpendLimit>, PayError> {
1094    let rows: Vec<(serde_json::Value,)> =
1095        sqlx::query_as("SELECT rule FROM spend_rules ORDER BY rule_id")
1096            .fetch_all(&mut **tx)
1097            .await
1098            .map_err(|e| PayError::InternalError(format!("pg load spend rules: {e}")))?;
1099    rows.into_iter()
1100        .map(|(v,)| {
1101            serde_json::from_value(v)
1102                .map_err(|e| PayError::InternalError(format!("pg parse spend rule: {e}")))
1103        })
1104        .collect()
1105}
1106
1107#[cfg(feature = "postgres")]
1108async fn pg_load_reservations(pool: &sqlx::PgPool) -> Result<Vec<SpendReservation>, PayError> {
1109    let rows: Vec<(serde_json::Value,)> =
1110        sqlx::query_as("SELECT reservation FROM spend_reservations ORDER BY reservation_id")
1111            .fetch_all(pool)
1112            .await
1113            .map_err(|e| PayError::InternalError(format!("pg load reservations: {e}")))?;
1114    rows.into_iter()
1115        .map(|(v,)| {
1116            serde_json::from_value(v)
1117                .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))
1118        })
1119        .collect()
1120}
1121
1122#[cfg(feature = "postgres")]
1123async fn pg_load_reservations_tx(
1124    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1125) -> Result<Vec<SpendReservation>, PayError> {
1126    let rows: Vec<(serde_json::Value,)> =
1127        sqlx::query_as("SELECT reservation FROM spend_reservations ORDER BY reservation_id")
1128            .fetch_all(&mut **tx)
1129            .await
1130            .map_err(|e| PayError::InternalError(format!("pg load reservations: {e}")))?;
1131    rows.into_iter()
1132        .map(|(v,)| {
1133            serde_json::from_value(v)
1134                .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))
1135        })
1136        .collect()
1137}
1138
1139#[cfg(feature = "postgres")]
1140async fn pg_expire_pending(
1141    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1142    now_ms: u64,
1143) -> Result<(), PayError> {
1144    // Load pending reservations and expire those past their deadline
1145    let rows: Vec<(i64, serde_json::Value)> =
1146        sqlx::query_as("SELECT reservation_id, reservation FROM spend_reservations")
1147            .fetch_all(&mut **tx)
1148            .await
1149            .map_err(|e| {
1150                PayError::InternalError(format!("pg load reservations for expire: {e}"))
1151            })?;
1152
1153    for (rid, res_json) in rows {
1154        let mut reservation: SpendReservation = serde_json::from_value(res_json)
1155            .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
1156        if matches!(reservation.status, ReservationStatus::Pending)
1157            && reservation.expires_at_epoch_ms <= now_ms
1158        {
1159            reservation.status = ReservationStatus::Expired;
1160            reservation.finalized_at_epoch_ms = Some(now_ms);
1161            let updated = serde_json::to_value(&reservation)
1162                .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
1163            sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
1164                .bind(&updated)
1165                .bind(rid)
1166                .execute(&mut **tx)
1167                .await
1168                .map_err(|e| PayError::InternalError(format!("pg expire reservation: {e}")))?;
1169        }
1170    }
1171    Ok(())
1172}
1173
1174// ═══════════════════════════════════════════
1175// Exchange rate (shared, delegates to backend for caching)
1176// ═══════════════════════════════════════════
1177
1178impl SpendLedger {
1179    async fn amount_to_usd_cents(
1180        &self,
1181        network: &str,
1182        token: Option<&str>,
1183        amount_native: u64,
1184    ) -> Result<u64, PayError> {
1185        let (symbol, divisor) = token_asset(network, token).ok_or_else(|| {
1186            PayError::InvalidAmount(format!(
1187                "network '{network}' token '{token:?}' is unsupported for global-usd-cents limits"
1188            ))
1189        })?;
1190
1191        let quote = if symbol == "USD" {
1192            let now = now_epoch_ms();
1193            ExchangeRateQuote {
1194                pair: "USD/USD".to_string(),
1195                source: "identity".to_string(),
1196                price: 1.0,
1197                fetched_at_epoch_ms: now,
1198                expires_at_epoch_ms: now.saturating_add(86_400_000),
1199            }
1200        } else {
1201            self.get_or_fetch_quote(symbol, "USD").await?
1202        };
1203
1204        // Block if the quote has fully expired (fetch must have failed silently
1205        // in a prior call, or the clock jumped).
1206        let now = now_epoch_ms();
1207        if quote.expires_at_epoch_ms > 0 && now > quote.expires_at_epoch_ms {
1208            return Err(PayError::NetworkError(
1209                "exchange-rate quote expired — cannot convert to USD; check exchange_rate sources"
1210                    .to_string(),
1211            ));
1212        }
1213
1214        // Flag if cached quote age exceeds 80% of its TTL (set on every occurrence
1215        // so callers can surface the warning per-request).
1216        let ttl_ms = quote
1217            .expires_at_epoch_ms
1218            .saturating_sub(quote.fetched_at_epoch_ms);
1219        let age_ms = now.saturating_sub(quote.fetched_at_epoch_ms);
1220        if ttl_ms > 0 && age_ms > ttl_ms * 4 / 5 {
1221            self.fx_stale_warned
1222                .store(true, std::sync::atomic::Ordering::Relaxed);
1223        }
1224
1225        let usd = (amount_native as f64 / divisor) * quote.price;
1226        if !usd.is_finite() || usd < 0f64 {
1227            return Err(PayError::InternalError(
1228                "invalid exchange-rate conversion result".to_string(),
1229            ));
1230        }
1231        Ok((usd * 100f64).round() as u64)
1232    }
1233
1234    async fn get_or_fetch_quote(
1235        &self,
1236        base: &str,
1237        quote: &str,
1238    ) -> Result<ExchangeRateQuote, PayError> {
1239        let pair = format!(
1240            "{}/{}",
1241            base.to_ascii_uppercase(),
1242            quote.to_ascii_uppercase()
1243        );
1244        let now = now_epoch_ms();
1245
1246        // Try cache — redb
1247        #[cfg(feature = "redb")]
1248        if let SpendBackend::Redb { .. } = &self.backend {
1249            let fx_db = self.open_exchange_rate_db()?;
1250            let read_txn = fx_db
1251                .begin_read()
1252                .map_err(|e| PayError::InternalError(format!("fx begin_read: {e}")))?;
1253            if let Ok(table) = read_txn.open_table(FX_QUOTE_BY_PAIR) {
1254                if let Some(entry) = table
1255                    .get(pair.as_str())
1256                    .map_err(|e| PayError::InternalError(format!("fx read quote: {e}")))?
1257                {
1258                    let cached: ExchangeRateQuote = decode(entry.value())?;
1259                    if cached.expires_at_epoch_ms > now {
1260                        return Ok(cached);
1261                    }
1262                }
1263            }
1264        }
1265
1266        // Try cache — postgres
1267        #[cfg(feature = "postgres")]
1268        if let SpendBackend::Postgres { pool } = &self.backend {
1269            let row: Option<(serde_json::Value,)> =
1270                sqlx::query_as("SELECT quote FROM exchange_rate_cache WHERE pair = $1")
1271                    .bind(&pair)
1272                    .fetch_optional(pool)
1273                    .await
1274                    .map_err(|e| PayError::InternalError(format!("pg fx read cache: {e}")))?;
1275            if let Some((quote_json,)) = row {
1276                let cached: ExchangeRateQuote = serde_json::from_value(quote_json)
1277                    .map_err(|e| PayError::InternalError(format!("pg fx parse cache: {e}")))?;
1278                if cached.expires_at_epoch_ms > now {
1279                    return Ok(cached);
1280                }
1281            }
1282        }
1283
1284        let (fetched_price, source_name) = self.fetch_exchange_rate_http(base, quote).await?;
1285        let ttl_s = self
1286            .exchange_rate
1287            .as_ref()
1288            .map(|cfg| cfg.ttl_s)
1289            .unwrap_or(300)
1290            .max(1);
1291        let new_quote = ExchangeRateQuote {
1292            pair: pair.clone(),
1293            source: source_name,
1294            price: fetched_price,
1295            fetched_at_epoch_ms: now,
1296            expires_at_epoch_ms: now.saturating_add(ttl_s.saturating_mul(1000)),
1297        };
1298
1299        // Write cache — redb
1300        #[cfg(feature = "redb")]
1301        if let SpendBackend::Redb { .. } = &self.backend {
1302            let fx_db = self.open_exchange_rate_db()?;
1303            let write_txn = fx_db
1304                .begin_write()
1305                .map_err(|e| PayError::InternalError(format!("fx begin_write: {e}")))?;
1306            let mut encoded_blobs: Vec<String> = Vec::new();
1307            {
1308                let mut table = write_txn
1309                    .open_table(FX_QUOTE_BY_PAIR)
1310                    .map_err(|e| PayError::InternalError(format!("fx open quote table: {e}")))?;
1311                encoded_blobs.push(encode(&new_quote)?);
1312                let encoded = encoded_blobs
1313                    .last()
1314                    .ok_or_else(|| PayError::InternalError("missing quote blob".to_string()))?;
1315                table
1316                    .insert(pair.as_str(), encoded.as_str())
1317                    .map_err(|e| PayError::InternalError(format!("fx insert quote: {e}")))?;
1318            }
1319            write_txn
1320                .commit()
1321                .map_err(|e| PayError::InternalError(format!("fx commit write: {e}")))?;
1322        }
1323
1324        // Write cache — postgres
1325        #[cfg(feature = "postgres")]
1326        if let SpendBackend::Postgres { pool } = &self.backend {
1327            let quote_json = serde_json::to_value(&new_quote)
1328                .map_err(|e| PayError::InternalError(format!("serialize fx quote: {e}")))?;
1329            let _ = sqlx::query(
1330                "INSERT INTO exchange_rate_cache (pair, quote) VALUES ($1, $2) \
1331                 ON CONFLICT (pair) DO UPDATE SET quote = $2",
1332            )
1333            .bind(&pair)
1334            .bind(&quote_json)
1335            .execute(pool)
1336            .await;
1337        }
1338
1339        Ok(new_quote)
1340    }
1341
1342    #[cfg(feature = "exchange-rate")]
1343    async fn fetch_exchange_rate_http(
1344        &self,
1345        base: &str,
1346        quote_currency: &str,
1347    ) -> Result<(f64, String), PayError> {
1348        let cfg = self.exchange_rate.as_ref().cloned().unwrap_or_default();
1349
1350        if cfg.sources.is_empty() {
1351            return Err(PayError::InvalidAmount(
1352                "exchange_rate.sources is empty — no exchange-rate API configured".to_string(),
1353            ));
1354        }
1355
1356        let client = reqwest::Client::new();
1357        let mut last_err = String::new();
1358
1359        for source in &cfg.sources {
1360            match fetch_from_source(&client, source, base, quote_currency).await {
1361                Ok(price) => return Ok((price, source.endpoint.clone())),
1362                Err(e) => {
1363                    last_err =
1364                        format!("{} ({}): {e}", source.endpoint, source.source_type.as_str());
1365                }
1366            }
1367        }
1368
1369        Err(PayError::NetworkError(format!(
1370            "all exchange-rate sources failed; last: {last_err}"
1371        )))
1372    }
1373
1374    #[cfg(not(feature = "exchange-rate"))]
1375    async fn fetch_exchange_rate_http(
1376        &self,
1377        _base: &str,
1378        _quote_currency: &str,
1379    ) -> Result<(f64, String), PayError> {
1380        Err(PayError::NotImplemented(
1381            "exchange-rate HTTP support is not built in this feature set".to_string(),
1382        ))
1383    }
1384}
1385
1386// ═══════════════════════════════════════════
1387// Helpers
1388// ═══════════════════════════════════════════
1389
1390fn now_epoch_ms() -> u64 {
1391    std::time::SystemTime::now()
1392        .duration_since(std::time::UNIX_EPOCH)
1393        .map(|d| d.as_millis() as u64)
1394        .unwrap_or(0)
1395}
1396
1397fn spend_request_hash(op_id: &str, ctx: &SpendContext) -> String {
1398    let mut hasher = DefaultHasher::new();
1399    op_id.hash(&mut hasher);
1400    ctx.hash(&mut hasher);
1401    format!("{:016x}", hasher.finish())
1402}
1403
1404fn reservation_status_label(status: &ReservationStatus) -> &'static str {
1405    match status {
1406        ReservationStatus::Pending => "pending",
1407        ReservationStatus::Confirmed => "confirmed",
1408        ReservationStatus::Cancelled => "cancelled",
1409        ReservationStatus::Expired => "expired",
1410    }
1411}
1412
1413fn duplicate_reservation_error(
1414    op_id: &str,
1415    reservation_id: u64,
1416    status: &ReservationStatus,
1417) -> PayError {
1418    PayError::InvalidAmount(format!(
1419        "duplicate spend operation id '{op_id}' already has reservation {reservation_id} ({status}); refusing to re-execute payment",
1420        status = reservation_status_label(status)
1421    ))
1422}
1423
1424fn normalize_limit(rule: &mut SpendLimit) {
1425    rule.network = rule
1426        .network
1427        .as_deref()
1428        .map(str::trim)
1429        .filter(|value| !value.is_empty())
1430        .map(|value| value.to_ascii_lowercase());
1431    rule.wallet = rule
1432        .wallet
1433        .as_deref()
1434        .map(str::trim)
1435        .filter(|value| !value.is_empty())
1436        .map(str::to_string);
1437    rule.token = rule
1438        .token
1439        .as_deref()
1440        .map(str::trim)
1441        .filter(|value| !value.is_empty())
1442        .map(|value| canonical_spend_token(rule.network.as_deref().unwrap_or(""), value));
1443
1444    if matches!(rule.scope, SpendScope::Network | SpendScope::Wallet)
1445        && matches!(rule.network.as_deref(), Some("sol" | "evm"))
1446        && rule.token.is_none()
1447    {
1448        rule.token = Some("native".to_string());
1449    }
1450}
1451
1452fn canonical_spend_token(network: &str, token: &str) -> String {
1453    let token = token.trim().to_ascii_lowercase();
1454    match (network, token.as_str()) {
1455        ("sol", "sol" | "native" | "lamports") => "native".to_string(),
1456        ("evm", "eth" | "native" | "wei") => "native".to_string(),
1457        _ => token,
1458    }
1459}
1460
1461fn token_asset(network: &str, token: Option<&str>) -> Option<(&'static str, f64)> {
1462    let network = network.to_ascii_lowercase();
1463    match token.map(|t| canonical_spend_token(&network, t)).as_deref() {
1464        Some("native") => {
1465            if network == "sol" {
1466                Some(("SOL", 1e9))
1467            } else if network == "evm" {
1468                Some(("ETH", 1e18))
1469            } else if network.starts_with("ln") || network == "cashu" || network == "btc" {
1470                Some(("BTC", 1e8))
1471            } else {
1472                None
1473            }
1474        }
1475        Some("btc" | "sat" | "sats") => Some(("BTC", 1e8)),
1476        Some("sol") => Some(("SOL", 1e9)),
1477        Some("eth") => Some(("ETH", 1e18)),
1478        Some("usdc" | "usdt") => Some(("USD", 1e6)),
1479        Some(_) => None,
1480        None => {
1481            if network.starts_with("ln") || network == "cashu" || network == "btc" {
1482                Some(("BTC", 1e8))
1483            } else {
1484                None
1485            }
1486        }
1487    }
1488}
1489
1490#[cfg(feature = "exchange-rate")]
1491fn extract_price_generic(value: &serde_json::Value) -> Option<f64> {
1492    value
1493        .get("price")
1494        .and_then(|v| v.as_f64())
1495        .or_else(|| value.get("rate").and_then(|v| v.as_f64()))
1496        .or_else(|| value.get("usd_per_base").and_then(|v| v.as_f64()))
1497        .or_else(|| {
1498            value
1499                .get("data")
1500                .and_then(|d| d.get("price"))
1501                .and_then(|v| v.as_f64())
1502        })
1503}
1504
1505#[cfg(feature = "exchange-rate")]
1506impl ExchangeRateSourceType {
1507    fn as_str(self) -> &'static str {
1508        match self {
1509            Self::Generic => "generic",
1510            Self::CoinGecko => "coingecko",
1511            Self::Kraken => "kraken",
1512        }
1513    }
1514}
1515
1516#[cfg(feature = "exchange-rate")]
1517fn coingecko_coin_id(symbol: &str) -> Option<&'static str> {
1518    match symbol.to_ascii_uppercase().as_str() {
1519        "BTC" => Some("bitcoin"),
1520        "SOL" => Some("solana"),
1521        "ETH" => Some("ethereum"),
1522        _ => None,
1523    }
1524}
1525
1526#[cfg(feature = "exchange-rate")]
1527fn kraken_pair(symbol: &str) -> Option<&'static str> {
1528    match symbol.to_ascii_uppercase().as_str() {
1529        "BTC" => Some("XBTUSD"),
1530        "SOL" => Some("SOLUSD"),
1531        "ETH" => Some("ETHUSD"),
1532        _ => None,
1533    }
1534}
1535
1536#[cfg(feature = "exchange-rate")]
1537async fn fetch_from_source(
1538    client: &reqwest::Client,
1539    source: &crate::types::ExchangeRateSource,
1540    base: &str,
1541    quote_currency: &str,
1542) -> Result<f64, String> {
1543    type PriceExtractor = Box<dyn Fn(&serde_json::Value) -> Option<f64> + Send>;
1544    let (url, extract_fn): (String, PriceExtractor) = match source.source_type {
1545        ExchangeRateSourceType::Kraken => {
1546            let pair = kraken_pair(base)
1547                .ok_or_else(|| format!("kraken: unsupported base asset '{base}'"))?;
1548            let url = format!("{}/0/public/Ticker?pair={pair}", source.endpoint);
1549            let pair_owned = pair.to_string();
1550            (
1551                url,
1552                Box::new(move |v: &serde_json::Value| {
1553                    let result = v.get("result")?;
1554                    let ticker = result
1555                        .get(&pair_owned)
1556                        .or_else(|| result.as_object().and_then(|m| m.values().next()))?;
1557                    let price_str = ticker.get("c")?.as_array()?.first()?.as_str()?;
1558                    price_str.parse::<f64>().ok()
1559                }),
1560            )
1561        }
1562        ExchangeRateSourceType::CoinGecko => {
1563            let coin_id = coingecko_coin_id(base)
1564                .ok_or_else(|| format!("coingecko: unsupported base asset '{base}'"))?;
1565            let vs = quote_currency.to_ascii_lowercase();
1566            let url = format!(
1567                "{}/simple/price?ids={coin_id}&vs_currencies={vs}",
1568                source.endpoint
1569            );
1570            let coin_id_owned = coin_id.to_string();
1571            let vs_owned = vs.clone();
1572            (
1573                url,
1574                Box::new(move |v: &serde_json::Value| {
1575                    v.get(&coin_id_owned)?.get(&vs_owned)?.as_f64()
1576                }),
1577            )
1578        }
1579        ExchangeRateSourceType::Generic => {
1580            let sep = if source.endpoint.contains('?') {
1581                '&'
1582            } else {
1583                '?'
1584            };
1585            let url = format!(
1586                "{}{sep}base={}&quote={}",
1587                source.endpoint,
1588                base.to_ascii_uppercase(),
1589                quote_currency.to_ascii_uppercase()
1590            );
1591            (url, Box::new(extract_price_generic))
1592        }
1593    };
1594
1595    let mut req = client.get(&url);
1596    if let Some(key) = &source.api_key_secret {
1597        req = req.header("Authorization", format!("Bearer {key}"));
1598        req = req.header("X-Api-Key", key);
1599    }
1600
1601    let resp = req
1602        .send()
1603        .await
1604        .map_err(|e| format!("request failed: {e}"))?;
1605    if !resp.status().is_success() {
1606        return Err(format!("status {}", resp.status()));
1607    }
1608
1609    let value: serde_json::Value = resp
1610        .json()
1611        .await
1612        .map_err(|e| format!("parse failed: {e}"))?;
1613
1614    extract_fn(&value).ok_or_else(|| "could not extract price from response".to_string())
1615}
1616
1617#[cfg(feature = "redb")]
1618fn encode<T: Serialize>(value: &T) -> Result<String, PayError> {
1619    serde_json::to_string(value)
1620        .map_err(|e| PayError::InternalError(format!("spend encode failed: {e}")))
1621}
1622
1623#[cfg(feature = "redb")]
1624fn decode<T: DeserializeOwned>(encoded: &str) -> Result<T, PayError> {
1625    serde_json::from_str(encoded).map_err(|e| {
1626        let preview_len = encoded.len().min(48);
1627        let preview = &encoded[..preview_len];
1628        PayError::InternalError(format!(
1629            "spend decode failed (len={}, preview={}): {e}",
1630            encoded.len(),
1631            preview
1632        ))
1633    })
1634}
1635
1636#[cfg(feature = "redb")]
1637fn prepend_err(prefix: &str, err: PayError) -> PayError {
1638    match err {
1639        PayError::InternalError(msg) => PayError::InternalError(format!("{prefix}: {msg}")),
1640        other => other,
1641    }
1642}
1643
1644fn generate_rule_identifier() -> Result<String, PayError> {
1645    let mut buf = [0u8; 4];
1646    getrandom::fill(&mut buf).map_err(|e| PayError::InternalError(format!("rng failed: {e}")))?;
1647    Ok(format!("r_{}", hex::encode(buf)))
1648}
1649
1650fn validate_limit(
1651    rule: &SpendLimit,
1652    exchange_rate: Option<&ExchangeRateConfig>,
1653) -> Result<(), PayError> {
1654    if rule.window_s == 0 {
1655        return Err(PayError::InvalidAmount(
1656            "limit rule has zero window_s".to_string(),
1657        ));
1658    }
1659    if rule.max_spend == 0 {
1660        return Err(PayError::InvalidAmount(
1661            "limit rule has zero max_spend".to_string(),
1662        ));
1663    }
1664
1665    match rule.scope {
1666        SpendScope::GlobalUsdCents => {
1667            if rule.network.is_some() || rule.wallet.is_some() {
1668                return Err(PayError::InvalidAmount(
1669                    "scope=global-usd-cents cannot set network/wallet".to_string(),
1670                ));
1671            }
1672            if rule.token.is_some() {
1673                return Err(PayError::InvalidAmount(
1674                    "scope=global-usd-cents cannot set token".to_string(),
1675                ));
1676            }
1677        }
1678        SpendScope::Network => {
1679            if rule.network.as_deref().unwrap_or("").trim().is_empty() {
1680                return Err(PayError::InvalidAmount(
1681                    "scope=network requires network".to_string(),
1682                ));
1683            }
1684            if rule.wallet.is_some() {
1685                return Err(PayError::InvalidAmount(
1686                    "scope=network cannot set wallet".to_string(),
1687                ));
1688            }
1689        }
1690        SpendScope::Wallet => {
1691            if rule.network.as_deref().unwrap_or("").trim().is_empty() {
1692                return Err(PayError::InvalidAmount(
1693                    "scope=wallet requires network".to_string(),
1694                ));
1695            }
1696            if rule.wallet.as_deref().unwrap_or("").trim().is_empty() {
1697                return Err(PayError::InvalidAmount(
1698                    "scope=wallet requires wallet".to_string(),
1699                ));
1700            }
1701        }
1702    }
1703
1704    if rule.scope == SpendScope::GlobalUsdCents && exchange_rate.is_none() {
1705        return Err(PayError::InvalidAmount(
1706            "scope=global-usd-cents requires config.exchange_rate".to_string(),
1707        ));
1708    }
1709    Ok(())
1710}
1711
1712#[cfg(feature = "redb")]
1713fn load_rules(read_txn: &redb::ReadTransaction) -> Result<Vec<SpendLimit>, PayError> {
1714    let Ok(rule_table) = read_txn.open_table(RULE_BY_ID) else {
1715        return Ok(vec![]);
1716    };
1717    rule_table
1718        .iter()
1719        .map_err(|e| PayError::InternalError(format!("spend iterate rules: {e}")))?
1720        .map(|entry| {
1721            let (_k, v) = entry
1722                .map_err(|e| PayError::InternalError(format!("spend read rule entry: {e}")))?;
1723            decode::<SpendLimit>(v.value()).map_err(|e| prepend_err("spend decode rule", e))
1724        })
1725        .collect()
1726}
1727
1728#[cfg(feature = "redb")]
1729fn load_reservations(read_txn: &redb::ReadTransaction) -> Result<Vec<SpendReservation>, PayError> {
1730    let Ok(table) = read_txn.open_table(RESERVATION_BY_ID) else {
1731        return Ok(vec![]);
1732    };
1733    table
1734        .iter()
1735        .map_err(|e| PayError::InternalError(format!("spend iterate reservations: {e}")))?
1736        .map(|entry| {
1737            let (_k, v) = entry
1738                .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?;
1739            decode::<SpendReservation>(v.value())
1740                .map_err(|e| prepend_err("spend decode reservation", e))
1741        })
1742        .collect()
1743}
1744
1745#[cfg(feature = "redb")]
1746fn expire_pending(_table: &mut redb::Table<u64, &str>, _now_ms: u64) -> Result<(), PayError> {
1747    Ok(())
1748}
1749
1750fn amount_for_rule(
1751    _rule: &SpendLimit,
1752    amount_native: u64,
1753    amount_usd_cents: Option<u64>,
1754    use_usd: bool,
1755) -> Result<u64, PayError> {
1756    if use_usd {
1757        amount_usd_cents.ok_or_else(|| {
1758            PayError::InternalError("missing USD amount for non-native unit rule".to_string())
1759        })
1760    } else {
1761        Ok(amount_native)
1762    }
1763}
1764
1765fn reservation_active_for_window(r: &SpendReservation, now_ms: u64) -> bool {
1766    match r.status {
1767        ReservationStatus::Confirmed => true,
1768        ReservationStatus::Pending => r.expires_at_epoch_ms > now_ms,
1769        ReservationStatus::Cancelled | ReservationStatus::Expired => false,
1770    }
1771}
1772
1773fn rule_matches_context(
1774    rule: &SpendLimit,
1775    network: &str,
1776    wallet: Option<&str>,
1777    token: Option<&str>,
1778) -> bool {
1779    if let Some(rule_token) = &rule.token {
1780        let normalized_rule_token = canonical_spend_token(network, rule_token);
1781        match token.map(|ctx_token| canonical_spend_token(network, ctx_token)) {
1782            Some(ctx_token) if ctx_token == normalized_rule_token => {}
1783            _ => return false,
1784        }
1785    }
1786    match rule.scope {
1787        SpendScope::GlobalUsdCents => true,
1788        SpendScope::Network => rule.network.as_deref() == Some(network),
1789        SpendScope::Wallet => {
1790            rule.network.as_deref() == Some(network) && rule.wallet.as_deref() == wallet
1791        }
1792    }
1793}
1794
1795fn scope_key(rule: &SpendLimit) -> String {
1796    match rule.scope {
1797        SpendScope::GlobalUsdCents => "global-usd-cents".to_string(),
1798        SpendScope::Network => rule.network.clone().unwrap_or_default(),
1799        SpendScope::Wallet => format!(
1800            "{}/{}",
1801            rule.network.clone().unwrap_or_default(),
1802            rule.wallet.clone().unwrap_or_default()
1803        ),
1804    }
1805}
1806
1807fn spent_in_window(
1808    rule: &SpendLimit,
1809    reservations: &[SpendReservation],
1810    now_ms: u64,
1811    use_usd: bool,
1812) -> Result<(u64, Option<u64>), PayError> {
1813    let window_ms = rule.window_s.saturating_mul(1000);
1814    let cutoff = now_ms.saturating_sub(window_ms);
1815
1816    let mut spent = 0u64;
1817    let mut oldest: Option<u64> = None;
1818
1819    for r in reservations {
1820        if !reservation_active_for_window(r, now_ms) {
1821            continue;
1822        }
1823        if r.created_at_epoch_ms < cutoff {
1824            continue;
1825        }
1826        if !rule_matches_context(rule, &r.network, r.wallet.as_deref(), r.token.as_deref()) {
1827            continue;
1828        }
1829
1830        let amount = if use_usd {
1831            r.amount_usd_cents.ok_or_else(|| {
1832                PayError::InternalError("reservation missing USD amount".to_string())
1833            })?
1834        } else {
1835            r.amount_native
1836        };
1837        spent = spent.saturating_add(amount);
1838        oldest = Some(oldest.map_or(r.created_at_epoch_ms, |v| v.min(r.created_at_epoch_ms)));
1839    }
1840
1841    Ok((spent, oldest))
1842}
1843
1844#[cfg(feature = "redb")]
1845fn next_counter(write_txn: &redb::WriteTransaction, key: &str) -> Result<u64, PayError> {
1846    let mut meta = write_txn
1847        .open_table(META_COUNTER)
1848        .map_err(|e| PayError::InternalError(format!("spend open meta table: {e}")))?;
1849    let current = match meta
1850        .get(key)
1851        .map_err(|e| PayError::InternalError(format!("spend read counter {key}: {e}")))?
1852    {
1853        Some(v) => v.value(),
1854        None => 0,
1855    };
1856    let next = current.saturating_add(1);
1857    meta.insert(key, next)
1858        .map_err(|e| PayError::InternalError(format!("spend write counter {key}: {e}")))?;
1859    Ok(next)
1860}
1861
1862#[cfg(test)]
1863#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1864mod tests {
1865    use super::*;
1866
1867    fn make_limit(scope: SpendScope, network: Option<&str>, wallet: Option<&str>) -> SpendLimit {
1868        SpendLimit {
1869            rule_id: None,
1870            scope,
1871            network: network.map(|s| s.to_string()),
1872            wallet: wallet.map(|s| s.to_string()),
1873            window_s: 3600,
1874            max_spend: 1000,
1875            token: None,
1876        }
1877    }
1878
1879    #[cfg(feature = "redb")]
1880    #[tokio::test]
1881    async fn provider_limit_reserve_and_confirm() {
1882        let tmp = tempfile::tempdir().unwrap();
1883        let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);
1884
1885        ledger
1886            .set_limits(&[make_limit(SpendScope::Network, Some("cashu"), None)])
1887            .await
1888            .unwrap();
1889
1890        let ctx = SpendContext {
1891            network: "cashu".to_string(),
1892            wallet: Some("w_01".to_string()),
1893            amount_native: 400,
1894            token: None,
1895        };
1896        let r1 = ledger.reserve("op_1", &ctx).await.unwrap();
1897        ledger.confirm(r1).await.unwrap();
1898
1899        let r2 = ledger.reserve("op_2", &ctx).await.unwrap();
1900        let err = ledger.reserve("op_3", &ctx).await.unwrap_err();
1901        assert!(matches!(err, PayError::LimitExceeded { .. }));
1902
1903        ledger.cancel(r2).await.unwrap();
1904    }
1905
1906    #[cfg(feature = "redb")]
1907    #[tokio::test]
1908    async fn duplicate_operation_id_is_rejected_after_confirm() {
1909        let tmp = tempfile::tempdir().unwrap();
1910        let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);
1911
1912        ledger
1913            .set_limits(&[make_limit(SpendScope::Network, Some("cashu"), None)])
1914            .await
1915            .unwrap();
1916
1917        let ctx = SpendContext {
1918            network: "cashu".to_string(),
1919            wallet: Some("w_01".to_string()),
1920            amount_native: 100,
1921            token: None,
1922        };
1923        let rid = ledger.reserve("op_duplicate", &ctx).await.unwrap();
1924        ledger.confirm(rid).await.unwrap();
1925
1926        let err = ledger.reserve("op_duplicate", &ctx).await.unwrap_err();
1927        assert!(matches!(err, PayError::InvalidAmount(_)));
1928        assert!(err.to_string().contains("refusing to re-execute"));
1929    }
1930
1931    #[cfg(feature = "redb")]
1932    #[tokio::test]
1933    async fn wallet_scope_requires_wallet_context() {
1934        let tmp = tempfile::tempdir().unwrap();
1935        let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);
1936
1937        ledger
1938            .set_limits(&[make_limit(SpendScope::Wallet, Some("cashu"), Some("w_abc"))])
1939            .await
1940            .unwrap();
1941
1942        let ctx = SpendContext {
1943            network: "cashu".to_string(),
1944            wallet: None,
1945            amount_native: 1,
1946            token: None,
1947        };
1948        let err = ledger.reserve("op_1", &ctx).await.unwrap_err();
1949        assert!(matches!(err, PayError::InvalidAmount(_)));
1950    }
1951
1952    #[tokio::test]
1953    async fn global_usd_cents_scope_requires_exchange_rate_config() {
1954        let tmp = tempfile::tempdir().unwrap();
1955        let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);
1956
1957        let err = ledger
1958            .set_limits(&[SpendLimit {
1959                rule_id: None,
1960                scope: SpendScope::GlobalUsdCents,
1961                network: None,
1962                wallet: None,
1963                window_s: 3600,
1964                max_spend: 100,
1965                token: None,
1966            }])
1967            .await
1968            .unwrap_err();
1969
1970        assert!(matches!(err, PayError::InvalidAmount(_)));
1971    }
1972
1973    #[cfg(feature = "redb")]
1974    #[tokio::test]
1975    async fn network_scope_native_token_ok_without_exchange_rate() {
1976        let tmp = tempfile::tempdir().unwrap();
1977        let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);
1978
1979        ledger
1980            .set_limits(&[SpendLimit {
1981                rule_id: None,
1982                scope: SpendScope::Network,
1983                network: Some("cashu".to_string()),
1984                wallet: None,
1985                window_s: 3600,
1986                max_spend: 100,
1987                token: None,
1988            }])
1989            .await
1990            .expect("network scope should not require exchange_rate");
1991    }
1992
1993    #[test]
1994    fn native_sol_and_evm_assets_can_be_priced_for_global_usd_limits() {
1995        assert_eq!(token_asset("sol", Some("native")), Some(("SOL", 1e9)));
1996        assert_eq!(token_asset("sol", Some("lamports")), Some(("SOL", 1e9)));
1997        assert_eq!(token_asset("evm", Some("native")), Some(("ETH", 1e18)));
1998        assert_eq!(token_asset("evm", Some("wei")), Some(("ETH", 1e18)));
1999    }
2000
2001    #[test]
2002    fn sol_and_evm_limits_without_token_normalize_to_native() {
2003        let mut sol_limit = make_limit(SpendScope::Network, Some("SOL"), None);
2004        normalize_limit(&mut sol_limit);
2005        assert_eq!(sol_limit.network.as_deref(), Some("sol"));
2006        assert_eq!(sol_limit.token.as_deref(), Some("native"));
2007
2008        let mut evm_limit = make_limit(SpendScope::Wallet, Some("evm"), Some("w_evm"));
2009        normalize_limit(&mut evm_limit);
2010        assert_eq!(evm_limit.token.as_deref(), Some("native"));
2011    }
2012
2013    #[test]
2014    fn token_limit_does_not_match_native_gas_debit() {
2015        let mut sol_usdc = make_limit(SpendScope::Network, Some("sol"), None);
2016        sol_usdc.token = Some("usdc".to_string());
2017        normalize_limit(&mut sol_usdc);
2018        assert!(rule_matches_context(&sol_usdc, "sol", None, Some("usdc")));
2019        assert!(!rule_matches_context(
2020            &sol_usdc,
2021            "sol",
2022            None,
2023            Some("native")
2024        ));
2025
2026        let mut evm_usdc = make_limit(SpendScope::Network, Some("evm"), None);
2027        evm_usdc.token = Some("usdc".to_string());
2028        normalize_limit(&mut evm_usdc);
2029        assert!(rule_matches_context(&evm_usdc, "evm", None, Some("usdc")));
2030        assert!(!rule_matches_context(
2031            &evm_usdc,
2032            "evm",
2033            None,
2034            Some("native")
2035        ));
2036    }
2037}