Skip to main content

agent_first_pay/spend/
mod.rs

1#![cfg_attr(not(any(feature = "redb", feature = "postgres")), allow(dead_code))]
2
3pub mod tokens;
4
5use crate::provider::PayError;
6#[cfg(feature = "exchange-rate")]
7use crate::types::ExchangeRateSourceType;
8use crate::types::{ExchangeRateConfig, SpendLimit, SpendLimitStatus, SpendScope};
9#[cfg(feature = "redb")]
10use serde::de::DeserializeOwned;
11use serde::{Deserialize, Serialize};
12use tokio::sync::Mutex;
13
14#[cfg(feature = "redb")]
15use crate::store::db;
16#[cfg(feature = "redb")]
17use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
18#[cfg(feature = "redb")]
19use std::path::{Path, PathBuf};
20
/// Named `u64` counters stored in the spend database.
/// NOTE(review): presumably the table behind `next_counter`, keyed by the
/// `NEXT_*_KEY` constants below — confirm against that helper.
#[cfg(feature = "redb")]
const META_COUNTER: TableDefinition<&str, u64> = TableDefinition::new("meta_counter");
/// Spend rules: `rule_id` → encoded `SpendLimit`.
#[cfg(feature = "redb")]
const RULE_BY_ID: TableDefinition<&str, &str> = TableDefinition::new("rule_by_id_v3");
/// Reservations: `reservation_id` → encoded `SpendReservation`.
#[cfg(feature = "redb")]
const RESERVATION_BY_ID: TableDefinition<u64, &str> = TableDefinition::new("reservation_by_id");
/// Index from the caller-supplied `op_id` to its `reservation_id`; consulted by
/// `reserve_redb` so a repeated `op_id` returns the existing reservation.
#[cfg(feature = "redb")]
const RESERVATION_ID_BY_OP_ID: TableDefinition<&str, u64> =
    TableDefinition::new("reservation_id_by_op_id");
/// Confirmed spends: `event_id` → encoded `SpendEvent`, written by `confirm_redb`.
#[cfg(feature = "redb")]
const SPEND_EVENT_BY_ID: TableDefinition<u64, &str> = TableDefinition::new("spend_event_by_id");
/// NOTE(review): presumably currency pair → encoded `ExchangeRateQuote` in the
/// exchange-rate cache database; not referenced in this part of the file — confirm.
#[cfg(feature = "redb")]
const FX_QUOTE_BY_PAIR: TableDefinition<&str, &str> = TableDefinition::new("quote_by_pair");
/// Counter name passed to `next_counter` when allocating a reservation id.
#[cfg(feature = "redb")]
const NEXT_RESERVATION_ID_KEY: &str = "next_reservation_id";
/// Counter name passed to `next_counter` when allocating a spend-event id.
#[cfg(feature = "redb")]
const NEXT_EVENT_ID_KEY: &str = "next_event_id";
/// Target schema version handed to `db::open_and_migrate` for the spend database.
#[cfg(feature = "redb")]
const SPEND_VERSION: u64 = 1;
/// Target schema version handed to `db::open_and_migrate` for the exchange-rate cache.
#[cfg(feature = "redb")]
const FX_CACHE_VERSION: u64 = 1;
42
/// Describes one prospective spend to be checked against the configured limits.
///
/// Passed to `SpendLedger::reserve`. Derives `PartialEq`/`Eq` so contexts can be
/// compared directly (e.g. in tests or when de-duplicating requests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpendContext {
    /// Network the spend happens on. `reserve` rejects a blank value.
    pub network: String,
    /// Wallet performing the spend, if known. `reserve` fails when this is `None`
    /// and a wallet-scoped rule exists for `network`.
    pub wallet: Option<String>,
    /// Amount to spend in native (non-USD) units; converted to USD cents by the
    /// ledger when a USD-scoped rule is configured.
    pub amount_native: u64,
    /// Token being spent, if any; forwarded to rule matching and USD conversion.
    pub token: Option<String>,
}
50
/// Lifecycle state of a `SpendReservation`.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum ReservationStatus {
    /// Initial state assigned when a reservation is created.
    Pending,
    /// Set when a pending reservation is confirmed.
    Confirmed,
    /// Set when a pending reservation is cancelled.
    Cancelled,
    /// NOTE(review): presumably assigned by `expire_pending` once a pending
    /// reservation passes its expiry time — confirm against that helper.
    Expired,
}
58
/// A stored claim on spend budget, created by `reserve` and later confirmed or cancelled.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SpendReservation {
    /// Ledger-assigned identifier returned to the caller of `reserve`.
    reservation_id: u64,
    /// Caller-supplied operation id; a repeated `op_id` maps back to this reservation.
    op_id: String,
    /// Network copied from the `SpendContext`.
    network: String,
    /// Wallet copied from the `SpendContext`, if one was given.
    wallet: Option<String>,
    /// Token copied from the `SpendContext`; deserializes to `None` when the
    /// stored record has no `token` field.
    #[serde(default)]
    token: Option<String>,
    /// Requested amount in native units.
    amount_native: u64,
    /// USD-cent value of the amount; only computed when a USD-scoped rule existed
    /// at reservation time, otherwise `None`.
    amount_usd_cents: Option<u64>,
    /// Current lifecycle state.
    status: ReservationStatus,
    /// Creation time, milliseconds since the Unix epoch.
    created_at_epoch_ms: u64,
    /// Time after which a still-pending reservation is considered overdue.
    expires_at_epoch_ms: u64,
    /// When the reservation was confirmed or cancelled; `None` while pending.
    finalized_at_epoch_ms: Option<u64>,
}
74
/// Record of a confirmed spend, written when a pending reservation is confirmed.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SpendEvent {
    /// Ledger-assigned identifier of this event.
    event_id: u64,
    /// Reservation that was confirmed to produce this event.
    reservation_id: u64,
    /// Operation id copied from the reservation.
    op_id: String,
    /// Network copied from the reservation.
    network: String,
    /// Wallet copied from the reservation.
    wallet: Option<String>,
    /// Token copied from the reservation; deserializes to `None` when the stored
    /// record has no `token` field.
    #[serde(default)]
    token: Option<String>,
    /// Native amount copied from the reservation.
    amount_native: u64,
    /// USD-cent amount copied from the reservation, if it was computed.
    amount_usd_cents: Option<u64>,
    /// Creation time of the originating reservation (epoch milliseconds).
    created_at_epoch_ms: u64,
    /// Time the reservation was confirmed (epoch milliseconds).
    confirmed_at_epoch_ms: u64,
}
89
/// A cached exchange-rate quote.
/// NOTE(review): not referenced in this part of the file; field meanings below are
/// read off the names and should be confirmed against the FX code.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ExchangeRateQuote {
    /// Identifier of the quoted pair — presumably the cache key; confirm.
    pair: String,
    /// Name of the source that produced the quote.
    source: String,
    /// Quoted price — TODO confirm direction/units against the fetcher.
    price: f64,
    /// When the quote was fetched (epoch milliseconds).
    fetched_at_epoch_ms: u64,
    /// When the quote stops being valid (epoch milliseconds).
    expires_at_epoch_ms: u64,
}
98
99// ═══════════════════════════════════════════
100// SpendBackend
101// ═══════════════════════════════════════════
102
/// Storage backend selected for a `SpendLedger`.
///
/// Which variants exist depends on the enabled cargo features; `None` is always present.
#[allow(dead_code)] // None variant used when neither redb nor postgres features are enabled
enum SpendBackend {
    /// Embedded redb databases stored under `data_dir`.
    #[cfg(feature = "redb")]
    Redb {
        // Root directory; the database files live in its `spend` subdirectory.
        data_dir: String,
    },
    /// Postgres accessed through a shared connection pool.
    #[cfg(feature = "postgres")]
    Postgres {
        // Pool used for every query issued by the ledger.
        pool: sqlx::PgPool,
    },
    /// No storage available; most ledger operations report `NotImplemented`.
    None,
}
115
116// ═══════════════════════════════════════════
117// SpendLedger
118// ═══════════════════════════════════════════
119
/// Tracks spend limits, reservations and confirmed spends on top of a storage backend.
pub struct SpendLedger {
    /// Where rules and reservations are persisted.
    backend: SpendBackend,
    /// Exchange-rate configuration; also consulted when validating limits.
    exchange_rate: Option<ExchangeRateConfig>,
    /// Acquired by every public async operation before it touches the backend,
    /// so ledger operations on one instance run one at a time.
    mu: Mutex<()>,
    /// Set to true when a cached FX quote's age exceeds 80% of its TTL.
    fx_stale_warned: std::sync::atomic::AtomicBool,
}
127
128impl SpendLedger {
129    pub fn new(data_dir: &str, exchange_rate: Option<ExchangeRateConfig>) -> Self {
130        #[cfg(feature = "redb")]
131        let backend = SpendBackend::Redb {
132            data_dir: data_dir.to_string(),
133        };
134        #[cfg(not(feature = "redb"))]
135        let backend = {
136            let _ = data_dir;
137            SpendBackend::None
138        };
139        Self {
140            backend,
141            exchange_rate,
142            mu: Mutex::new(()),
143            fx_stale_warned: std::sync::atomic::AtomicBool::new(false),
144        }
145    }
146
147    #[cfg(feature = "postgres")]
148    pub fn new_postgres(pool: sqlx::PgPool, exchange_rate: Option<ExchangeRateConfig>) -> Self {
149        Self {
150            backend: SpendBackend::Postgres { pool },
151            exchange_rate,
152            mu: Mutex::new(()),
153            fx_stale_warned: std::sync::atomic::AtomicBool::new(false),
154        }
155    }
156
157    /// Returns true (once) if a stale FX quote was used since last check.
158    pub fn take_fx_stale_warning(&self) -> bool {
159        self.fx_stale_warned
160            .swap(false, std::sync::atomic::Ordering::Relaxed)
161    }
162
163    /// Add a single spend limit rule. Generates and assigns a rule_id, returns it.
164    pub async fn add_limit(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
165        validate_limit(limit, self.exchange_rate.as_ref())?;
166
167        let _guard = self.mu.lock().await;
168
169        match &self.backend {
170            #[cfg(feature = "redb")]
171            SpendBackend::Redb { .. } => self.add_limit_redb(limit),
172            #[cfg(feature = "postgres")]
173            SpendBackend::Postgres { .. } => self.add_limit_postgres(limit).await,
174            SpendBackend::None => Err(PayError::NotImplemented(
175                "no storage backend for spend limits".to_string(),
176            )),
177        }
178    }
179
180    /// Remove a spend limit rule by its rule_id.
181    pub async fn remove_limit(&self, _rule_id: &str) -> Result<(), PayError> {
182        let _guard = self.mu.lock().await;
183
184        match &self.backend {
185            #[cfg(feature = "redb")]
186            SpendBackend::Redb { .. } => self.remove_limit_redb(_rule_id),
187            #[cfg(feature = "postgres")]
188            SpendBackend::Postgres { .. } => self.remove_limit_postgres(_rule_id).await,
189            SpendBackend::None => Err(PayError::NotImplemented(
190                "no storage backend for spend limits".to_string(),
191            )),
192        }
193    }
194
195    /// Replace all spend limits (used by config patch / pipe mode).
196    pub async fn set_limits(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
197        for limit in limits {
198            validate_limit(limit, self.exchange_rate.as_ref())?;
199        }
200
201        let _guard = self.mu.lock().await;
202
203        match &self.backend {
204            #[cfg(feature = "redb")]
205            SpendBackend::Redb { .. } => self.set_limits_redb(limits),
206            #[cfg(feature = "postgres")]
207            SpendBackend::Postgres { .. } => self.set_limits_postgres(limits).await,
208            SpendBackend::None => Err(PayError::NotImplemented(
209                "no storage backend for spend limits".to_string(),
210            )),
211        }
212    }
213
214    /// Compute current status for all limits.
215    pub async fn get_status(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
216        let _guard = self.mu.lock().await;
217
218        match &self.backend {
219            #[cfg(feature = "redb")]
220            SpendBackend::Redb { .. } => self.get_status_redb(),
221            #[cfg(feature = "postgres")]
222            SpendBackend::Postgres { .. } => self.get_status_postgres().await,
223            SpendBackend::None => Ok(Vec::new()),
224        }
225    }
226
227    /// Reserve spend against all matching limits, returns reservation id.
228    pub async fn reserve(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
229        if op_id.trim().is_empty() {
230            return Err(PayError::InvalidAmount("op_id cannot be empty".to_string()));
231        }
232        if ctx.network.trim().is_empty() {
233            return Err(PayError::InvalidAmount(
234                "network cannot be empty for spend check".to_string(),
235            ));
236        }
237
238        let _guard = self.mu.lock().await;
239
240        match &self.backend {
241            #[cfg(feature = "redb")]
242            SpendBackend::Redb { .. } => self.reserve_redb(op_id, ctx).await,
243            #[cfg(feature = "postgres")]
244            SpendBackend::Postgres { .. } => self.reserve_postgres(op_id, ctx).await,
245            SpendBackend::None => Err(PayError::NotImplemented(
246                "no storage backend for spend limits".to_string(),
247            )),
248        }
249    }
250
251    pub async fn confirm(&self, _reservation_id: u64) -> Result<(), PayError> {
252        let _guard = self.mu.lock().await;
253
254        match &self.backend {
255            #[cfg(feature = "redb")]
256            SpendBackend::Redb { .. } => self.confirm_redb(_reservation_id),
257            #[cfg(feature = "postgres")]
258            SpendBackend::Postgres { .. } => self.confirm_postgres(_reservation_id).await,
259            SpendBackend::None => Err(PayError::NotImplemented(
260                "no storage backend for spend limits".to_string(),
261            )),
262        }
263    }
264
265    pub async fn cancel(&self, _reservation_id: u64) -> Result<(), PayError> {
266        let _guard = self.mu.lock().await;
267
268        match &self.backend {
269            #[cfg(feature = "redb")]
270            SpendBackend::Redb { .. } => self.cancel_redb(_reservation_id),
271            #[cfg(feature = "postgres")]
272            SpendBackend::Postgres { .. } => self.cancel_postgres(_reservation_id).await,
273            SpendBackend::None => Ok(()),
274        }
275    }
276}
277
278// ═══════════════════════════════════════════
279// Redb backend implementation
280// ═══════════════════════════════════════════
281
#[cfg(feature = "redb")]
impl SpendLedger {
    /// Path of the spend database: `<data_dir>/spend/spend.redb`.
    ///
    /// Returns an empty path if the ledger is not using the redb backend.
    fn spend_db_path(&self) -> PathBuf {
        match &self.backend {
            SpendBackend::Redb { data_dir } => Path::new(data_dir).join("spend").join("spend.redb"),
            #[allow(unreachable_patterns)]
            _ => PathBuf::new(),
        }
    }

    /// Path of the exchange-rate cache: `<data_dir>/spend/exchange-rate-cache.redb`.
    ///
    /// Returns an empty path if the ledger is not using the redb backend.
    fn exchange_rate_db_path(&self) -> PathBuf {
        match &self.backend {
            SpendBackend::Redb { data_dir } => Path::new(data_dir)
                .join("spend")
                .join("exchange-rate-cache.redb"),
            #[allow(unreachable_patterns)]
            _ => PathBuf::new(),
        }
    }

    /// Opens the spend database through `db::open_and_migrate` at `SPEND_VERSION`.
    fn open_spend_db(&self) -> Result<Database, PayError> {
        db::open_and_migrate(
            &self.spend_db_path(),
            SPEND_VERSION,
            &[
                // v0 → v1: no data migration, just stamp version
                &|_db: &Database| Ok(()),
            ],
        )
    }

    /// Opens the exchange-rate cache through `db::open_and_migrate` at `FX_CACHE_VERSION`.
    fn open_exchange_rate_db(&self) -> Result<Database, PayError> {
        db::open_and_migrate(
            &self.exchange_rate_db_path(),
            FX_CACHE_VERSION,
            &[
                // v0 → v1: no data migration, just stamp version
                &|_db: &Database| Ok(()),
            ],
        )
    }

    /// Stores `limit` under a freshly generated rule id and returns that id.
    ///
    /// The id is written into `limit.rule_id` before the rule is persisted, so on
    /// a storage error the caller's `limit` still carries the (unsaved) id.
    fn add_limit_redb(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
        let db = self.open_spend_db()?;
        let rule_id = generate_rule_identifier()?;
        limit.rule_id = Some(rule_id.clone());
        let encoded = encode(limit)?;
        let write_txn = db
            .begin_write()
            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
        {
            let mut rule_table = write_txn
                .open_table(RULE_BY_ID)
                .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
            rule_table
                .insert(rule_id.as_str(), encoded.as_str())
                .map_err(|e| PayError::InternalError(format!("spend insert rule: {e}")))?;
        }
        write_txn
            .commit()
            .map_err(|e| PayError::InternalError(format!("spend commit add_limit: {e}")))?;
        Ok(rule_id)
    }

    /// Deletes the rule stored under `rule_id`.
    ///
    /// # Errors
    /// Returns `PayError::InvalidAmount` when no such rule exists; the write
    /// transaction is then dropped without being committed.
    fn remove_limit_redb(&self, rule_id: &str) -> Result<(), PayError> {
        let db = self.open_spend_db()?;
        let write_txn = db
            .begin_write()
            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
        {
            let mut rule_table = write_txn
                .open_table(RULE_BY_ID)
                .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
            let existed = rule_table
                .remove(rule_id)
                .map_err(|e| PayError::InternalError(format!("spend remove rule: {e}")))?;
            if existed.is_none() {
                return Err(PayError::InvalidAmount(format!(
                    "rule_id '{rule_id}' not found"
                )));
            }
        }
        write_txn
            .commit()
            .map_err(|e| PayError::InternalError(format!("spend commit remove_limit: {e}")))
    }

    /// Replaces every stored rule with `limits` inside a single write transaction.
    ///
    /// Each new rule gets a freshly generated id; the caller's slice is not modified.
    fn set_limits_redb(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
        let db = self.open_spend_db()?;
        let write_txn = db
            .begin_write()
            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
        {
            let mut rule_table = write_txn
                .open_table(RULE_BY_ID)
                .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
            // Clear existing rules. Keys are collected first so the table is not
            // mutated while it is being iterated.
            let existing_ids = rule_table
                .iter()
                .map_err(|e| PayError::InternalError(format!("spend iterate rules: {e}")))?
                .map(|entry| {
                    entry
                        .map(|(k, _)| k.value().to_string())
                        .map_err(|e| PayError::InternalError(format!("spend read rule key: {e}")))
                })
                .collect::<Result<Vec<_>, _>>()?;
            for rid in existing_ids {
                rule_table
                    .remove(rid.as_str())
                    .map_err(|e| PayError::InternalError(format!("spend remove rule: {e}")))?;
            }

            // Insert new rules with generated IDs
            for limit in limits {
                let mut rule = limit.clone();
                let rid = generate_rule_identifier()?;
                rule.rule_id = Some(rid.clone());
                let encoded = encode(&rule)?;
                rule_table
                    .insert(rid.as_str(), encoded.as_str())
                    .map_err(|e| PayError::InternalError(format!("spend insert rule: {e}")))?;
            }
        }
        write_txn
            .commit()
            .map_err(|e| PayError::InternalError(format!("spend commit set_limits: {e}")))
    }

    /// Builds a status entry for every stored rule from the stored reservations.
    ///
    /// `window_reset_s` is the number of whole seconds until the oldest counted
    /// entry leaves the rule's window, or 0 when nothing is counted.
    fn get_status_redb(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
        let db = self.open_spend_db()?;
        let read_txn = db
            .begin_read()
            .map_err(|e| PayError::InternalError(format!("spend begin_read: {e}")))?;
        let rules = load_rules(&read_txn)?;
        let reservations = load_reservations(&read_txn)?;
        let now = now_epoch_ms();
        let mut out = Vec::with_capacity(rules.len());
        for rule in rules {
            let use_usd = rule.scope == SpendScope::GlobalUsdCents;
            let (spent, oldest_ts) = spent_in_window(&rule, &reservations, now, use_usd)?;
            let remaining = rule.max_spend.saturating_sub(spent);
            let window_ms = rule.window_s.saturating_mul(1000);
            let window_reset_s = oldest_ts
                .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
                .unwrap_or(0);
            out.push(SpendLimitStatus {
                rule_id: rule.rule_id.clone().unwrap_or_default(),
                scope: rule.scope,
                network: rule.network.clone(),
                wallet: rule.wallet.clone(),
                window_s: rule.window_s,
                max_spend: rule.max_spend,
                spent,
                remaining,
                token: rule.token.clone(),
                window_reset_s,
            });
        }
        Ok(out)
    }

    /// Checks `ctx` against every matching rule and, if all pass, stores a pending
    /// reservation and returns its id.
    ///
    /// A repeated `op_id` returns the id of the reservation already stored for it
    /// without creating a new one.
    ///
    /// # Errors
    /// * `PayError::InvalidAmount` when a wallet-scoped rule exists for the
    ///   network but `ctx.wallet` is `None`.
    /// * `PayError::LimitExceeded` when adding the amount would exceed a matching
    ///   rule; nothing is committed in that case.
    /// * Any error from USD conversion or storage.
    async fn reserve_redb(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
        // How long a reservation may stay pending before it is due to expire.
        const RESERVATION_TTL_MS: u64 = 300_000;

        let now = now_epoch_ms();
        let db = self.open_spend_db()?;

        let read_txn = db
            .begin_read()
            .map_err(|e| PayError::InternalError(format!("spend begin_read: {e}")))?;
        let rules = load_rules(&read_txn)?;

        if rules.iter().any(|r| {
            r.scope == SpendScope::Wallet
                && r.network.as_deref() == Some(ctx.network.as_str())
                && ctx.wallet.is_none()
        }) {
            return Err(PayError::InvalidAmount(
                "wallet-scoped limits require an explicit wallet".to_string(),
            ));
        }

        // GlobalUsdCents scope needs USD conversion
        let needs_usd = rules.iter().any(|r| r.scope == SpendScope::GlobalUsdCents);
        let amount_usd_cents = if needs_usd {
            Some(
                self.amount_to_usd_cents(&ctx.network, ctx.token.as_deref(), ctx.amount_native)
                    .await?,
            )
        } else {
            None
        };

        let write_txn = db
            .begin_write()
            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;

        let reservation_id = {
            let mut reservation_index =
                write_txn.open_table(RESERVATION_ID_BY_OP_ID).map_err(|e| {
                    PayError::InternalError(format!("spend open reservation op index: {e}"))
                })?;
            // Idempotency: a known op_id maps straight back to its reservation.
            if let Some(existing) = reservation_index
                .get(op_id)
                .map_err(|e| PayError::InternalError(format!("spend read op index: {e}")))?
            {
                let existing_id = existing.value();
                return Ok(existing_id);
            }

            let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
                PayError::InternalError(format!("spend open reservation table: {e}"))
            })?;

            // NOTE(review): presumably flips overdue pending reservations to
            // expired so they no longer count below — confirm in `expire_pending`.
            expire_pending(&mut reservation_table, now)?;

            let reservations = reservation_table
                .iter()
                .map_err(|e| PayError::InternalError(format!("spend iterate reservations: {e}")))?
                .map(|entry| {
                    let (_k, v) = entry.map_err(|e| {
                        PayError::InternalError(format!("spend read reservation: {e}"))
                    })?;
                    decode::<SpendReservation>(v.value())
                        .map_err(|e| prepend_err("spend decode reservation", e))
                })
                .collect::<Result<Vec<_>, _>>()?;

            for rule in rules.iter() {
                if !rule_matches_context(
                    rule,
                    &ctx.network,
                    ctx.wallet.as_deref(),
                    ctx.token.as_deref(),
                ) {
                    continue;
                }

                let use_usd = rule.scope == SpendScope::GlobalUsdCents;
                let candidate_amount =
                    amount_for_rule(rule, ctx.amount_native, amount_usd_cents, use_usd)?;
                let (spent, oldest_ts) = spent_in_window(rule, &reservations, now, use_usd)?;
                if spent.saturating_add(candidate_amount) > rule.max_spend {
                    // Seconds until the oldest counted entry leaves the window.
                    let window_ms = rule.window_s.saturating_mul(1000);
                    let remaining_s = oldest_ts
                        .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
                        .unwrap_or(0);

                    return Err(PayError::LimitExceeded {
                        rule_id: rule.rule_id.clone().unwrap_or_default(),
                        scope: rule.scope,
                        scope_key: scope_key(rule),
                        spent,
                        max_spend: rule.max_spend,
                        token: rule.token.clone(),
                        remaining_s,
                        origin: None,
                    });
                }
            }

            let reservation_id = next_counter(&write_txn, NEXT_RESERVATION_ID_KEY)?;
            let reservation = SpendReservation {
                reservation_id,
                op_id: op_id.to_string(),
                network: ctx.network.clone(),
                wallet: ctx.wallet.clone(),
                token: ctx.token.clone(),
                amount_native: ctx.amount_native,
                amount_usd_cents,
                status: ReservationStatus::Pending,
                created_at_epoch_ms: now,
                expires_at_epoch_ms: now.saturating_add(RESERVATION_TTL_MS),
                finalized_at_epoch_ms: None,
            };
            let encoded = encode(&reservation)?;
            reservation_table
                .insert(reservation_id, encoded.as_str())
                .map_err(|e| PayError::InternalError(format!("spend insert reservation: {e}")))?;
            reservation_index
                .insert(op_id, reservation_id)
                .map_err(|e| PayError::InternalError(format!("spend insert op index: {e}")))?;
            reservation_id
        };

        write_txn
            .commit()
            .map_err(|e| PayError::InternalError(format!("spend commit reserve: {e}")))?;
        Ok(reservation_id)
    }

    /// Marks a pending reservation as confirmed and records a `SpendEvent` for it.
    ///
    /// A reservation that is no longer pending is left untouched and the call
    /// succeeds without writing anything.
    ///
    /// # Errors
    /// Returns `PayError::InternalError` when the reservation does not exist or
    /// on any storage failure.
    fn confirm_redb(&self, reservation_id: u64) -> Result<(), PayError> {
        let db = self.open_spend_db()?;
        let now = now_epoch_ms();

        let write_txn = db
            .begin_write()
            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;

        {
            let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
                PayError::InternalError(format!("spend open reservation table: {e}"))
            })?;
            // Copy the stored value out so the read guard is released before the
            // table is written to.
            let Some(existing_bytes) = reservation_table
                .get(reservation_id)
                .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?
                .map(|g| g.value().to_string())
            else {
                return Err(PayError::InternalError(format!(
                    "reservation {reservation_id} not found"
                )));
            };

            let mut reservation: SpendReservation = decode(&existing_bytes)?;
            if !matches!(reservation.status, ReservationStatus::Pending) {
                return Ok(());
            }

            reservation.status = ReservationStatus::Confirmed;
            reservation.finalized_at_epoch_ms = Some(now);
            let encoded = encode(&reservation)?;
            reservation_table
                .insert(reservation_id, encoded.as_str())
                .map_err(|e| PayError::InternalError(format!("spend update reservation: {e}")))?;

            let mut events = write_txn
                .open_table(SPEND_EVENT_BY_ID)
                .map_err(|e| PayError::InternalError(format!("spend open event table: {e}")))?;
            let event_id = next_counter(&write_txn, NEXT_EVENT_ID_KEY)?;
            let event = SpendEvent {
                event_id,
                reservation_id,
                op_id: reservation.op_id,
                network: reservation.network,
                wallet: reservation.wallet,
                token: reservation.token,
                amount_native: reservation.amount_native,
                amount_usd_cents: reservation.amount_usd_cents,
                created_at_epoch_ms: reservation.created_at_epoch_ms,
                confirmed_at_epoch_ms: now,
            };
            let encoded_event = encode(&event)?;
            events
                .insert(event_id, encoded_event.as_str())
                .map_err(|e| PayError::InternalError(format!("spend insert event: {e}")))?;
        }

        write_txn
            .commit()
            .map_err(|e| PayError::InternalError(format!("spend commit confirm: {e}")))
    }

    /// Marks a pending reservation as cancelled.
    ///
    /// A missing reservation, or one that is no longer pending, is not an error:
    /// nothing is changed and the (empty) transaction is still committed.
    fn cancel_redb(&self, reservation_id: u64) -> Result<(), PayError> {
        let db = self.open_spend_db()?;
        let now = now_epoch_ms();

        let write_txn = db
            .begin_write()
            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;

        {
            let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
                PayError::InternalError(format!("spend open reservation table: {e}"))
            })?;
            let existing = reservation_table
                .get(reservation_id)
                .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?;
            // Copy the stored value out so the read guard is released before the
            // table is written to.
            let existing_bytes = existing.map(|g| g.value().to_string());
            if let Some(existing_bytes) = existing_bytes {
                let mut reservation: SpendReservation = decode(&existing_bytes)?;
                if matches!(reservation.status, ReservationStatus::Pending) {
                    reservation.status = ReservationStatus::Cancelled;
                    reservation.finalized_at_epoch_ms = Some(now);
                    let encoded = encode(&reservation)?;
                    reservation_table
                        .insert(reservation_id, encoded.as_str())
                        .map_err(|e| {
                            PayError::InternalError(format!("spend update reservation: {e}"))
                        })?;
                }
            }
        }

        write_txn
            .commit()
            .map_err(|e| PayError::InternalError(format!("spend commit cancel: {e}")))
    }
}
683
684// ═══════════════════════════════════════════
685// Postgres backend implementation
686// ═══════════════════════════════════════════
687
688#[cfg(feature = "postgres")]
689impl SpendLedger {
690    fn pg_pool(&self) -> Result<&sqlx::PgPool, PayError> {
691        match &self.backend {
692            SpendBackend::Postgres { pool } => Ok(pool),
693            _ => Err(PayError::InternalError(
694                "expected postgres spend backend".to_string(),
695            )),
696        }
697    }
698
699    async fn add_limit_postgres(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
700        let pool = self.pg_pool()?;
701        let rule_id = generate_rule_identifier()?;
702        limit.rule_id = Some(rule_id.clone());
703        let rule_json = serde_json::to_value(limit)
704            .map_err(|e| PayError::InternalError(format!("serialize spend rule: {e}")))?;
705
706        sqlx::query("INSERT INTO spend_rules (rule_id, rule) VALUES ($1, $2)")
707            .bind(&rule_id)
708            .bind(&rule_json)
709            .execute(pool)
710            .await
711            .map_err(|e| PayError::InternalError(format!("pg insert spend rule: {e}")))?;
712
713        Ok(rule_id)
714    }
715
716    async fn remove_limit_postgres(&self, rule_id: &str) -> Result<(), PayError> {
717        let pool = self.pg_pool()?;
718        let result = sqlx::query("DELETE FROM spend_rules WHERE rule_id = $1")
719            .bind(rule_id)
720            .execute(pool)
721            .await
722            .map_err(|e| PayError::InternalError(format!("pg delete spend rule: {e}")))?;
723
724        if result.rows_affected() == 0 {
725            return Err(PayError::InvalidAmount(format!(
726                "rule_id '{rule_id}' not found"
727            )));
728        }
729        Ok(())
730    }
731
732    async fn set_limits_postgres(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
733        let pool = self.pg_pool()?;
734        let mut tx = pool
735            .begin()
736            .await
737            .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
738
739        sqlx::query("DELETE FROM spend_rules")
740            .execute(&mut *tx)
741            .await
742            .map_err(|e| PayError::InternalError(format!("pg clear spend rules: {e}")))?;
743
744        for limit in limits {
745            let mut rule = limit.clone();
746            let rid = generate_rule_identifier()?;
747            rule.rule_id = Some(rid.clone());
748            let rule_json = serde_json::to_value(&rule)
749                .map_err(|e| PayError::InternalError(format!("serialize spend rule: {e}")))?;
750            sqlx::query("INSERT INTO spend_rules (rule_id, rule) VALUES ($1, $2)")
751                .bind(&rid)
752                .bind(&rule_json)
753                .execute(&mut *tx)
754                .await
755                .map_err(|e| PayError::InternalError(format!("pg insert spend rule: {e}")))?;
756        }
757
758        tx.commit()
759            .await
760            .map_err(|e| PayError::InternalError(format!("pg commit set_limits: {e}")))
761    }
762
763    async fn get_status_postgres(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
764        let pool = self.pg_pool()?;
765        let rules = pg_load_rules(pool).await?;
766        let reservations = pg_load_reservations(pool).await?;
767        let now = now_epoch_ms();
768
769        let mut out = Vec::with_capacity(rules.len());
770        for rule in rules {
771            let use_usd = rule.scope == SpendScope::GlobalUsdCents;
772            let (spent, oldest_ts) = spent_in_window(&rule, &reservations, now, use_usd)?;
773            let remaining = rule.max_spend.saturating_sub(spent);
774            let window_ms = rule.window_s.saturating_mul(1000);
775            let window_reset_s = oldest_ts
776                .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
777                .unwrap_or(0);
778            out.push(SpendLimitStatus {
779                rule_id: rule.rule_id.clone().unwrap_or_default(),
780                scope: rule.scope,
781                network: rule.network.clone(),
782                wallet: rule.wallet.clone(),
783                window_s: rule.window_s,
784                max_spend: rule.max_spend,
785                spent,
786                remaining,
787                token: rule.token.clone(),
788                window_reset_s,
789            });
790        }
791        Ok(out)
792    }
793
794    async fn reserve_postgres(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
795        use crate::store::postgres_store::SPEND_ADVISORY_LOCK_KEY;
796
797        let pool = self.pg_pool()?;
798        let now = now_epoch_ms();
799
800        // Pre-flight: load rules outside the transaction for USD conversion
801        let rules = pg_load_rules(pool).await?;
802        if rules.iter().any(|r| {
803            r.scope == SpendScope::Wallet
804                && r.network.as_deref() == Some(ctx.network.as_str())
805                && ctx.wallet.is_none()
806        }) {
807            return Err(PayError::InvalidAmount(
808                "wallet-scoped limits require an explicit wallet".to_string(),
809            ));
810        }
811
812        let needs_usd = rules.iter().any(|r| r.scope == SpendScope::GlobalUsdCents);
813        let amount_usd_cents = if needs_usd {
814            Some(
815                self.amount_to_usd_cents(&ctx.network, ctx.token.as_deref(), ctx.amount_native)
816                    .await?,
817            )
818        } else {
819            None
820        };
821
822        // Begin serializable transaction with advisory lock
823        let mut tx = pool
824            .begin()
825            .await
826            .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
827
828        sqlx::query("SELECT pg_advisory_xact_lock($1)")
829            .bind(SPEND_ADVISORY_LOCK_KEY)
830            .execute(&mut *tx)
831            .await
832            .map_err(|e| PayError::InternalError(format!("pg advisory lock: {e}")))?;
833
834        // Check for existing reservation with same op_id (idempotency)
835        let existing: Option<(i64,)> =
836            sqlx::query_as("SELECT reservation_id FROM spend_reservations WHERE op_id = $1")
837                .bind(op_id)
838                .fetch_optional(&mut *tx)
839                .await
840                .map_err(|e| PayError::InternalError(format!("pg check op_id: {e}")))?;
841
842        if let Some((rid,)) = existing {
843            return Ok(rid as u64);
844        }
845
846        // Expire pending reservations
847        pg_expire_pending(&mut tx, now).await?;
848
849        // Load all reservations within the lock
850        let reservations = pg_load_reservations_tx(&mut tx).await?;
851
852        // Re-load rules within the lock (could have changed)
853        let rules = pg_load_rules_tx(&mut tx).await?;
854
855        // Check limits
856        for rule in rules.iter() {
857            if !rule_matches_context(
858                rule,
859                &ctx.network,
860                ctx.wallet.as_deref(),
861                ctx.token.as_deref(),
862            ) {
863                continue;
864            }
865
866            let use_usd = rule.scope == SpendScope::GlobalUsdCents;
867            let candidate_amount =
868                amount_for_rule(rule, ctx.amount_native, amount_usd_cents, use_usd)?;
869            let (spent, oldest_ts) = spent_in_window(rule, &reservations, now, use_usd)?;
870            if spent.saturating_add(candidate_amount) > rule.max_spend {
871                let window_ms = rule.window_s.saturating_mul(1000);
872                let remaining_s = oldest_ts
873                    .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
874                    .unwrap_or(0);
875
876                return Err(PayError::LimitExceeded {
877                    rule_id: rule.rule_id.clone().unwrap_or_default(),
878                    scope: rule.scope,
879                    scope_key: scope_key(rule),
880                    spent,
881                    max_spend: rule.max_spend,
882                    token: rule.token.clone(),
883                    remaining_s,
884                    origin: None,
885                });
886            }
887        }
888
889        // Insert reservation
890        let reservation = SpendReservation {
891            reservation_id: 0, // will be assigned by BIGSERIAL
892            op_id: op_id.to_string(),
893            network: ctx.network.clone(),
894            wallet: ctx.wallet.clone(),
895            token: ctx.token.clone(),
896            amount_native: ctx.amount_native,
897            amount_usd_cents,
898            status: ReservationStatus::Pending,
899            created_at_epoch_ms: now,
900            expires_at_epoch_ms: now.saturating_add(300_000),
901            finalized_at_epoch_ms: None,
902        };
903        let reservation_json = serde_json::to_value(&reservation)
904            .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
905
906        let row: (i64,) = sqlx::query_as(
907            "INSERT INTO spend_reservations (op_id, reservation) \
908             VALUES ($1, $2) RETURNING reservation_id",
909        )
910        .bind(op_id)
911        .bind(&reservation_json)
912        .fetch_one(&mut *tx)
913        .await
914        .map_err(|e| PayError::InternalError(format!("pg insert reservation: {e}")))?;
915
916        let reservation_id = row.0 as u64;
917
918        // Update the reservation JSON with the assigned ID
919        let mut updated_json = reservation_json;
920        updated_json["reservation_id"] = serde_json::json!(reservation_id);
921        sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
922            .bind(&updated_json)
923            .bind(row.0)
924            .execute(&mut *tx)
925            .await
926            .map_err(|e| PayError::InternalError(format!("pg update reservation id: {e}")))?;
927
928        tx.commit()
929            .await
930            .map_err(|e| PayError::InternalError(format!("pg commit reserve: {e}")))?;
931
932        Ok(reservation_id)
933    }
934
935    async fn confirm_postgres(&self, reservation_id: u64) -> Result<(), PayError> {
936        let pool = self.pg_pool()?;
937        let now = now_epoch_ms();
938        let rid = reservation_id as i64;
939
940        let row: Option<(serde_json::Value,)> =
941            sqlx::query_as("SELECT reservation FROM spend_reservations WHERE reservation_id = $1")
942                .bind(rid)
943                .fetch_optional(pool)
944                .await
945                .map_err(|e| PayError::InternalError(format!("pg read reservation: {e}")))?;
946
947        let Some((res_json,)) = row else {
948            return Err(PayError::InternalError(format!(
949                "reservation {reservation_id} not found"
950            )));
951        };
952
953        let mut reservation: SpendReservation = serde_json::from_value(res_json)
954            .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
955
956        if !matches!(reservation.status, ReservationStatus::Pending) {
957            return Ok(());
958        }
959
960        reservation.status = ReservationStatus::Confirmed;
961        reservation.finalized_at_epoch_ms = Some(now);
962        let updated_json = serde_json::to_value(&reservation)
963            .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
964
965        let event = SpendEvent {
966            event_id: 0, // assigned by BIGSERIAL
967            reservation_id,
968            op_id: reservation.op_id,
969            network: reservation.network,
970            wallet: reservation.wallet,
971            token: reservation.token,
972            amount_native: reservation.amount_native,
973            amount_usd_cents: reservation.amount_usd_cents,
974            created_at_epoch_ms: reservation.created_at_epoch_ms,
975            confirmed_at_epoch_ms: now,
976        };
977        let event_json = serde_json::to_value(&event)
978            .map_err(|e| PayError::InternalError(format!("serialize spend event: {e}")))?;
979
980        let mut tx = pool
981            .begin()
982            .await
983            .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
984
985        sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
986            .bind(&updated_json)
987            .bind(rid)
988            .execute(&mut *tx)
989            .await
990            .map_err(|e| PayError::InternalError(format!("pg update reservation: {e}")))?;
991
992        sqlx::query("INSERT INTO spend_events (reservation_id, event) VALUES ($1, $2)")
993            .bind(rid)
994            .bind(&event_json)
995            .execute(&mut *tx)
996            .await
997            .map_err(|e| PayError::InternalError(format!("pg insert spend event: {e}")))?;
998
999        tx.commit()
1000            .await
1001            .map_err(|e| PayError::InternalError(format!("pg commit confirm: {e}")))
1002    }
1003
1004    async fn cancel_postgres(&self, reservation_id: u64) -> Result<(), PayError> {
1005        let pool = self.pg_pool()?;
1006        let now = now_epoch_ms();
1007        let rid = reservation_id as i64;
1008
1009        let row: Option<(serde_json::Value,)> =
1010            sqlx::query_as("SELECT reservation FROM spend_reservations WHERE reservation_id = $1")
1011                .bind(rid)
1012                .fetch_optional(pool)
1013                .await
1014                .map_err(|e| PayError::InternalError(format!("pg read reservation: {e}")))?;
1015
1016        if let Some((res_json,)) = row {
1017            let mut reservation: SpendReservation = serde_json::from_value(res_json)
1018                .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
1019
1020            if matches!(reservation.status, ReservationStatus::Pending) {
1021                reservation.status = ReservationStatus::Cancelled;
1022                reservation.finalized_at_epoch_ms = Some(now);
1023                let updated_json = serde_json::to_value(&reservation)
1024                    .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
1025
1026                sqlx::query(
1027                    "UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2",
1028                )
1029                .bind(&updated_json)
1030                .bind(rid)
1031                .execute(pool)
1032                .await
1033                .map_err(|e| PayError::InternalError(format!("pg update reservation: {e}")))?;
1034            }
1035        }
1036
1037        Ok(())
1038    }
1039}
1040
/// Reads every row of `spend_rules` (ordered by `rule_id`) through the pool and
/// decodes each JSON `rule` column into a `SpendLimit`.
///
/// Stops at the first row that fails to decode.
#[cfg(feature = "postgres")]
async fn pg_load_rules(pool: &sqlx::PgPool) -> Result<Vec<SpendLimit>, PayError> {
    let query =
        sqlx::query_as::<_, (serde_json::Value,)>("SELECT rule FROM spend_rules ORDER BY rule_id");
    let fetched = query
        .fetch_all(pool)
        .await
        .map_err(|e| PayError::InternalError(format!("pg load spend rules: {e}")))?;

    let mut rules = Vec::with_capacity(fetched.len());
    for (raw,) in fetched {
        let rule: SpendLimit = serde_json::from_value(raw)
            .map_err(|e| PayError::InternalError(format!("pg parse spend rule: {e}")))?;
        rules.push(rule);
    }
    Ok(rules)
}
1055
/// Transaction-bound variant of the rule loader: reads every row of
/// `spend_rules` (ordered by `rule_id`) on the given transaction's connection
/// and decodes each JSON `rule` column into a `SpendLimit`.
///
/// Stops at the first row that fails to decode.
#[cfg(feature = "postgres")]
async fn pg_load_rules_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Vec<SpendLimit>, PayError> {
    let query =
        sqlx::query_as::<_, (serde_json::Value,)>("SELECT rule FROM spend_rules ORDER BY rule_id");
    let fetched = query
        .fetch_all(&mut **tx)
        .await
        .map_err(|e| PayError::InternalError(format!("pg load spend rules: {e}")))?;

    let mut rules = Vec::with_capacity(fetched.len());
    for (raw,) in fetched {
        let rule: SpendLimit = serde_json::from_value(raw)
            .map_err(|e| PayError::InternalError(format!("pg parse spend rule: {e}")))?;
        rules.push(rule);
    }
    Ok(rules)
}
1072
/// Reads every row of `spend_reservations` (ordered by `reservation_id`)
/// through the pool and decodes each JSON `reservation` column.
///
/// Stops at the first row that fails to decode.
#[cfg(feature = "postgres")]
async fn pg_load_reservations(pool: &sqlx::PgPool) -> Result<Vec<SpendReservation>, PayError> {
    let query = sqlx::query_as::<_, (serde_json::Value,)>(
        "SELECT reservation FROM spend_reservations ORDER BY reservation_id",
    );
    let fetched = query
        .fetch_all(pool)
        .await
        .map_err(|e| PayError::InternalError(format!("pg load reservations: {e}")))?;

    let mut reservations = Vec::with_capacity(fetched.len());
    for (raw,) in fetched {
        let reservation: SpendReservation = serde_json::from_value(raw)
            .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
        reservations.push(reservation);
    }
    Ok(reservations)
}
1087
/// Transaction-bound variant of the reservation loader: reads every row of
/// `spend_reservations` (ordered by `reservation_id`) on the given
/// transaction's connection and decodes each JSON `reservation` column.
///
/// Stops at the first row that fails to decode.
#[cfg(feature = "postgres")]
async fn pg_load_reservations_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Vec<SpendReservation>, PayError> {
    let query = sqlx::query_as::<_, (serde_json::Value,)>(
        "SELECT reservation FROM spend_reservations ORDER BY reservation_id",
    );
    let fetched = query
        .fetch_all(&mut **tx)
        .await
        .map_err(|e| PayError::InternalError(format!("pg load reservations: {e}")))?;

    let mut reservations = Vec::with_capacity(fetched.len());
    for (raw,) in fetched {
        let reservation: SpendReservation = serde_json::from_value(raw)
            .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
        reservations.push(reservation);
    }
    Ok(reservations)
}
1104
/// Flips every `Pending` reservation whose `expires_at_epoch_ms` is at or
/// before `now_ms` to `Expired`, stamping `finalized_at_epoch_ms` with `now_ms`.
///
/// Every row of `spend_reservations` is fetched and decoded on the given
/// transaction; a row that fails to decode aborts the sweep with an error.
#[cfg(feature = "postgres")]
async fn pg_expire_pending(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    now_ms: u64,
) -> Result<(), PayError> {
    let query = sqlx::query_as::<_, (i64, serde_json::Value)>(
        "SELECT reservation_id, reservation FROM spend_reservations",
    );
    let fetched = query.fetch_all(&mut **tx).await.map_err(|e| {
        PayError::InternalError(format!("pg load reservations for expire: {e}"))
    })?;

    for (row_id, raw) in fetched {
        let mut reservation: SpendReservation = serde_json::from_value(raw)
            .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;

        // Only pending reservations that have reached their deadline are touched.
        let still_pending = matches!(reservation.status, ReservationStatus::Pending);
        if !still_pending || reservation.expires_at_epoch_ms > now_ms {
            continue;
        }

        reservation.status = ReservationStatus::Expired;
        reservation.finalized_at_epoch_ms = Some(now_ms);
        let expired_json = serde_json::to_value(&reservation)
            .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;

        sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
            .bind(&expired_json)
            .bind(row_id)
            .execute(&mut **tx)
            .await
            .map_err(|e| PayError::InternalError(format!("pg expire reservation: {e}")))?;
    }

    Ok(())
}
1139
1140// ═══════════════════════════════════════════
1141// Exchange rate (shared, delegates to backend for caching)
1142// ═══════════════════════════════════════════
1143
1144impl SpendLedger {
1145    async fn amount_to_usd_cents(
1146        &self,
1147        network: &str,
1148        token: Option<&str>,
1149        amount_native: u64,
1150    ) -> Result<u64, PayError> {
1151        let (symbol, divisor) = token_asset(network, token).ok_or_else(|| {
1152            PayError::InvalidAmount(format!(
1153                "network '{network}' token '{token:?}' is unsupported for global-usd-cents limits"
1154            ))
1155        })?;
1156
1157        let quote = self.get_or_fetch_quote(symbol, "USD").await?;
1158
1159        // Block if the quote has fully expired (fetch must have failed silently
1160        // in a prior call, or the clock jumped).
1161        let now = now_epoch_ms();
1162        if quote.expires_at_epoch_ms > 0 && now > quote.expires_at_epoch_ms {
1163            return Err(PayError::NetworkError(
1164                "exchange-rate quote expired — cannot convert to USD; check exchange_rate sources"
1165                    .to_string(),
1166            ));
1167        }
1168
1169        // Flag if cached quote age exceeds 80% of its TTL (set on every occurrence
1170        // so callers can surface the warning per-request).
1171        let ttl_ms = quote
1172            .expires_at_epoch_ms
1173            .saturating_sub(quote.fetched_at_epoch_ms);
1174        let age_ms = now.saturating_sub(quote.fetched_at_epoch_ms);
1175        if ttl_ms > 0 && age_ms > ttl_ms * 4 / 5 {
1176            self.fx_stale_warned
1177                .store(true, std::sync::atomic::Ordering::Relaxed);
1178        }
1179
1180        let usd = (amount_native as f64 / divisor) * quote.price;
1181        if !usd.is_finite() || usd < 0f64 {
1182            return Err(PayError::InternalError(
1183                "invalid exchange-rate conversion result".to_string(),
1184            ));
1185        }
1186        Ok((usd * 100f64).round() as u64)
1187    }
1188
1189    async fn get_or_fetch_quote(
1190        &self,
1191        base: &str,
1192        quote: &str,
1193    ) -> Result<ExchangeRateQuote, PayError> {
1194        let pair = format!(
1195            "{}/{}",
1196            base.to_ascii_uppercase(),
1197            quote.to_ascii_uppercase()
1198        );
1199        let now = now_epoch_ms();
1200
1201        // Try cache — redb
1202        #[cfg(feature = "redb")]
1203        if let SpendBackend::Redb { .. } = &self.backend {
1204            let fx_db = self.open_exchange_rate_db()?;
1205            let read_txn = fx_db
1206                .begin_read()
1207                .map_err(|e| PayError::InternalError(format!("fx begin_read: {e}")))?;
1208            if let Ok(table) = read_txn.open_table(FX_QUOTE_BY_PAIR) {
1209                if let Some(entry) = table
1210                    .get(pair.as_str())
1211                    .map_err(|e| PayError::InternalError(format!("fx read quote: {e}")))?
1212                {
1213                    let cached: ExchangeRateQuote = decode(entry.value())?;
1214                    if cached.expires_at_epoch_ms > now {
1215                        return Ok(cached);
1216                    }
1217                }
1218            }
1219        }
1220
1221        // Try cache — postgres
1222        #[cfg(feature = "postgres")]
1223        if let SpendBackend::Postgres { pool } = &self.backend {
1224            let row: Option<(serde_json::Value,)> =
1225                sqlx::query_as("SELECT quote FROM exchange_rate_cache WHERE pair = $1")
1226                    .bind(&pair)
1227                    .fetch_optional(pool)
1228                    .await
1229                    .map_err(|e| PayError::InternalError(format!("pg fx read cache: {e}")))?;
1230            if let Some((quote_json,)) = row {
1231                let cached: ExchangeRateQuote = serde_json::from_value(quote_json)
1232                    .map_err(|e| PayError::InternalError(format!("pg fx parse cache: {e}")))?;
1233                if cached.expires_at_epoch_ms > now {
1234                    return Ok(cached);
1235                }
1236            }
1237        }
1238
1239        let (fetched_price, source_name) = self.fetch_exchange_rate_http(base, quote).await?;
1240        let ttl_s = self
1241            .exchange_rate
1242            .as_ref()
1243            .map(|cfg| cfg.ttl_s)
1244            .unwrap_or(300)
1245            .max(1);
1246        let new_quote = ExchangeRateQuote {
1247            pair: pair.clone(),
1248            source: source_name,
1249            price: fetched_price,
1250            fetched_at_epoch_ms: now,
1251            expires_at_epoch_ms: now.saturating_add(ttl_s.saturating_mul(1000)),
1252        };
1253
1254        // Write cache — redb
1255        #[cfg(feature = "redb")]
1256        if let SpendBackend::Redb { .. } = &self.backend {
1257            let fx_db = self.open_exchange_rate_db()?;
1258            let write_txn = fx_db
1259                .begin_write()
1260                .map_err(|e| PayError::InternalError(format!("fx begin_write: {e}")))?;
1261            let mut encoded_blobs: Vec<String> = Vec::new();
1262            {
1263                let mut table = write_txn
1264                    .open_table(FX_QUOTE_BY_PAIR)
1265                    .map_err(|e| PayError::InternalError(format!("fx open quote table: {e}")))?;
1266                encoded_blobs.push(encode(&new_quote)?);
1267                let encoded = encoded_blobs
1268                    .last()
1269                    .ok_or_else(|| PayError::InternalError("missing quote blob".to_string()))?;
1270                table
1271                    .insert(pair.as_str(), encoded.as_str())
1272                    .map_err(|e| PayError::InternalError(format!("fx insert quote: {e}")))?;
1273            }
1274            write_txn
1275                .commit()
1276                .map_err(|e| PayError::InternalError(format!("fx commit write: {e}")))?;
1277        }
1278
1279        // Write cache — postgres
1280        #[cfg(feature = "postgres")]
1281        if let SpendBackend::Postgres { pool } = &self.backend {
1282            let quote_json = serde_json::to_value(&new_quote)
1283                .map_err(|e| PayError::InternalError(format!("serialize fx quote: {e}")))?;
1284            let _ = sqlx::query(
1285                "INSERT INTO exchange_rate_cache (pair, quote) VALUES ($1, $2) \
1286                 ON CONFLICT (pair) DO UPDATE SET quote = $2",
1287            )
1288            .bind(&pair)
1289            .bind(&quote_json)
1290            .execute(pool)
1291            .await;
1292        }
1293
1294        Ok(new_quote)
1295    }
1296
1297    #[cfg(feature = "exchange-rate")]
1298    async fn fetch_exchange_rate_http(
1299        &self,
1300        base: &str,
1301        quote_currency: &str,
1302    ) -> Result<(f64, String), PayError> {
1303        let cfg = self.exchange_rate.as_ref().cloned().unwrap_or_default();
1304
1305        if cfg.sources.is_empty() {
1306            return Err(PayError::InvalidAmount(
1307                "exchange_rate.sources is empty — no exchange-rate API configured".to_string(),
1308            ));
1309        }
1310
1311        let client = reqwest::Client::new();
1312        let mut last_err = String::new();
1313
1314        for source in &cfg.sources {
1315            match fetch_from_source(&client, source, base, quote_currency).await {
1316                Ok(price) => return Ok((price, source.endpoint.clone())),
1317                Err(e) => {
1318                    last_err =
1319                        format!("{} ({}): {e}", source.endpoint, source.source_type.as_str());
1320                }
1321            }
1322        }
1323
1324        Err(PayError::NetworkError(format!(
1325            "all exchange-rate sources failed; last: {last_err}"
1326        )))
1327    }
1328
1329    #[cfg(not(feature = "exchange-rate"))]
1330    async fn fetch_exchange_rate_http(
1331        &self,
1332        _base: &str,
1333        _quote_currency: &str,
1334    ) -> Result<(f64, String), PayError> {
1335        Err(PayError::NotImplemented(
1336            "exchange-rate HTTP support is not built in this feature set".to_string(),
1337        ))
1338    }
1339}
1340
1341// ═══════════════════════════════════════════
1342// Helpers
1343// ═══════════════════════════════════════════
1344
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reads earlier than the epoch.
fn now_epoch_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
1351
/// Resolves the pricing symbol and smallest-unit divisor for a payment.
///
/// When `token` is given it alone decides the result (ASCII case-insensitive):
/// `sol`, `eth`, `usdc` and `usdt` are recognised, anything else yields `None`.
/// Without a token, networks whose lowercase name starts with `ln`, or equals
/// `cashu` or `btc`, resolve to BTC with a divisor of `1e8`; every other
/// network yields `None`.
fn token_asset(network: &str, token: Option<&str>) -> Option<(&'static str, f64)> {
    // (token name, pricing symbol, units per whole coin)
    const TOKENS: [(&str, &str, f64); 4] = [
        ("sol", "SOL", 1e9),
        ("eth", "ETH", 1e18),
        ("usdc", "USD", 1e6),
        ("usdt", "USD", 1e6),
    ];

    if let Some(name) = token {
        return TOKENS
            .iter()
            .find(|(ticker, _, _)| name.eq_ignore_ascii_case(ticker))
            .map(|&(_, symbol, divisor)| (symbol, divisor));
    }

    let net = network.to_ascii_lowercase();
    let is_bitcoin_rail = net.starts_with("ln") || net == "cashu" || net == "btc";
    is_bitcoin_rail.then_some(("BTC", 1e8))
}
1368
/// Pulls a numeric price out of a generic JSON response.
///
/// Tries the top-level keys `price`, `rate` and `usd_per_base` in that order,
/// then the nested `data.price`. A key whose value is not a JSON number is
/// skipped rather than treated as a failure.
#[cfg(feature = "exchange-rate")]
fn extract_price_generic(value: &serde_json::Value) -> Option<f64> {
    const TOP_LEVEL_KEYS: [&str; 3] = ["price", "rate", "usd_per_base"];

    let top_level = TOP_LEVEL_KEYS
        .iter()
        .find_map(|key| value.get(*key)?.as_f64());
    if top_level.is_some() {
        return top_level;
    }

    value.get("data")?.get("price")?.as_f64()
}
1383
#[cfg(feature = "exchange-rate")]
impl ExchangeRateSourceType {
    /// Lowercase label for this source type, used when reporting which
    /// exchange-rate source failed.
    fn as_str(self) -> &'static str {
        match self {
            ExchangeRateSourceType::Kraken => "kraken",
            ExchangeRateSourceType::CoinGecko => "coingecko",
            ExchangeRateSourceType::Generic => "generic",
        }
    }
}
1394
1395#[cfg(feature = "exchange-rate")]
/// Maps an asset ticker (ASCII case-insensitive) to the CoinGecko coin id used
/// in the `ids=` query parameter. Returns `None` for unmapped tickers.
fn coingecko_coin_id(symbol: &str) -> Option<&'static str> {
    const COIN_IDS: [(&str, &str); 3] = [
        ("BTC", "bitcoin"),
        ("SOL", "solana"),
        ("ETH", "ethereum"),
    ];

    COIN_IDS
        .iter()
        .find(|(ticker, _)| symbol.eq_ignore_ascii_case(ticker))
        .map(|&(_, coin_id)| coin_id)
}
1404
1405#[cfg(feature = "exchange-rate")]
/// Maps an asset ticker (ASCII case-insensitive) to the Kraken USD pair name
/// used in the `pair=` query parameter. Returns `None` for unmapped tickers.
fn kraken_pair(symbol: &str) -> Option<&'static str> {
    const PAIRS: [(&str, &str); 3] = [
        ("BTC", "XBTUSD"),
        ("SOL", "SOLUSD"),
        ("ETH", "ETHUSD"),
    ];

    PAIRS
        .iter()
        .find(|(ticker, _)| symbol.eq_ignore_ascii_case(ticker))
        .map(|&(_, pair)| pair)
}
1414
/// Fetches one price from a single exchange-rate source.
///
/// The request URL and the way the price is read from the JSON body both
/// depend on `source.source_type`:
/// - Kraken: `{endpoint}/0/public/Ticker?pair=…`; price is the first element of
///   the ticker's `c` array, parsed from a string. If the requested pair key is
///   missing from `result`, the first entry of `result` is used instead.
/// - CoinGecko: `{endpoint}/simple/price?ids=…&vs_currencies=…`; price is
///   `body[coin_id][vs]`.
/// - Generic: `base`/`quote` are appended as query parameters; price is found
///   by `extract_price_generic`.
///
/// When `source.api_key` is set it is sent both as a bearer `Authorization`
/// header and as `X-Api-Key`.
///
/// # Errors
/// Returns a human-readable `String` for unsupported assets, transport
/// failures, non-success HTTP statuses, unparsable bodies, or a body from
/// which no price could be extracted.
#[cfg(feature = "exchange-rate")]
async fn fetch_from_source(
    client: &reqwest::Client,
    source: &crate::types::ExchangeRateSource,
    base: &str,
    quote_currency: &str,
) -> Result<f64, String> {
    // How to dig the price out of the response, decided together with the URL.
    enum Extraction {
        Kraken { pair: &'static str },
        CoinGecko { coin_id: &'static str, vs: String },
        Generic,
    }

    let (url, extraction) = match source.source_type {
        ExchangeRateSourceType::Generic => {
            let separator = if source.endpoint.contains('?') {
                '&'
            } else {
                '?'
            };
            let url = format!(
                "{}{separator}base={}&quote={}",
                source.endpoint,
                base.to_ascii_uppercase(),
                quote_currency.to_ascii_uppercase()
            );
            (url, Extraction::Generic)
        }
        ExchangeRateSourceType::CoinGecko => {
            let Some(coin_id) = coingecko_coin_id(base) else {
                return Err(format!("coingecko: unsupported base asset '{base}'"));
            };
            let vs = quote_currency.to_ascii_lowercase();
            let url = format!(
                "{}/simple/price?ids={coin_id}&vs_currencies={vs}",
                source.endpoint
            );
            (url, Extraction::CoinGecko { coin_id, vs })
        }
        ExchangeRateSourceType::Kraken => {
            let Some(pair) = kraken_pair(base) else {
                return Err(format!("kraken: unsupported base asset '{base}'"));
            };
            let url = format!("{}/0/public/Ticker?pair={pair}", source.endpoint);
            (url, Extraction::Kraken { pair })
        }
    };

    let mut request = client.get(&url);
    if let Some(api_key) = &source.api_key {
        request = request
            .header("Authorization", format!("Bearer {api_key}"))
            .header("X-Api-Key", api_key);
    }

    let response = match request.send().await {
        Ok(response) => response,
        Err(e) => return Err(format!("request failed: {e}")),
    };
    if !response.status().is_success() {
        return Err(format!("status {}", response.status()));
    }

    let body: serde_json::Value = match response.json().await {
        Ok(body) => body,
        Err(e) => return Err(format!("parse failed: {e}")),
    };

    let price = match &extraction {
        Extraction::Kraken { pair } => body
            .get("result")
            .and_then(|result| {
                result
                    .get(*pair)
                    .or_else(|| result.as_object().and_then(|entries| entries.values().next()))
            })
            .and_then(|ticker| ticker.get("c"))
            .and_then(|close| close.as_array())
            .and_then(|fields| fields.first())
            .and_then(|first| first.as_str())
            .and_then(|text| text.parse::<f64>().ok()),
        Extraction::CoinGecko { coin_id, vs } => body
            .get(*coin_id)
            .and_then(|coin| coin.get(vs.as_str()))
            .and_then(|price| price.as_f64()),
        Extraction::Generic => extract_price_generic(&body),
    };

    match price {
        Some(price) => Ok(price),
        None => Err("could not extract price from response".to_string()),
    }
}
1495
/// Serializes `value` to a JSON string for storage in a redb table.
///
/// # Errors
/// `PayError::InternalError` when serialization fails.
#[cfg(feature = "redb")]
fn encode<T: Serialize>(value: &T) -> Result<String, PayError> {
    match serde_json::to_string(value) {
        Ok(json) => Ok(json),
        Err(e) => Err(PayError::InternalError(format!(
            "spend encode failed: {e}"
        ))),
    }
}
1501
/// Deserializes a JSON string read from a redb table into `T`.
///
/// # Errors
/// `PayError::InternalError` when parsing fails. The message includes the
/// input length and a preview of at most 48 bytes of the input, shortened
/// where necessary so it ends on a UTF-8 character boundary.
#[cfg(feature = "redb")]
fn decode<T: DeserializeOwned>(encoded: &str) -> Result<T, PayError> {
    serde_json::from_str(encoded).map_err(|e| {
        // Slicing a `str` in the middle of a multi-byte character panics, so
        // back off to the nearest boundary at or below 48 bytes. Index 0 is
        // always a boundary, which bounds the loop.
        let mut preview_len = encoded.len().min(48);
        while !encoded.is_char_boundary(preview_len) {
            preview_len -= 1;
        }
        let preview = &encoded[..preview_len];
        PayError::InternalError(format!(
            "spend decode failed (len={}, preview={}): {e}",
            encoded.len(),
            preview
        ))
    })
}
1514
/// Prefixes the message of a `PayError::InternalError` with `"{prefix}: "`.
/// Every other error variant is returned untouched.
#[cfg(feature = "redb")]
fn prepend_err(prefix: &str, err: PayError) -> PayError {
    match err {
        PayError::InternalError(detail) => {
            let mut message = String::with_capacity(prefix.len() + 2 + detail.len());
            message.push_str(prefix);
            message.push_str(": ");
            message.push_str(&detail);
            PayError::InternalError(message)
        }
        untouched => untouched,
    }
}
1522
1523fn generate_rule_identifier() -> Result<String, PayError> {
1524    let mut buf = [0u8; 4];
1525    getrandom::fill(&mut buf).map_err(|e| PayError::InternalError(format!("rng failed: {e}")))?;
1526    Ok(format!("r_{}", hex::encode(buf)))
1527}
1528
1529fn validate_limit(
1530    rule: &SpendLimit,
1531    exchange_rate: Option<&ExchangeRateConfig>,
1532) -> Result<(), PayError> {
1533    if rule.window_s == 0 {
1534        return Err(PayError::InvalidAmount(
1535            "limit rule has zero window_s".to_string(),
1536        ));
1537    }
1538    if rule.max_spend == 0 {
1539        return Err(PayError::InvalidAmount(
1540            "limit rule has zero max_spend".to_string(),
1541        ));
1542    }
1543
1544    match rule.scope {
1545        SpendScope::GlobalUsdCents => {
1546            if rule.network.is_some() || rule.wallet.is_some() {
1547                return Err(PayError::InvalidAmount(
1548                    "scope=global-usd-cents cannot set network/wallet".to_string(),
1549                ));
1550            }
1551            if rule.token.is_some() {
1552                return Err(PayError::InvalidAmount(
1553                    "scope=global-usd-cents cannot set token".to_string(),
1554                ));
1555            }
1556        }
1557        SpendScope::Network => {
1558            if rule.network.as_deref().unwrap_or("").trim().is_empty() {
1559                return Err(PayError::InvalidAmount(
1560                    "scope=network requires network".to_string(),
1561                ));
1562            }
1563            if rule.wallet.is_some() {
1564                return Err(PayError::InvalidAmount(
1565                    "scope=network cannot set wallet".to_string(),
1566                ));
1567            }
1568        }
1569        SpendScope::Wallet => {
1570            if rule.network.as_deref().unwrap_or("").trim().is_empty() {
1571                return Err(PayError::InvalidAmount(
1572                    "scope=wallet requires network".to_string(),
1573                ));
1574            }
1575            if rule.wallet.as_deref().unwrap_or("").trim().is_empty() {
1576                return Err(PayError::InvalidAmount(
1577                    "scope=wallet requires wallet".to_string(),
1578                ));
1579            }
1580        }
1581    }
1582
1583    if rule.scope == SpendScope::GlobalUsdCents && exchange_rate.is_none() {
1584        return Err(PayError::InvalidAmount(
1585            "scope=global-usd-cents requires config.exchange_rate".to_string(),
1586        ));
1587    }
1588    Ok(())
1589}
1590
/// Reads and decodes every spend-limit rule stored in `RULE_BY_ID`.
///
/// If the table cannot be opened the result is an empty list rather than an
/// error. Iteration, entry-read and decode failures are returned as errors,
/// stopping at the first one.
#[cfg(feature = "redb")]
fn load_rules(read_txn: &redb::ReadTransaction) -> Result<Vec<SpendLimit>, PayError> {
    let rule_table = match read_txn.open_table(RULE_BY_ID) {
        Ok(table) => table,
        Err(_) => return Ok(Vec::new()),
    };
    let entries = rule_table
        .iter()
        .map_err(|e| PayError::InternalError(format!("spend iterate rules: {e}")))?;

    let mut rules = Vec::new();
    for entry in entries {
        let (_key, raw) =
            entry.map_err(|e| PayError::InternalError(format!("spend read rule entry: {e}")))?;
        let rule =
            decode::<SpendLimit>(raw.value()).map_err(|e| prepend_err("spend decode rule", e))?;
        rules.push(rule);
    }
    Ok(rules)
}
1606
/// Reads and decodes every reservation stored in `RESERVATION_BY_ID`.
///
/// If the table cannot be opened the result is an empty list rather than an
/// error. Iteration, entry-read and decode failures are returned as errors,
/// stopping at the first one.
#[cfg(feature = "redb")]
fn load_reservations(read_txn: &redb::ReadTransaction) -> Result<Vec<SpendReservation>, PayError> {
    let reservation_table = match read_txn.open_table(RESERVATION_BY_ID) {
        Ok(table) => table,
        Err(_) => return Ok(Vec::new()),
    };
    let entries = reservation_table
        .iter()
        .map_err(|e| PayError::InternalError(format!("spend iterate reservations: {e}")))?;

    let mut reservations = Vec::new();
    for entry in entries {
        let (_key, raw) =
            entry.map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?;
        let reservation = decode::<SpendReservation>(raw.value())
            .map_err(|e| prepend_err("spend decode reservation", e))?;
        reservations.push(reservation);
    }
    Ok(reservations)
}
1623
/// No-op: neither argument is read and the call always returns `Ok(())`.
///
/// NOTE(review): presumably retained as a hook for eagerly expiring pending
/// reservations; `reservation_active_for_window` already ignores pending
/// entries whose expiry has passed — confirm before removing.
#[cfg(feature = "redb")]
fn expire_pending(_table: &mut redb::Table<u64, &str>, _now_ms: u64) -> Result<(), PayError> {
    Ok(())
}
1628
1629fn amount_for_rule(
1630    _rule: &SpendLimit,
1631    amount_native: u64,
1632    amount_usd_cents: Option<u64>,
1633    use_usd: bool,
1634) -> Result<u64, PayError> {
1635    if use_usd {
1636        amount_usd_cents.ok_or_else(|| {
1637            PayError::InternalError("missing USD amount for non-native unit rule".to_string())
1638        })
1639    } else {
1640        Ok(amount_native)
1641    }
1642}
1643
1644fn reservation_active_for_window(r: &SpendReservation, now_ms: u64) -> bool {
1645    match r.status {
1646        ReservationStatus::Confirmed => true,
1647        ReservationStatus::Pending => r.expires_at_epoch_ms > now_ms,
1648        ReservationStatus::Cancelled | ReservationStatus::Expired => false,
1649    }
1650}
1651
1652fn rule_matches_context(
1653    rule: &SpendLimit,
1654    network: &str,
1655    wallet: Option<&str>,
1656    token: Option<&str>,
1657) -> bool {
1658    if let Some(rule_token) = &rule.token {
1659        match token {
1660            Some(ctx_token) if ctx_token.eq_ignore_ascii_case(rule_token) => {}
1661            _ => return false,
1662        }
1663    }
1664    match rule.scope {
1665        SpendScope::GlobalUsdCents => true,
1666        SpendScope::Network => rule.network.as_deref() == Some(network),
1667        SpendScope::Wallet => {
1668            rule.network.as_deref() == Some(network) && rule.wallet.as_deref() == wallet
1669        }
1670    }
1671}
1672
1673fn scope_key(rule: &SpendLimit) -> String {
1674    match rule.scope {
1675        SpendScope::GlobalUsdCents => "global-usd-cents".to_string(),
1676        SpendScope::Network => rule.network.clone().unwrap_or_default(),
1677        SpendScope::Wallet => format!(
1678            "{}/{}",
1679            rule.network.clone().unwrap_or_default(),
1680            rule.wallet.clone().unwrap_or_default()
1681        ),
1682    }
1683}
1684
1685fn spent_in_window(
1686    rule: &SpendLimit,
1687    reservations: &[SpendReservation],
1688    now_ms: u64,
1689    use_usd: bool,
1690) -> Result<(u64, Option<u64>), PayError> {
1691    let window_ms = rule.window_s.saturating_mul(1000);
1692    let cutoff = now_ms.saturating_sub(window_ms);
1693
1694    let mut spent = 0u64;
1695    let mut oldest: Option<u64> = None;
1696
1697    for r in reservations {
1698        if !reservation_active_for_window(r, now_ms) {
1699            continue;
1700        }
1701        if r.created_at_epoch_ms < cutoff {
1702            continue;
1703        }
1704        if !rule_matches_context(rule, &r.network, r.wallet.as_deref(), r.token.as_deref()) {
1705            continue;
1706        }
1707
1708        let amount = if use_usd {
1709            r.amount_usd_cents.ok_or_else(|| {
1710                PayError::InternalError("reservation missing USD amount".to_string())
1711            })?
1712        } else {
1713            r.amount_native
1714        };
1715        spent = spent.saturating_add(amount);
1716        oldest = Some(oldest.map_or(r.created_at_epoch_ms, |v| v.min(r.created_at_epoch_ms)));
1717    }
1718
1719    Ok((spent, oldest))
1720}
1721
/// Increments the counter stored under `key` in `META_COUNTER` and returns
/// the new value.
///
/// A counter that has never been written starts from 0, so the first call
/// returns 1. The increment saturates at `u64::MAX`.
///
/// # Errors
///
/// Returns `PayError::InternalError` when the table cannot be opened, read
/// or written.
#[cfg(feature = "redb")]
fn next_counter(write_txn: &redb::WriteTransaction, key: &str) -> Result<u64, PayError> {
    let mut counters = write_txn
        .open_table(META_COUNTER)
        .map_err(|e| PayError::InternalError(format!("spend open meta table: {e}")))?;

    let previous = counters
        .get(key)
        .map_err(|e| PayError::InternalError(format!("spend read counter {key}: {e}")))?
        .map_or(0, |stored| stored.value());

    let bumped = previous.saturating_add(1);
    counters
        .insert(key, bumped)
        .map_err(|e| PayError::InternalError(format!("spend write counter {key}: {e}")))?;
    Ok(bumped)
}
1739
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a rule with a one-hour window, a cap of 1000 and no token
    /// filter for the given scope, network and wallet.
    fn make_limit(scope: SpendScope, network: Option<&str>, wallet: Option<&str>) -> SpendLimit {
        SpendLimit {
            rule_id: None,
            scope,
            network: network.map(str::to_owned),
            wallet: wallet.map(str::to_owned),
            window_s: 3600,
            max_spend: 1000,
            token: None,
        }
    }

    /// Builds a token-less spend request on the `cashu` network.
    #[cfg(feature = "redb")]
    fn cashu_ctx(wallet: Option<&str>, amount_native: u64) -> SpendContext {
        SpendContext {
            network: String::from("cashu"),
            wallet: wallet.map(str::to_owned),
            amount_native,
            token: None,
        }
    }

    #[cfg(feature = "redb")]
    #[tokio::test]
    async fn provider_limit_reserve_and_confirm() {
        let dir = tempfile::tempdir().unwrap();
        let ledger = SpendLedger::new(dir.path().to_str().unwrap(), None);

        let limits = [make_limit(SpendScope::Network, Some("cashu"), None)];
        ledger.set_limits(&limits).await.unwrap();

        // Each request asks for 400 against the 1000 cap set by `make_limit`.
        let ctx = cashu_ctx(Some("w_01"), 400);

        let first = ledger.reserve("op_1", &ctx).await.unwrap();
        ledger.confirm(first).await.unwrap();

        let second = ledger.reserve("op_2", &ctx).await.unwrap();

        // A third request on top of the confirmed and pending ones is refused.
        let refused = ledger.reserve("op_3", &ctx).await.unwrap_err();
        assert!(matches!(refused, PayError::LimitExceeded { .. }));

        ledger.cancel(second).await.unwrap();
    }

    #[cfg(feature = "redb")]
    #[tokio::test]
    async fn wallet_scope_requires_wallet_context() {
        let dir = tempfile::tempdir().unwrap();
        let ledger = SpendLedger::new(dir.path().to_str().unwrap(), None);

        let limits = [make_limit(SpendScope::Wallet, Some("cashu"), Some("w_abc"))];
        ledger.set_limits(&limits).await.unwrap();

        // The request deliberately carries no wallet.
        let ctx = cashu_ctx(None, 1);
        let refused = ledger.reserve("op_1", &ctx).await.unwrap_err();
        assert!(matches!(refused, PayError::InvalidAmount(_)));
    }

    #[tokio::test]
    async fn global_usd_cents_scope_requires_exchange_rate_config() {
        let dir = tempfile::tempdir().unwrap();
        // The ledger is created without an exchange-rate configuration.
        let ledger = SpendLedger::new(dir.path().to_str().unwrap(), None);

        let usd_rule = SpendLimit {
            max_spend: 100,
            ..make_limit(SpendScope::GlobalUsdCents, None, None)
        };
        let refused = ledger.set_limits(&[usd_rule]).await.unwrap_err();

        assert!(matches!(refused, PayError::InvalidAmount(_)));
    }

    #[cfg(feature = "redb")]
    #[tokio::test]
    async fn network_scope_native_token_ok_without_exchange_rate() {
        let dir = tempfile::tempdir().unwrap();
        let ledger = SpendLedger::new(dir.path().to_str().unwrap(), None);

        let network_rule = SpendLimit {
            max_spend: 100,
            ..make_limit(SpendScope::Network, Some("cashu"), None)
        };
        ledger
            .set_limits(&[network_rule])
            .await
            .expect("network scope should not require exchange_rate");
    }
}