Skip to main content

agent_first_pay/spend/
mod.rs

1#![cfg_attr(not(any(feature = "redb", feature = "postgres")), allow(dead_code))]
2
3pub mod tokens;
4
5use crate::provider::PayError;
6#[cfg(feature = "exchange-rate")]
7use crate::types::ExchangeRateSourceType;
8use crate::types::{ExchangeRateConfig, SpendLimit, SpendLimitStatus, SpendScope};
9#[cfg(feature = "redb")]
10use serde::de::DeserializeOwned;
11use serde::{Deserialize, Serialize};
12use tokio::sync::Mutex;
13
14#[cfg(feature = "redb")]
15use crate::store::db;
16#[cfg(feature = "redb")]
17use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
18#[cfg(feature = "redb")]
19use std::path::{Path, PathBuf};
20
/// redb table mapping a counter name (`&str`) to a `u64`.
/// NOTE(review): presumably the table `next_counter` reads and bumps for the
/// `NEXT_*_KEY` names below — confirm against its definition.
#[cfg(feature = "redb")]
const META_COUNTER: TableDefinition<&str, u64> = TableDefinition::new("meta_counter");
/// redb table mapping `rule_id` to the encoded `SpendLimit` rule.
#[cfg(feature = "redb")]
const RULE_BY_ID: TableDefinition<&str, &str> = TableDefinition::new("rule_by_id_v3");
/// redb table mapping `reservation_id` to the encoded `SpendReservation`.
#[cfg(feature = "redb")]
const RESERVATION_BY_ID: TableDefinition<u64, &str> = TableDefinition::new("reservation_by_id");
/// redb table mapping `op_id` to `reservation_id`; consulted by `reserve_redb` so a
/// repeated `op_id` returns the reservation that already exists.
#[cfg(feature = "redb")]
const RESERVATION_ID_BY_OP_ID: TableDefinition<&str, u64> =
    TableDefinition::new("reservation_id_by_op_id");
/// redb table mapping `event_id` to the encoded `SpendEvent` written on confirmation.
#[cfg(feature = "redb")]
const SPEND_EVENT_BY_ID: TableDefinition<u64, &str> = TableDefinition::new("spend_event_by_id");
/// redb table keyed by a pair string.
/// NOTE(review): not referenced in the visible part of this file; presumably holds
/// cached `ExchangeRateQuote`s — confirm.
#[cfg(feature = "redb")]
const FX_QUOTE_BY_PAIR: TableDefinition<&str, &str> = TableDefinition::new("quote_by_pair");
/// Counter name passed to `next_counter` when allocating a reservation id.
#[cfg(feature = "redb")]
const NEXT_RESERVATION_ID_KEY: &str = "next_reservation_id";
/// Counter name passed to `next_counter` when allocating a spend event id.
#[cfg(feature = "redb")]
const NEXT_EVENT_ID_KEY: &str = "next_event_id";
/// Schema version handed to `db::open_and_migrate` for the spend database.
#[cfg(feature = "redb")]
const SPEND_VERSION: u64 = 1;
/// Schema version handed to `db::open_and_migrate` for the exchange-rate cache database.
#[cfg(feature = "redb")]
const FX_CACHE_VERSION: u64 = 1;
42
/// Description of a single spend attempt, supplied by the caller of
/// `SpendLedger::reserve`.
#[derive(Debug, Clone)]
pub struct SpendContext {
    /// Network the spend happens on; `reserve` rejects a blank value.
    pub network: String,
    /// Wallet the spend originates from. `reserve` fails when this is `None` and a
    /// wallet-scoped rule exists for `network`.
    pub wallet: Option<String>,
    /// Amount in the network's native unit.
    /// NOTE(review): the exact unit per network is not visible here — confirm.
    pub amount_native: u64,
    /// Optional token identifier, forwarded to rule matching and USD conversion.
    pub token: Option<String>,
}
50
/// Lifecycle state stored with each `SpendReservation`.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum ReservationStatus {
    /// Initial state assigned when a reservation is created.
    Pending,
    /// Set by `confirm` on a reservation that was still pending.
    Confirmed,
    /// Set by `cancel` on a reservation that was still pending.
    Cancelled,
    /// NOTE(review): presumably assigned by `expire_pending` once
    /// `expires_at_epoch_ms` has passed — confirm against that helper.
    Expired,
}
58
/// Persisted record of one spend reservation (stored encoded in `RESERVATION_BY_ID`).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SpendReservation {
    /// Ledger-assigned id; also the key in the reservation table.
    reservation_id: u64,
    /// Caller-supplied operation id used to detect repeated `reserve` calls.
    op_id: String,
    /// Network copied from the `SpendContext`.
    network: String,
    /// Wallet copied from the `SpendContext`.
    wallet: Option<String>,
    /// Token copied from the `SpendContext`; deserializes to `None` when the stored
    /// record has no such field.
    #[serde(default)]
    token: Option<String>,
    /// Amount copied from the `SpendContext`.
    amount_native: u64,
    /// USD-cent value of the amount; only computed when at least one rule has the
    /// `GlobalUsdCents` scope at reservation time.
    amount_usd_cents: Option<u64>,
    /// Current lifecycle state.
    status: ReservationStatus,
    /// Creation time in epoch milliseconds.
    created_at_epoch_ms: u64,
    /// Expiry time in epoch milliseconds (creation time + 300 000 ms in `reserve_redb`).
    expires_at_epoch_ms: u64,
    /// Time the reservation was confirmed or cancelled; `None` while pending.
    finalized_at_epoch_ms: Option<u64>,
}
74
/// Persisted record written to `SPEND_EVENT_BY_ID` when a reservation is confirmed.
/// All spend details are copied from the confirmed `SpendReservation`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SpendEvent {
    /// Ledger-assigned id; also the key in the event table.
    event_id: u64,
    /// Id of the reservation this event was created from.
    reservation_id: u64,
    /// Operation id of the originating reservation.
    op_id: String,
    /// Network of the originating reservation.
    network: String,
    /// Wallet of the originating reservation.
    wallet: Option<String>,
    /// Token of the originating reservation; deserializes to `None` when absent.
    #[serde(default)]
    token: Option<String>,
    /// Native amount of the originating reservation.
    amount_native: u64,
    /// USD-cent amount of the originating reservation, if it was computed.
    amount_usd_cents: Option<u64>,
    /// Creation time of the originating reservation (epoch milliseconds).
    created_at_epoch_ms: u64,
    /// Time the reservation was confirmed (epoch milliseconds).
    confirmed_at_epoch_ms: u64,
}
89
/// Serializable exchange-rate quote.
/// NOTE(review): not used in the visible part of this file; presumably the value
/// cached per pair in `FX_QUOTE_BY_PAIR` — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ExchangeRateQuote {
    /// Currency pair identifier.
    pair: String,
    /// Name of the source the quote came from.
    source: String,
    /// Quoted price. NOTE(review): quote direction/unit is not visible here — confirm.
    price: f64,
    /// Time the quote was fetched (epoch milliseconds).
    fetched_at_epoch_ms: u64,
    /// Time after which the quote is considered expired (epoch milliseconds).
    expires_at_epoch_ms: u64,
}
98
99// ═══════════════════════════════════════════
100// SpendBackend
101// ═══════════════════════════════════════════
102
/// Storage backend selected for a `SpendLedger`. Which variants exist depends on
/// the `redb` / `postgres` cargo features; `None` is always available.
#[allow(dead_code)] // None variant used when neither redb nor postgres features are enabled
enum SpendBackend {
    /// Embedded redb files located under `<data_dir>/spend/`.
    #[cfg(feature = "redb")]
    Redb {
        /// Base directory; the database files live in its `spend` subdirectory.
        data_dir: String,
    },
    /// PostgreSQL accessed through a shared sqlx connection pool.
    #[cfg(feature = "postgres")]
    Postgres {
        /// Connection pool used for every query.
        pool: sqlx::PgPool,
    },
    /// No storage: mutating operations report `PayError::NotImplemented`, while
    /// `get_status` returns an empty list and `cancel` succeeds as a no-op.
    None,
}
115
116// ═══════════════════════════════════════════
117// SpendLedger
118// ═══════════════════════════════════════════
119
/// Spend-limit ledger: stores limit rules and tracks reservations against them
/// using the configured storage backend.
pub struct SpendLedger {
    /// Where rules, reservations and events are persisted.
    backend: SpendBackend,
    /// Optional exchange-rate configuration; passed to `validate_limit` when rules
    /// are added or replaced.
    exchange_rate: Option<ExchangeRateConfig>,
    /// Async mutex taken by every public operation so they run one at a time
    /// within this ledger instance.
    mu: Mutex<()>,
    /// Set to true when a cached FX quote's age exceeds 80% of its TTL.
    fx_stale_warned: std::sync::atomic::AtomicBool,
}
127
128impl SpendLedger {
129    pub fn new(data_dir: &str, exchange_rate: Option<ExchangeRateConfig>) -> Self {
130        #[cfg(feature = "redb")]
131        let backend = SpendBackend::Redb {
132            data_dir: data_dir.to_string(),
133        };
134        #[cfg(not(feature = "redb"))]
135        let backend = {
136            let _ = data_dir;
137            SpendBackend::None
138        };
139        Self {
140            backend,
141            exchange_rate,
142            mu: Mutex::new(()),
143            fx_stale_warned: std::sync::atomic::AtomicBool::new(false),
144        }
145    }
146
147    #[cfg(feature = "postgres")]
148    pub fn new_postgres(pool: sqlx::PgPool, exchange_rate: Option<ExchangeRateConfig>) -> Self {
149        Self {
150            backend: SpendBackend::Postgres { pool },
151            exchange_rate,
152            mu: Mutex::new(()),
153            fx_stale_warned: std::sync::atomic::AtomicBool::new(false),
154        }
155    }
156
157    /// Returns true (once) if a stale FX quote was used since last check.
158    pub fn take_fx_stale_warning(&self) -> bool {
159        self.fx_stale_warned
160            .swap(false, std::sync::atomic::Ordering::Relaxed)
161    }
162
163    /// Add a single spend limit rule. Generates and assigns a rule_id, returns it.
164    pub async fn add_limit(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
165        validate_limit(limit, self.exchange_rate.as_ref())?;
166
167        let _guard = self.mu.lock().await;
168
169        match &self.backend {
170            #[cfg(feature = "redb")]
171            SpendBackend::Redb { .. } => self.add_limit_redb(limit),
172            #[cfg(feature = "postgres")]
173            SpendBackend::Postgres { .. } => self.add_limit_postgres(limit).await,
174            SpendBackend::None => Err(PayError::NotImplemented(
175                "no storage backend for spend limits".to_string(),
176            )),
177        }
178    }
179
180    /// Remove a spend limit rule by its rule_id.
181    pub async fn remove_limit(&self, _rule_id: &str) -> Result<(), PayError> {
182        let _guard = self.mu.lock().await;
183
184        match &self.backend {
185            #[cfg(feature = "redb")]
186            SpendBackend::Redb { .. } => self.remove_limit_redb(_rule_id),
187            #[cfg(feature = "postgres")]
188            SpendBackend::Postgres { .. } => self.remove_limit_postgres(_rule_id).await,
189            SpendBackend::None => Err(PayError::NotImplemented(
190                "no storage backend for spend limits".to_string(),
191            )),
192        }
193    }
194
195    /// Replace all spend limits (used by config patch / pipe mode).
196    pub async fn set_limits(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
197        for limit in limits {
198            validate_limit(limit, self.exchange_rate.as_ref())?;
199        }
200
201        let _guard = self.mu.lock().await;
202
203        match &self.backend {
204            #[cfg(feature = "redb")]
205            SpendBackend::Redb { .. } => self.set_limits_redb(limits),
206            #[cfg(feature = "postgres")]
207            SpendBackend::Postgres { .. } => self.set_limits_postgres(limits).await,
208            SpendBackend::None => Err(PayError::NotImplemented(
209                "no storage backend for spend limits".to_string(),
210            )),
211        }
212    }
213
214    /// Compute current status for all limits.
215    pub async fn get_status(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
216        let _guard = self.mu.lock().await;
217
218        match &self.backend {
219            #[cfg(feature = "redb")]
220            SpendBackend::Redb { .. } => self.get_status_redb(),
221            #[cfg(feature = "postgres")]
222            SpendBackend::Postgres { .. } => self.get_status_postgres().await,
223            SpendBackend::None => Ok(Vec::new()),
224        }
225    }
226
227    /// Reserve spend against all matching limits, returns reservation id.
228    pub async fn reserve(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
229        if op_id.trim().is_empty() {
230            return Err(PayError::InvalidAmount("op_id cannot be empty".to_string()));
231        }
232        if ctx.network.trim().is_empty() {
233            return Err(PayError::InvalidAmount(
234                "network cannot be empty for spend check".to_string(),
235            ));
236        }
237
238        let _guard = self.mu.lock().await;
239
240        match &self.backend {
241            #[cfg(feature = "redb")]
242            SpendBackend::Redb { .. } => self.reserve_redb(op_id, ctx).await,
243            #[cfg(feature = "postgres")]
244            SpendBackend::Postgres { .. } => self.reserve_postgres(op_id, ctx).await,
245            SpendBackend::None => Err(PayError::NotImplemented(
246                "no storage backend for spend limits".to_string(),
247            )),
248        }
249    }
250
251    pub async fn confirm(&self, _reservation_id: u64) -> Result<(), PayError> {
252        let _guard = self.mu.lock().await;
253
254        match &self.backend {
255            #[cfg(feature = "redb")]
256            SpendBackend::Redb { .. } => self.confirm_redb(_reservation_id),
257            #[cfg(feature = "postgres")]
258            SpendBackend::Postgres { .. } => self.confirm_postgres(_reservation_id).await,
259            SpendBackend::None => Err(PayError::NotImplemented(
260                "no storage backend for spend limits".to_string(),
261            )),
262        }
263    }
264
265    pub async fn cancel(&self, _reservation_id: u64) -> Result<(), PayError> {
266        let _guard = self.mu.lock().await;
267
268        match &self.backend {
269            #[cfg(feature = "redb")]
270            SpendBackend::Redb { .. } => self.cancel_redb(_reservation_id),
271            #[cfg(feature = "postgres")]
272            SpendBackend::Postgres { .. } => self.cancel_postgres(_reservation_id).await,
273            SpendBackend::None => Ok(()),
274        }
275    }
276}
277
278// ═══════════════════════════════════════════
279// Redb backend implementation
280// ═══════════════════════════════════════════
281
282#[cfg(feature = "redb")]
283impl SpendLedger {
284    fn spend_db_path(&self) -> PathBuf {
285        match &self.backend {
286            SpendBackend::Redb { data_dir } => Path::new(data_dir).join("spend").join("spend.redb"),
287            #[allow(unreachable_patterns)]
288            _ => PathBuf::new(),
289        }
290    }
291
292    fn exchange_rate_db_path(&self) -> PathBuf {
293        match &self.backend {
294            SpendBackend::Redb { data_dir } => Path::new(data_dir)
295                .join("spend")
296                .join("exchange-rate-cache.redb"),
297            #[allow(unreachable_patterns)]
298            _ => PathBuf::new(),
299        }
300    }
301
302    fn open_spend_db(&self) -> Result<Database, PayError> {
303        db::open_and_migrate(
304            &self.spend_db_path(),
305            SPEND_VERSION,
306            &[
307                // v0 → v1: no data migration, just stamp version
308                &|_db: &Database| Ok(()),
309            ],
310        )
311    }
312
313    fn open_exchange_rate_db(&self) -> Result<Database, PayError> {
314        db::open_and_migrate(
315            &self.exchange_rate_db_path(),
316            FX_CACHE_VERSION,
317            &[
318                // v0 → v1: no data migration, just stamp version
319                &|_db: &Database| Ok(()),
320            ],
321        )
322    }
323
324    fn add_limit_redb(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
325        let db = self.open_spend_db()?;
326        let rule_id = generate_rule_identifier()?;
327        limit.rule_id = Some(rule_id.clone());
328        let encoded = encode(limit)?;
329        let write_txn = db
330            .begin_write()
331            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
332        {
333            let mut rule_table = write_txn
334                .open_table(RULE_BY_ID)
335                .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
336            rule_table
337                .insert(rule_id.as_str(), encoded.as_str())
338                .map_err(|e| PayError::InternalError(format!("spend insert rule: {e}")))?;
339        }
340        write_txn
341            .commit()
342            .map_err(|e| PayError::InternalError(format!("spend commit add_limit: {e}")))?;
343        Ok(rule_id)
344    }
345
346    fn remove_limit_redb(&self, rule_id: &str) -> Result<(), PayError> {
347        let db = self.open_spend_db()?;
348        let write_txn = db
349            .begin_write()
350            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
351        {
352            let mut rule_table = write_txn
353                .open_table(RULE_BY_ID)
354                .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
355            let existed = rule_table
356                .remove(rule_id)
357                .map_err(|e| PayError::InternalError(format!("spend remove rule: {e}")))?;
358            if existed.is_none() {
359                return Err(PayError::InvalidAmount(format!(
360                    "rule_id '{rule_id}' not found"
361                )));
362            }
363        }
364        write_txn
365            .commit()
366            .map_err(|e| PayError::InternalError(format!("spend commit remove_limit: {e}")))
367    }
368
369    fn set_limits_redb(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
370        let db = self.open_spend_db()?;
371        let write_txn = db
372            .begin_write()
373            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
374        {
375            let mut rule_table = write_txn
376                .open_table(RULE_BY_ID)
377                .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
378            // Clear existing rules
379            let existing_ids = rule_table
380                .iter()
381                .map_err(|e| PayError::InternalError(format!("spend iterate rules: {e}")))?
382                .map(|entry| {
383                    entry
384                        .map(|(k, _)| k.value().to_string())
385                        .map_err(|e| PayError::InternalError(format!("spend read rule key: {e}")))
386                })
387                .collect::<Result<Vec<_>, _>>()?;
388            for rid in existing_ids {
389                rule_table
390                    .remove(rid.as_str())
391                    .map_err(|e| PayError::InternalError(format!("spend remove rule: {e}")))?;
392            }
393
394            // Insert new rules with generated IDs
395            for limit in limits {
396                let mut rule = limit.clone();
397                let rid = generate_rule_identifier()?;
398                rule.rule_id = Some(rid.clone());
399                let encoded = encode(&rule)?;
400                rule_table
401                    .insert(rid.as_str(), encoded.as_str())
402                    .map_err(|e| PayError::InternalError(format!("spend insert rule: {e}")))?;
403            }
404        }
405        write_txn
406            .commit()
407            .map_err(|e| PayError::InternalError(format!("spend commit set_limits: {e}")))
408    }
409
410    fn get_status_redb(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
411        let db = self.open_spend_db()?;
412        let read_txn = db
413            .begin_read()
414            .map_err(|e| PayError::InternalError(format!("spend begin_read: {e}")))?;
415        let rules = load_rules(&read_txn)?;
416        let reservations = load_reservations(&read_txn)?;
417        let now = now_epoch_ms();
418        let mut out = Vec::with_capacity(rules.len());
419        for rule in rules {
420            let use_usd = rule.scope == SpendScope::GlobalUsdCents;
421            let (spent, oldest_ts) = spent_in_window(&rule, &reservations, now, use_usd)?;
422            let remaining = rule.max_spend.saturating_sub(spent);
423            let window_ms = rule.window_s.saturating_mul(1000);
424            let window_reset_s = oldest_ts
425                .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
426                .unwrap_or(0);
427            out.push(SpendLimitStatus {
428                rule_id: rule.rule_id.clone().unwrap_or_default(),
429                scope: rule.scope,
430                network: rule.network.clone(),
431                wallet: rule.wallet.clone(),
432                window_s: rule.window_s,
433                max_spend: rule.max_spend,
434                spent,
435                remaining,
436                token: rule.token.clone(),
437                window_reset_s,
438            });
439        }
440        Ok(out)
441    }
442
443    async fn reserve_redb(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
444        let now = now_epoch_ms();
445        let db = self.open_spend_db()?;
446
447        let read_txn = db
448            .begin_read()
449            .map_err(|e| PayError::InternalError(format!("spend begin_read: {e}")))?;
450        let rules = load_rules(&read_txn)?;
451
452        if rules.iter().any(|r| {
453            r.scope == SpendScope::Wallet
454                && r.network.as_deref() == Some(ctx.network.as_str())
455                && ctx.wallet.is_none()
456        }) {
457            return Err(PayError::InvalidAmount(
458                "wallet-scoped limits require an explicit wallet".to_string(),
459            ));
460        }
461
462        // GlobalUsdCents scope needs USD conversion
463        let needs_usd = rules.iter().any(|r| r.scope == SpendScope::GlobalUsdCents);
464        let amount_usd_cents = if needs_usd {
465            Some(
466                self.amount_to_usd_cents(&ctx.network, ctx.token.as_deref(), ctx.amount_native)
467                    .await?,
468            )
469        } else {
470            None
471        };
472
473        let write_txn = db
474            .begin_write()
475            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
476
477        let mut encoded_blobs: Vec<String> = Vec::new();
478        let reservation_id = {
479            let mut reservation_index =
480                write_txn.open_table(RESERVATION_ID_BY_OP_ID).map_err(|e| {
481                    PayError::InternalError(format!("spend open reservation op index: {e}"))
482                })?;
483            if let Some(existing) = reservation_index
484                .get(op_id)
485                .map_err(|e| PayError::InternalError(format!("spend read op index: {e}")))?
486            {
487                let existing_id = existing.value();
488                return Ok(existing_id);
489            }
490
491            let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
492                PayError::InternalError(format!("spend open reservation table: {e}"))
493            })?;
494
495            expire_pending(&mut reservation_table, now)?;
496
497            let reservations = reservation_table
498                .iter()
499                .map_err(|e| PayError::InternalError(format!("spend iterate reservations: {e}")))?
500                .map(|entry| {
501                    let (_k, v) = entry.map_err(|e| {
502                        PayError::InternalError(format!("spend read reservation: {e}"))
503                    })?;
504                    decode::<SpendReservation>(v.value())
505                        .map_err(|e| prepend_err("spend decode reservation", e))
506                })
507                .collect::<Result<Vec<_>, _>>()?;
508
509            for rule in rules.iter() {
510                if !rule_matches_context(
511                    rule,
512                    &ctx.network,
513                    ctx.wallet.as_deref(),
514                    ctx.token.as_deref(),
515                ) {
516                    continue;
517                }
518
519                let use_usd = rule.scope == SpendScope::GlobalUsdCents;
520                let candidate_amount =
521                    amount_for_rule(rule, ctx.amount_native, amount_usd_cents, use_usd)?;
522                let (spent, oldest_ts) = spent_in_window(rule, &reservations, now, use_usd)?;
523                if spent.saturating_add(candidate_amount) > rule.max_spend {
524                    let window_ms = rule.window_s.saturating_mul(1000);
525                    let remaining_s = oldest_ts
526                        .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
527                        .unwrap_or(0);
528
529                    return Err(PayError::LimitExceeded {
530                        rule_id: rule.rule_id.clone().unwrap_or_default(),
531                        scope: rule.scope,
532                        scope_key: scope_key(rule),
533                        spent,
534                        max_spend: rule.max_spend,
535                        token: rule.token.clone(),
536                        remaining_s,
537                        origin: None,
538                    });
539                }
540            }
541
542            let reservation_id = next_counter(&write_txn, NEXT_RESERVATION_ID_KEY)?;
543            let reservation = SpendReservation {
544                reservation_id,
545                op_id: op_id.to_string(),
546                network: ctx.network.clone(),
547                wallet: ctx.wallet.clone(),
548                token: ctx.token.clone(),
549                amount_native: ctx.amount_native,
550                amount_usd_cents,
551                status: ReservationStatus::Pending,
552                created_at_epoch_ms: now,
553                expires_at_epoch_ms: now.saturating_add(300_000),
554                finalized_at_epoch_ms: None,
555            };
556            encoded_blobs.push(encode(&reservation)?);
557            let encoded = encoded_blobs
558                .last()
559                .ok_or_else(|| PayError::InternalError("missing reservation blob".to_string()))?;
560            reservation_table
561                .insert(reservation_id, encoded.as_str())
562                .map_err(|e| PayError::InternalError(format!("spend insert reservation: {e}")))?;
563            reservation_index
564                .insert(op_id, reservation_id)
565                .map_err(|e| PayError::InternalError(format!("spend insert op index: {e}")))?;
566            reservation_id
567        };
568
569        write_txn
570            .commit()
571            .map_err(|e| PayError::InternalError(format!("spend commit reserve: {e}")))?;
572        Ok(reservation_id)
573    }
574
575    fn confirm_redb(&self, reservation_id: u64) -> Result<(), PayError> {
576        let db = self.open_spend_db()?;
577        let now = now_epoch_ms();
578
579        let write_txn = db
580            .begin_write()
581            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
582
583        let mut encoded_blobs: Vec<String> = Vec::new();
584        {
585            let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
586                PayError::InternalError(format!("spend open reservation table: {e}"))
587            })?;
588            let Some(existing_bytes) = reservation_table
589                .get(reservation_id)
590                .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?
591                .map(|g| g.value().to_string())
592            else {
593                return Err(PayError::InternalError(format!(
594                    "reservation {reservation_id} not found"
595                )));
596            };
597
598            let mut reservation: SpendReservation = decode(&existing_bytes)?;
599            if !matches!(reservation.status, ReservationStatus::Pending) {
600                return Ok(());
601            }
602
603            reservation.status = ReservationStatus::Confirmed;
604            reservation.finalized_at_epoch_ms = Some(now);
605            encoded_blobs.push(encode(&reservation)?);
606            let encoded = encoded_blobs
607                .last()
608                .ok_or_else(|| PayError::InternalError("missing reservation blob".to_string()))?;
609            reservation_table
610                .insert(reservation_id, encoded.as_str())
611                .map_err(|e| PayError::InternalError(format!("spend update reservation: {e}")))?;
612
613            let mut events = write_txn
614                .open_table(SPEND_EVENT_BY_ID)
615                .map_err(|e| PayError::InternalError(format!("spend open event table: {e}")))?;
616            let event_id = next_counter(&write_txn, NEXT_EVENT_ID_KEY)?;
617            let event = SpendEvent {
618                event_id,
619                reservation_id,
620                op_id: reservation.op_id,
621                network: reservation.network,
622                wallet: reservation.wallet,
623                token: reservation.token,
624                amount_native: reservation.amount_native,
625                amount_usd_cents: reservation.amount_usd_cents,
626                created_at_epoch_ms: reservation.created_at_epoch_ms,
627                confirmed_at_epoch_ms: now,
628            };
629            encoded_blobs.push(encode(&event)?);
630            let encoded_event = encoded_blobs
631                .last()
632                .ok_or_else(|| PayError::InternalError("missing event blob".to_string()))?;
633            events
634                .insert(event_id, encoded_event.as_str())
635                .map_err(|e| PayError::InternalError(format!("spend insert event: {e}")))?;
636        }
637
638        write_txn
639            .commit()
640            .map_err(|e| PayError::InternalError(format!("spend commit confirm: {e}")))
641    }
642
643    fn cancel_redb(&self, reservation_id: u64) -> Result<(), PayError> {
644        let db = self.open_spend_db()?;
645        let now = now_epoch_ms();
646
647        let write_txn = db
648            .begin_write()
649            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
650
651        let mut encoded_blobs: Vec<String> = Vec::new();
652        {
653            let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
654                PayError::InternalError(format!("spend open reservation table: {e}"))
655            })?;
656            let existing = reservation_table
657                .get(reservation_id)
658                .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?;
659            let existing_bytes = existing.map(|g| g.value().to_string());
660            if let Some(existing_bytes) = existing_bytes {
661                let mut reservation: SpendReservation = decode(&existing_bytes)?;
662                if matches!(reservation.status, ReservationStatus::Pending) {
663                    reservation.status = ReservationStatus::Cancelled;
664                    reservation.finalized_at_epoch_ms = Some(now);
665                    encoded_blobs.push(encode(&reservation)?);
666                    let encoded = encoded_blobs.last().ok_or_else(|| {
667                        PayError::InternalError("missing reservation blob".to_string())
668                    })?;
669                    reservation_table
670                        .insert(reservation_id, encoded.as_str())
671                        .map_err(|e| {
672                            PayError::InternalError(format!("spend update reservation: {e}"))
673                        })?;
674                }
675            }
676        }
677
678        write_txn
679            .commit()
680            .map_err(|e| PayError::InternalError(format!("spend commit cancel: {e}")))
681    }
682}
683
684// ═══════════════════════════════════════════
685// Postgres backend implementation
686// ═══════════════════════════════════════════
687
688#[cfg(feature = "postgres")]
689impl SpendLedger {
690    fn pg_pool(&self) -> Result<&sqlx::PgPool, PayError> {
691        match &self.backend {
692            SpendBackend::Postgres { pool } => Ok(pool),
693            _ => Err(PayError::InternalError(
694                "expected postgres spend backend".to_string(),
695            )),
696        }
697    }
698
699    async fn add_limit_postgres(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
700        let pool = self.pg_pool()?;
701        let rule_id = generate_rule_identifier()?;
702        limit.rule_id = Some(rule_id.clone());
703        let rule_json = serde_json::to_value(limit)
704            .map_err(|e| PayError::InternalError(format!("serialize spend rule: {e}")))?;
705
706        sqlx::query("INSERT INTO spend_rules (rule_id, rule) VALUES ($1, $2)")
707            .bind(&rule_id)
708            .bind(&rule_json)
709            .execute(pool)
710            .await
711            .map_err(|e| PayError::InternalError(format!("pg insert spend rule: {e}")))?;
712
713        Ok(rule_id)
714    }
715
716    async fn remove_limit_postgres(&self, rule_id: &str) -> Result<(), PayError> {
717        let pool = self.pg_pool()?;
718        let result = sqlx::query("DELETE FROM spend_rules WHERE rule_id = $1")
719            .bind(rule_id)
720            .execute(pool)
721            .await
722            .map_err(|e| PayError::InternalError(format!("pg delete spend rule: {e}")))?;
723
724        if result.rows_affected() == 0 {
725            return Err(PayError::InvalidAmount(format!(
726                "rule_id '{rule_id}' not found"
727            )));
728        }
729        Ok(())
730    }
731
732    async fn set_limits_postgres(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
733        let pool = self.pg_pool()?;
734        let mut tx = pool
735            .begin()
736            .await
737            .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
738
739        sqlx::query("DELETE FROM spend_rules")
740            .execute(&mut *tx)
741            .await
742            .map_err(|e| PayError::InternalError(format!("pg clear spend rules: {e}")))?;
743
744        for limit in limits {
745            let mut rule = limit.clone();
746            let rid = generate_rule_identifier()?;
747            rule.rule_id = Some(rid.clone());
748            let rule_json = serde_json::to_value(&rule)
749                .map_err(|e| PayError::InternalError(format!("serialize spend rule: {e}")))?;
750            sqlx::query("INSERT INTO spend_rules (rule_id, rule) VALUES ($1, $2)")
751                .bind(&rid)
752                .bind(&rule_json)
753                .execute(&mut *tx)
754                .await
755                .map_err(|e| PayError::InternalError(format!("pg insert spend rule: {e}")))?;
756        }
757
758        tx.commit()
759            .await
760            .map_err(|e| PayError::InternalError(format!("pg commit set_limits: {e}")))
761    }
762
763    async fn get_status_postgres(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
764        let pool = self.pg_pool()?;
765        let rules = pg_load_rules(pool).await?;
766        let reservations = pg_load_reservations(pool).await?;
767        let now = now_epoch_ms();
768
769        let mut out = Vec::with_capacity(rules.len());
770        for rule in rules {
771            let use_usd = rule.scope == SpendScope::GlobalUsdCents;
772            let (spent, oldest_ts) = spent_in_window(&rule, &reservations, now, use_usd)?;
773            let remaining = rule.max_spend.saturating_sub(spent);
774            let window_ms = rule.window_s.saturating_mul(1000);
775            let window_reset_s = oldest_ts
776                .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
777                .unwrap_or(0);
778            out.push(SpendLimitStatus {
779                rule_id: rule.rule_id.clone().unwrap_or_default(),
780                scope: rule.scope,
781                network: rule.network.clone(),
782                wallet: rule.wallet.clone(),
783                window_s: rule.window_s,
784                max_spend: rule.max_spend,
785                spent,
786                remaining,
787                token: rule.token.clone(),
788                window_reset_s,
789            });
790        }
791        Ok(out)
792    }
793
794    async fn reserve_postgres(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
795        use crate::store::postgres_store::SPEND_ADVISORY_LOCK_KEY;
796
797        let pool = self.pg_pool()?;
798        let now = now_epoch_ms();
799
800        // Pre-flight: load rules outside the transaction for USD conversion
801        let rules = pg_load_rules(pool).await?;
802        if rules.iter().any(|r| {
803            r.scope == SpendScope::Wallet
804                && r.network.as_deref() == Some(ctx.network.as_str())
805                && ctx.wallet.is_none()
806        }) {
807            return Err(PayError::InvalidAmount(
808                "wallet-scoped limits require an explicit wallet".to_string(),
809            ));
810        }
811
812        let needs_usd = rules.iter().any(|r| r.scope == SpendScope::GlobalUsdCents);
813        let amount_usd_cents = if needs_usd {
814            Some(
815                self.amount_to_usd_cents(&ctx.network, ctx.token.as_deref(), ctx.amount_native)
816                    .await?,
817            )
818        } else {
819            None
820        };
821
822        // Begin serializable transaction with advisory lock
823        let mut tx = pool
824            .begin()
825            .await
826            .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
827
828        sqlx::query("SELECT pg_advisory_xact_lock($1)")
829            .bind(SPEND_ADVISORY_LOCK_KEY)
830            .execute(&mut *tx)
831            .await
832            .map_err(|e| PayError::InternalError(format!("pg advisory lock: {e}")))?;
833
834        // Check for existing reservation with same op_id (idempotency)
835        let existing: Option<(i64,)> =
836            sqlx::query_as("SELECT reservation_id FROM spend_reservations WHERE op_id = $1")
837                .bind(op_id)
838                .fetch_optional(&mut *tx)
839                .await
840                .map_err(|e| PayError::InternalError(format!("pg check op_id: {e}")))?;
841
842        if let Some((rid,)) = existing {
843            return Ok(rid as u64);
844        }
845
846        // Expire pending reservations
847        pg_expire_pending(&mut tx, now).await?;
848
849        // Load all reservations within the lock
850        let reservations = pg_load_reservations_tx(&mut tx).await?;
851
852        // Re-load rules within the lock (could have changed)
853        let rules = pg_load_rules_tx(&mut tx).await?;
854
855        // Check limits
856        for rule in rules.iter() {
857            if !rule_matches_context(
858                rule,
859                &ctx.network,
860                ctx.wallet.as_deref(),
861                ctx.token.as_deref(),
862            ) {
863                continue;
864            }
865
866            let use_usd = rule.scope == SpendScope::GlobalUsdCents;
867            let candidate_amount =
868                amount_for_rule(rule, ctx.amount_native, amount_usd_cents, use_usd)?;
869            let (spent, oldest_ts) = spent_in_window(rule, &reservations, now, use_usd)?;
870            if spent.saturating_add(candidate_amount) > rule.max_spend {
871                let window_ms = rule.window_s.saturating_mul(1000);
872                let remaining_s = oldest_ts
873                    .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
874                    .unwrap_or(0);
875
876                return Err(PayError::LimitExceeded {
877                    rule_id: rule.rule_id.clone().unwrap_or_default(),
878                    scope: rule.scope,
879                    scope_key: scope_key(rule),
880                    spent,
881                    max_spend: rule.max_spend,
882                    token: rule.token.clone(),
883                    remaining_s,
884                    origin: None,
885                });
886            }
887        }
888
889        // Insert reservation
890        let reservation = SpendReservation {
891            reservation_id: 0, // will be assigned by BIGSERIAL
892            op_id: op_id.to_string(),
893            network: ctx.network.clone(),
894            wallet: ctx.wallet.clone(),
895            token: ctx.token.clone(),
896            amount_native: ctx.amount_native,
897            amount_usd_cents,
898            status: ReservationStatus::Pending,
899            created_at_epoch_ms: now,
900            expires_at_epoch_ms: now.saturating_add(300_000),
901            finalized_at_epoch_ms: None,
902        };
903        let reservation_json = serde_json::to_value(&reservation)
904            .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
905
906        let row: (i64,) = sqlx::query_as(
907            "INSERT INTO spend_reservations (op_id, reservation) \
908             VALUES ($1, $2) RETURNING reservation_id",
909        )
910        .bind(op_id)
911        .bind(&reservation_json)
912        .fetch_one(&mut *tx)
913        .await
914        .map_err(|e| PayError::InternalError(format!("pg insert reservation: {e}")))?;
915
916        let reservation_id = row.0 as u64;
917
918        // Update the reservation JSON with the assigned ID
919        let mut updated_json = reservation_json;
920        updated_json["reservation_id"] = serde_json::json!(reservation_id);
921        sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
922            .bind(&updated_json)
923            .bind(row.0)
924            .execute(&mut *tx)
925            .await
926            .map_err(|e| PayError::InternalError(format!("pg update reservation id: {e}")))?;
927
928        tx.commit()
929            .await
930            .map_err(|e| PayError::InternalError(format!("pg commit reserve: {e}")))?;
931
932        Ok(reservation_id)
933    }
934
935    async fn confirm_postgres(&self, reservation_id: u64) -> Result<(), PayError> {
936        let pool = self.pg_pool()?;
937        let now = now_epoch_ms();
938        let rid = reservation_id as i64;
939
940        let row: Option<(serde_json::Value,)> =
941            sqlx::query_as("SELECT reservation FROM spend_reservations WHERE reservation_id = $1")
942                .bind(rid)
943                .fetch_optional(pool)
944                .await
945                .map_err(|e| PayError::InternalError(format!("pg read reservation: {e}")))?;
946
947        let Some((res_json,)) = row else {
948            return Err(PayError::InternalError(format!(
949                "reservation {reservation_id} not found"
950            )));
951        };
952
953        let mut reservation: SpendReservation = serde_json::from_value(res_json)
954            .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
955
956        if !matches!(reservation.status, ReservationStatus::Pending) {
957            return Ok(());
958        }
959
960        reservation.status = ReservationStatus::Confirmed;
961        reservation.finalized_at_epoch_ms = Some(now);
962        let updated_json = serde_json::to_value(&reservation)
963            .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
964
965        let event = SpendEvent {
966            event_id: 0, // assigned by BIGSERIAL
967            reservation_id,
968            op_id: reservation.op_id,
969            network: reservation.network,
970            wallet: reservation.wallet,
971            token: reservation.token,
972            amount_native: reservation.amount_native,
973            amount_usd_cents: reservation.amount_usd_cents,
974            created_at_epoch_ms: reservation.created_at_epoch_ms,
975            confirmed_at_epoch_ms: now,
976        };
977        let event_json = serde_json::to_value(&event)
978            .map_err(|e| PayError::InternalError(format!("serialize spend event: {e}")))?;
979
980        let mut tx = pool
981            .begin()
982            .await
983            .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
984
985        sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
986            .bind(&updated_json)
987            .bind(rid)
988            .execute(&mut *tx)
989            .await
990            .map_err(|e| PayError::InternalError(format!("pg update reservation: {e}")))?;
991
992        sqlx::query("INSERT INTO spend_events (reservation_id, event) VALUES ($1, $2)")
993            .bind(rid)
994            .bind(&event_json)
995            .execute(&mut *tx)
996            .await
997            .map_err(|e| PayError::InternalError(format!("pg insert spend event: {e}")))?;
998
999        tx.commit()
1000            .await
1001            .map_err(|e| PayError::InternalError(format!("pg commit confirm: {e}")))
1002    }
1003
1004    async fn cancel_postgres(&self, reservation_id: u64) -> Result<(), PayError> {
1005        let pool = self.pg_pool()?;
1006        let now = now_epoch_ms();
1007        let rid = reservation_id as i64;
1008
1009        let row: Option<(serde_json::Value,)> =
1010            sqlx::query_as("SELECT reservation FROM spend_reservations WHERE reservation_id = $1")
1011                .bind(rid)
1012                .fetch_optional(pool)
1013                .await
1014                .map_err(|e| PayError::InternalError(format!("pg read reservation: {e}")))?;
1015
1016        if let Some((res_json,)) = row {
1017            let mut reservation: SpendReservation = serde_json::from_value(res_json)
1018                .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
1019
1020            if matches!(reservation.status, ReservationStatus::Pending) {
1021                reservation.status = ReservationStatus::Cancelled;
1022                reservation.finalized_at_epoch_ms = Some(now);
1023                let updated_json = serde_json::to_value(&reservation)
1024                    .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
1025
1026                sqlx::query(
1027                    "UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2",
1028                )
1029                .bind(&updated_json)
1030                .bind(rid)
1031                .execute(pool)
1032                .await
1033                .map_err(|e| PayError::InternalError(format!("pg update reservation: {e}")))?;
1034            }
1035        }
1036
1037        Ok(())
1038    }
1039}
1040
/// Loads every spend rule from `spend_rules`, ordered by `rule_id`, using the pool.
///
/// Fails on the first row whose JSON does not deserialize into a `SpendLimit`.
#[cfg(feature = "postgres")]
async fn pg_load_rules(pool: &sqlx::PgPool) -> Result<Vec<SpendLimit>, PayError> {
    let raw_rows: Vec<(serde_json::Value,)> =
        sqlx::query_as("SELECT rule FROM spend_rules ORDER BY rule_id")
            .fetch_all(pool)
            .await
            .map_err(|e| PayError::InternalError(format!("pg load spend rules: {e}")))?;

    let mut rules = Vec::with_capacity(raw_rows.len());
    for (raw_rule,) in raw_rows {
        let rule: SpendLimit = serde_json::from_value(raw_rule)
            .map_err(|e| PayError::InternalError(format!("pg parse spend rule: {e}")))?;
        rules.push(rule);
    }
    Ok(rules)
}
1055
/// Loads every spend rule from `spend_rules`, ordered by `rule_id`, through an
/// open transaction so the read sees that transaction's view of the table.
///
/// Fails on the first row whose JSON does not deserialize into a `SpendLimit`.
#[cfg(feature = "postgres")]
async fn pg_load_rules_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Vec<SpendLimit>, PayError> {
    let raw_rows: Vec<(serde_json::Value,)> =
        sqlx::query_as("SELECT rule FROM spend_rules ORDER BY rule_id")
            .fetch_all(&mut **tx)
            .await
            .map_err(|e| PayError::InternalError(format!("pg load spend rules: {e}")))?;

    let mut rules = Vec::with_capacity(raw_rows.len());
    for (raw_rule,) in raw_rows {
        let rule: SpendLimit = serde_json::from_value(raw_rule)
            .map_err(|e| PayError::InternalError(format!("pg parse spend rule: {e}")))?;
        rules.push(rule);
    }
    Ok(rules)
}
1072
/// Loads every row of `spend_reservations`, ordered by `reservation_id`, using the pool.
///
/// Fails on the first row whose JSON does not deserialize into a `SpendReservation`.
#[cfg(feature = "postgres")]
async fn pg_load_reservations(pool: &sqlx::PgPool) -> Result<Vec<SpendReservation>, PayError> {
    let raw_rows: Vec<(serde_json::Value,)> =
        sqlx::query_as("SELECT reservation FROM spend_reservations ORDER BY reservation_id")
            .fetch_all(pool)
            .await
            .map_err(|e| PayError::InternalError(format!("pg load reservations: {e}")))?;

    let mut reservations = Vec::with_capacity(raw_rows.len());
    for (raw_reservation,) in raw_rows {
        let reservation: SpendReservation = serde_json::from_value(raw_reservation)
            .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
        reservations.push(reservation);
    }
    Ok(reservations)
}
1087
/// Loads every row of `spend_reservations`, ordered by `reservation_id`, through an
/// open transaction so the read sees that transaction's view of the table.
///
/// Fails on the first row whose JSON does not deserialize into a `SpendReservation`.
#[cfg(feature = "postgres")]
async fn pg_load_reservations_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Vec<SpendReservation>, PayError> {
    let raw_rows: Vec<(serde_json::Value,)> =
        sqlx::query_as("SELECT reservation FROM spend_reservations ORDER BY reservation_id")
            .fetch_all(&mut **tx)
            .await
            .map_err(|e| PayError::InternalError(format!("pg load reservations: {e}")))?;

    let mut reservations = Vec::with_capacity(raw_rows.len());
    for (raw_reservation,) in raw_rows {
        let reservation: SpendReservation = serde_json::from_value(raw_reservation)
            .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
        reservations.push(reservation);
    }
    Ok(reservations)
}
1104
/// Flips every overdue `Pending` reservation to `Expired` inside the given transaction.
///
/// A reservation is overdue when its `expires_at_epoch_ms` is at or before `now_ms`.
/// Expired rows get `finalized_at_epoch_ms = now_ms` and are written back as JSON.
/// All rows are read and parsed; a row that fails to parse aborts the whole pass.
#[cfg(feature = "postgres")]
async fn pg_expire_pending(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    now_ms: u64,
) -> Result<(), PayError> {
    let stored: Vec<(i64, serde_json::Value)> =
        sqlx::query_as("SELECT reservation_id, reservation FROM spend_reservations")
            .fetch_all(&mut **tx)
            .await
            .map_err(|e| {
                PayError::InternalError(format!("pg load reservations for expire: {e}"))
            })?;

    for (row_id, raw) in stored {
        let mut reservation: SpendReservation = serde_json::from_value(raw)
            .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;

        // Skip anything already finalized or not yet past its deadline.
        let still_pending = matches!(reservation.status, ReservationStatus::Pending);
        if !still_pending || reservation.expires_at_epoch_ms > now_ms {
            continue;
        }

        reservation.status = ReservationStatus::Expired;
        reservation.finalized_at_epoch_ms = Some(now_ms);
        let expired_json = serde_json::to_value(&reservation)
            .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;

        sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
            .bind(&expired_json)
            .bind(row_id)
            .execute(&mut **tx)
            .await
            .map_err(|e| PayError::InternalError(format!("pg expire reservation: {e}")))?;
    }
    Ok(())
}
1139
1140// ═══════════════════════════════════════════
1141// Exchange rate (shared, delegates to backend for caching)
1142// ═══════════════════════════════════════════
1143
1144impl SpendLedger {
1145    async fn amount_to_usd_cents(
1146        &self,
1147        network: &str,
1148        token: Option<&str>,
1149        amount_native: u64,
1150    ) -> Result<u64, PayError> {
1151        let (symbol, divisor) = token_asset(network, token).ok_or_else(|| {
1152            PayError::InvalidAmount(format!(
1153                "network '{network}' token '{token:?}' is unsupported for global-usd-cents limits"
1154            ))
1155        })?;
1156
1157        let quote = self.get_or_fetch_quote(symbol, "USD").await?;
1158
1159        // Warn if cached quote age exceeds 80% of its TTL.
1160        let ttl_ms = quote
1161            .expires_at_epoch_ms
1162            .saturating_sub(quote.fetched_at_epoch_ms);
1163        let age_ms = now_epoch_ms().saturating_sub(quote.fetched_at_epoch_ms);
1164        if ttl_ms > 0 && age_ms > ttl_ms * 4 / 5 {
1165            self.fx_stale_warned
1166                .store(true, std::sync::atomic::Ordering::Relaxed);
1167        }
1168
1169        let usd = (amount_native as f64 / divisor) * quote.price;
1170        if !usd.is_finite() || usd < 0f64 {
1171            return Err(PayError::InternalError(
1172                "invalid exchange-rate conversion result".to_string(),
1173            ));
1174        }
1175        Ok((usd * 100f64).round() as u64)
1176    }
1177
1178    async fn get_or_fetch_quote(
1179        &self,
1180        base: &str,
1181        quote: &str,
1182    ) -> Result<ExchangeRateQuote, PayError> {
1183        let pair = format!(
1184            "{}/{}",
1185            base.to_ascii_uppercase(),
1186            quote.to_ascii_uppercase()
1187        );
1188        let now = now_epoch_ms();
1189
1190        // Try cache — redb
1191        #[cfg(feature = "redb")]
1192        if let SpendBackend::Redb { .. } = &self.backend {
1193            let fx_db = self.open_exchange_rate_db()?;
1194            let read_txn = fx_db
1195                .begin_read()
1196                .map_err(|e| PayError::InternalError(format!("fx begin_read: {e}")))?;
1197            if let Ok(table) = read_txn.open_table(FX_QUOTE_BY_PAIR) {
1198                if let Some(entry) = table
1199                    .get(pair.as_str())
1200                    .map_err(|e| PayError::InternalError(format!("fx read quote: {e}")))?
1201                {
1202                    let cached: ExchangeRateQuote = decode(entry.value())?;
1203                    if cached.expires_at_epoch_ms > now {
1204                        return Ok(cached);
1205                    }
1206                }
1207            }
1208        }
1209
1210        // Try cache — postgres
1211        #[cfg(feature = "postgres")]
1212        if let SpendBackend::Postgres { pool } = &self.backend {
1213            let row: Option<(serde_json::Value,)> =
1214                sqlx::query_as("SELECT quote FROM exchange_rate_cache WHERE pair = $1")
1215                    .bind(&pair)
1216                    .fetch_optional(pool)
1217                    .await
1218                    .map_err(|e| PayError::InternalError(format!("pg fx read cache: {e}")))?;
1219            if let Some((quote_json,)) = row {
1220                let cached: ExchangeRateQuote = serde_json::from_value(quote_json)
1221                    .map_err(|e| PayError::InternalError(format!("pg fx parse cache: {e}")))?;
1222                if cached.expires_at_epoch_ms > now {
1223                    return Ok(cached);
1224                }
1225            }
1226        }
1227
1228        let (fetched_price, source_name) = self.fetch_exchange_rate_http(base, quote).await?;
1229        let ttl_s = self
1230            .exchange_rate
1231            .as_ref()
1232            .map(|cfg| cfg.ttl_s)
1233            .unwrap_or(300)
1234            .max(1);
1235        let new_quote = ExchangeRateQuote {
1236            pair: pair.clone(),
1237            source: source_name,
1238            price: fetched_price,
1239            fetched_at_epoch_ms: now,
1240            expires_at_epoch_ms: now.saturating_add(ttl_s.saturating_mul(1000)),
1241        };
1242
1243        // Write cache — redb
1244        #[cfg(feature = "redb")]
1245        if let SpendBackend::Redb { .. } = &self.backend {
1246            let fx_db = self.open_exchange_rate_db()?;
1247            let write_txn = fx_db
1248                .begin_write()
1249                .map_err(|e| PayError::InternalError(format!("fx begin_write: {e}")))?;
1250            let mut encoded_blobs: Vec<String> = Vec::new();
1251            {
1252                let mut table = write_txn
1253                    .open_table(FX_QUOTE_BY_PAIR)
1254                    .map_err(|e| PayError::InternalError(format!("fx open quote table: {e}")))?;
1255                encoded_blobs.push(encode(&new_quote)?);
1256                let encoded = encoded_blobs
1257                    .last()
1258                    .ok_or_else(|| PayError::InternalError("missing quote blob".to_string()))?;
1259                table
1260                    .insert(pair.as_str(), encoded.as_str())
1261                    .map_err(|e| PayError::InternalError(format!("fx insert quote: {e}")))?;
1262            }
1263            write_txn
1264                .commit()
1265                .map_err(|e| PayError::InternalError(format!("fx commit write: {e}")))?;
1266        }
1267
1268        // Write cache — postgres
1269        #[cfg(feature = "postgres")]
1270        if let SpendBackend::Postgres { pool } = &self.backend {
1271            let quote_json = serde_json::to_value(&new_quote)
1272                .map_err(|e| PayError::InternalError(format!("serialize fx quote: {e}")))?;
1273            let _ = sqlx::query(
1274                "INSERT INTO exchange_rate_cache (pair, quote) VALUES ($1, $2) \
1275                 ON CONFLICT (pair) DO UPDATE SET quote = $2",
1276            )
1277            .bind(&pair)
1278            .bind(&quote_json)
1279            .execute(pool)
1280            .await;
1281        }
1282
1283        Ok(new_quote)
1284    }
1285
/// Asks each configured exchange-rate source in order and returns the first price
/// obtained, together with the endpoint that produced it.
///
/// Fails with `PayError::InvalidAmount` when no sources are configured and with
/// `PayError::NetworkError` (carrying the last failure) when every source fails.
#[cfg(feature = "exchange-rate")]
async fn fetch_exchange_rate_http(
    &self,
    base: &str,
    quote_currency: &str,
) -> Result<(f64, String), PayError> {
    let cfg = self.exchange_rate.clone().unwrap_or_default();
    if cfg.sources.is_empty() {
        return Err(PayError::InvalidAmount(
            "exchange_rate.sources is empty — no exchange-rate API configured".to_string(),
        ));
    }

    let http = reqwest::Client::new();
    let mut last_err = String::new();
    for candidate in cfg.sources.iter() {
        let e = match fetch_from_source(&http, candidate, base, quote_currency).await {
            Ok(price) => return Ok((price, candidate.endpoint.clone())),
            Err(e) => e,
        };
        // Only the most recent failure is kept for the final error message.
        last_err = format!(
            "{} ({}): {e}",
            candidate.endpoint,
            candidate.source_type.as_str()
        );
    }

    Err(PayError::NetworkError(format!(
        "all exchange-rate sources failed; last: {last_err}"
    )))
}
1317
1318    #[cfg(not(feature = "exchange-rate"))]
1319    async fn fetch_exchange_rate_http(
1320        &self,
1321        _base: &str,
1322        _quote_currency: &str,
1323    ) -> Result<(f64, String), PayError> {
1324        Err(PayError::NotImplemented(
1325            "exchange-rate HTTP support is not built in this feature set".to_string(),
1326        ))
1327    }
1328}
1329
1330// ═══════════════════════════════════════════
1331// Helpers
1332// ═══════════════════════════════════════════
1333
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_epoch_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
1340
/// Maps a network/token combination to the asset symbol used for pricing and the
/// number of native units per whole asset.
///
/// * With a token: `sol`, `eth`, `usdc` and `usdt` are recognised (case-insensitive);
///   any other token yields `None`, regardless of the network.
/// * Without a token: networks starting with `ln`, or equal to `cashu` or `btc`
///   (case-insensitive), map to BTC; everything else yields `None`.
fn token_asset(network: &str, token: Option<&str>) -> Option<(&'static str, f64)> {
    let Some(raw_token) = token else {
        let net = network.to_ascii_lowercase();
        let is_bitcoin = net.starts_with("ln") || matches!(net.as_str(), "cashu" | "btc");
        return is_bitcoin.then_some(("BTC", 1e8));
    };

    match raw_token.to_ascii_lowercase().as_str() {
        "sol" => Some(("SOL", 1e9)),
        "eth" => Some(("ETH", 1e18)),
        "usdc" | "usdt" => Some(("USD", 1e6)),
        _ => None,
    }
}
1357
/// Pulls a numeric price out of a generic JSON response.
///
/// The first of the top-level keys `price`, `rate`, `usd_per_base` that holds a
/// JSON number wins; failing that, `data.price` is tried. String-encoded numbers
/// are not accepted.
#[cfg(feature = "exchange-rate")]
fn extract_price_generic(value: &serde_json::Value) -> Option<f64> {
    const TOP_LEVEL_KEYS: [&str; 3] = ["price", "rate", "usd_per_base"];

    let top_level = TOP_LEVEL_KEYS
        .iter()
        .find_map(|key| value.get(*key)?.as_f64());
    if top_level.is_some() {
        return top_level;
    }

    value.get("data")?.get("price")?.as_f64()
}
1372
#[cfg(feature = "exchange-rate")]
impl ExchangeRateSourceType {
    /// Lowercase label for this source type, as embedded in fetch error messages.
    fn as_str(self) -> &'static str {
        match self {
            Self::Kraken => "kraken",
            Self::CoinGecko => "coingecko",
            Self::Generic => "generic",
        }
    }
}
1383
1384#[cfg(feature = "exchange-rate")]
/// Translates an asset ticker (case-insensitive) into the CoinGecko coin id used
/// in `simple/price` requests; `None` for tickers without a mapping.
fn coingecko_coin_id(symbol: &str) -> Option<&'static str> {
    const COIN_IDS: [(&str, &str); 3] = [
        ("BTC", "bitcoin"),
        ("SOL", "solana"),
        ("ETH", "ethereum"),
    ];

    COIN_IDS
        .iter()
        .find(|entry| symbol.eq_ignore_ascii_case(entry.0))
        .map(|entry| entry.1)
}
1393
1394#[cfg(feature = "exchange-rate")]
/// Translates an asset ticker (case-insensitive) into the Kraken USD pair name
/// used in ticker requests; `None` for tickers without a mapping.
fn kraken_pair(symbol: &str) -> Option<&'static str> {
    const PAIRS: [(&str, &str); 3] = [
        ("BTC", "XBTUSD"),
        ("SOL", "SOLUSD"),
        ("ETH", "ETHUSD"),
    ];

    PAIRS
        .iter()
        .find(|entry| symbol.eq_ignore_ascii_case(entry.0))
        .map(|entry| entry.1)
}
1403
/// Fetches the price of `base` in `quote_currency` from one configured source.
///
/// The request URL and the response parser depend on `source.source_type`:
/// * Kraken: `{endpoint}/0/public/Ticker?pair=…`; the price is the first element of
///   the ticker's `c` array, taken from the requested pair or, if that key is
///   absent, from the first entry of `result`.
/// * CoinGecko: `{endpoint}/simple/price?ids=…&vs_currencies=…`; the price is
///   `response[coin_id][vs]`.
/// * Generic: `base` and `quote` are appended as query parameters and the price is
///   located by `extract_price_generic`.
///
/// When `source.api_key` is set it is sent both as a bearer token and as `X-Api-Key`.
///
/// # Errors
/// Returns a human-readable message when the asset is unsupported by the source,
/// the request fails, the status is not a success, the body is not JSON, no price
/// can be extracted, or the extracted price is not a finite positive number. The
/// last check matters because a zero price would convert every amount to 0 cents.
#[cfg(feature = "exchange-rate")]
async fn fetch_from_source(
    client: &reqwest::Client,
    source: &crate::types::ExchangeRateSource,
    base: &str,
    quote_currency: &str,
) -> Result<f64, String> {
    type PriceExtractor = Box<dyn Fn(&serde_json::Value) -> Option<f64> + Send>;
    let (url, extract_fn): (String, PriceExtractor) = match source.source_type {
        ExchangeRateSourceType::Kraken => {
            let pair = kraken_pair(base)
                .ok_or_else(|| format!("kraken: unsupported base asset '{base}'"))?;
            let url = format!("{}/0/public/Ticker?pair={pair}", source.endpoint);
            let pair_owned = pair.to_string();
            (
                url,
                Box::new(move |v: &serde_json::Value| {
                    let result = v.get("result")?;
                    // The response may key the ticker under a different pair name
                    // than the one requested; fall back to the first entry.
                    let ticker = result
                        .get(&pair_owned)
                        .or_else(|| result.as_object().and_then(|m| m.values().next()))?;
                    let price_str = ticker.get("c")?.as_array()?.first()?.as_str()?;
                    price_str.parse::<f64>().ok()
                }),
            )
        }
        ExchangeRateSourceType::CoinGecko => {
            let coin_id = coingecko_coin_id(base)
                .ok_or_else(|| format!("coingecko: unsupported base asset '{base}'"))?;
            let vs = quote_currency.to_ascii_lowercase();
            let url = format!(
                "{}/simple/price?ids={coin_id}&vs_currencies={vs}",
                source.endpoint
            );
            let coin_id_owned = coin_id.to_string();
            let vs_owned = vs.clone();
            (
                url,
                Box::new(move |v: &serde_json::Value| {
                    v.get(&coin_id_owned)?.get(&vs_owned)?.as_f64()
                }),
            )
        }
        ExchangeRateSourceType::Generic => {
            // Append to an existing query string if the endpoint already has one.
            let sep = if source.endpoint.contains('?') {
                '&'
            } else {
                '?'
            };
            let url = format!(
                "{}{sep}base={}&quote={}",
                source.endpoint,
                base.to_ascii_uppercase(),
                quote_currency.to_ascii_uppercase()
            );
            (url, Box::new(extract_price_generic))
        }
    };

    let mut req = client.get(&url);
    if let Some(key) = &source.api_key {
        req = req.header("Authorization", format!("Bearer {key}"));
        req = req.header("X-Api-Key", key);
    }

    let resp = req
        .send()
        .await
        .map_err(|e| format!("request failed: {e}"))?;
    if !resp.status().is_success() {
        return Err(format!("status {}", resp.status()));
    }

    let value: serde_json::Value = resp
        .json()
        .await
        .map_err(|e| format!("parse failed: {e}"))?;

    let price = extract_fn(&value)
        .ok_or_else(|| "could not extract price from response".to_string())?;
    // Reject NaN, infinities, zero and negatives so the caller can try the next source.
    if !price.is_finite() || price <= 0.0 {
        return Err(format!("invalid price {price} in response"));
    }
    Ok(price)
}
1484
/// Serializes `value` to the JSON string form stored in redb tables.
#[cfg(feature = "redb")]
fn encode<T: Serialize>(value: &T) -> Result<String, PayError> {
    match serde_json::to_string(value) {
        Ok(json) => Ok(json),
        Err(e) => Err(PayError::InternalError(format!(
            "spend encode failed: {e}"
        ))),
    }
}
1490
/// Deserializes a JSON string read from a redb table into `T`.
///
/// # Errors
/// Returns `PayError::InternalError` carrying the input length and a preview of at
/// most 48 bytes of the input. The preview is cut on a UTF-8 character boundary,
/// because slicing a `str` in the middle of a multi-byte character panics.
#[cfg(feature = "redb")]
fn decode<T: DeserializeOwned>(encoded: &str) -> Result<T, PayError> {
    serde_json::from_str(encoded).map_err(|e| {
        let mut preview_len = encoded.len().min(48);
        // Index 0 is always a boundary, so this loop terminates.
        while !encoded.is_char_boundary(preview_len) {
            preview_len -= 1;
        }
        let preview = &encoded[..preview_len];
        PayError::InternalError(format!(
            "spend decode failed (len={}, preview={}): {e}",
            encoded.len(),
            preview
        ))
    })
}
1503
/// Prefixes the message of an `InternalError` with `prefix: `; every other error
/// variant is returned unchanged.
#[cfg(feature = "redb")]
fn prepend_err(prefix: &str, err: PayError) -> PayError {
    match err {
        PayError::InternalError(detail) => {
            let combined = format!("{prefix}: {detail}");
            PayError::InternalError(combined)
        }
        untouched => untouched,
    }
}
1511
1512fn generate_rule_identifier() -> Result<String, PayError> {
1513    let mut buf = [0u8; 4];
1514    getrandom::fill(&mut buf).map_err(|e| PayError::InternalError(format!("rng failed: {e}")))?;
1515    Ok(format!("r_{}", hex::encode(buf)))
1516}
1517
1518fn validate_limit(
1519    rule: &SpendLimit,
1520    exchange_rate: Option<&ExchangeRateConfig>,
1521) -> Result<(), PayError> {
1522    if rule.window_s == 0 {
1523        return Err(PayError::InvalidAmount(
1524            "limit rule has zero window_s".to_string(),
1525        ));
1526    }
1527    if rule.max_spend == 0 {
1528        return Err(PayError::InvalidAmount(
1529            "limit rule has zero max_spend".to_string(),
1530        ));
1531    }
1532
1533    match rule.scope {
1534        SpendScope::GlobalUsdCents => {
1535            if rule.network.is_some() || rule.wallet.is_some() {
1536                return Err(PayError::InvalidAmount(
1537                    "scope=global-usd-cents cannot set network/wallet".to_string(),
1538                ));
1539            }
1540            if rule.token.is_some() {
1541                return Err(PayError::InvalidAmount(
1542                    "scope=global-usd-cents cannot set token".to_string(),
1543                ));
1544            }
1545        }
1546        SpendScope::Network => {
1547            if rule.network.as_deref().unwrap_or("").trim().is_empty() {
1548                return Err(PayError::InvalidAmount(
1549                    "scope=network requires network".to_string(),
1550                ));
1551            }
1552            if rule.wallet.is_some() {
1553                return Err(PayError::InvalidAmount(
1554                    "scope=network cannot set wallet".to_string(),
1555                ));
1556            }
1557        }
1558        SpendScope::Wallet => {
1559            if rule.network.as_deref().unwrap_or("").trim().is_empty() {
1560                return Err(PayError::InvalidAmount(
1561                    "scope=wallet requires network".to_string(),
1562                ));
1563            }
1564            if rule.wallet.as_deref().unwrap_or("").trim().is_empty() {
1565                return Err(PayError::InvalidAmount(
1566                    "scope=wallet requires wallet".to_string(),
1567                ));
1568            }
1569        }
1570    }
1571
1572    if rule.scope == SpendScope::GlobalUsdCents && exchange_rate.is_none() {
1573        return Err(PayError::InvalidAmount(
1574            "scope=global-usd-cents requires config.exchange_rate".to_string(),
1575        ));
1576    }
1577    Ok(())
1578}
1579
/// Reads every entry of the `RULE_BY_ID` table and decodes it into a [`SpendLimit`].
///
/// A table that does not exist yet yields an empty list.
///
/// # Errors
///
/// Returns [`PayError::InternalError`] when the table exists but cannot be
/// opened, when iteration or reading an entry fails, and propagates the
/// (prefixed) decode error when a stored rule cannot be decoded.
#[cfg(feature = "redb")]
fn load_rules(read_txn: &redb::ReadTransaction) -> Result<Vec<SpendLimit>, PayError> {
    let rule_table = match read_txn.open_table(RULE_BY_ID) {
        Ok(table) => table,
        // Only a missing table means "no rules stored".
        Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
        // Any other open failure must surface: returning an empty list here
        // would make an unreadable table indistinguishable from one with no rules.
        Err(e) => {
            return Err(PayError::InternalError(format!(
                "spend open rule table: {e}"
            )))
        }
    };
    rule_table
        .iter()
        .map_err(|e| PayError::InternalError(format!("spend iterate rules: {e}")))?
        .map(|entry| {
            let (_k, v) = entry
                .map_err(|e| PayError::InternalError(format!("spend read rule entry: {e}")))?;
            decode::<SpendLimit>(v.value()).map_err(|e| prepend_err("spend decode rule", e))
        })
        .collect()
}
1595
/// Reads every entry of the `RESERVATION_BY_ID` table and decodes it into a
/// [`SpendReservation`].
///
/// A table that does not exist yet yields an empty list.
///
/// # Errors
///
/// Returns [`PayError::InternalError`] when the table exists but cannot be
/// opened, when iteration or reading an entry fails, and propagates the
/// (prefixed) decode error when a stored reservation cannot be decoded.
#[cfg(feature = "redb")]
fn load_reservations(read_txn: &redb::ReadTransaction) -> Result<Vec<SpendReservation>, PayError> {
    let table = match read_txn.open_table(RESERVATION_BY_ID) {
        Ok(table) => table,
        // Only a missing table means "no reservations recorded".
        Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
        // Any other open failure must surface: an empty list here would make an
        // unreadable table look like there had been no prior spending.
        Err(e) => {
            return Err(PayError::InternalError(format!(
                "spend open reservation table: {e}"
            )))
        }
    };
    table
        .iter()
        .map_err(|e| PayError::InternalError(format!("spend iterate reservations: {e}")))?
        .map(|entry| {
            let (_k, v) = entry
                .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?;
            decode::<SpendReservation>(v.value())
                .map_err(|e| prepend_err("spend decode reservation", e))
        })
        .collect()
}
1612
/// Placeholder hook for expiring pending reservations.
///
/// Currently a no-op: both arguments are ignored, the table is left untouched
/// and the call always succeeds.
///
/// NOTE(review): `reservation_active_for_window` already disregards a pending
/// reservation once its `expires_at_epoch_ms` has passed, so expiry looks to be
/// applied lazily at read time — confirm nothing relies on the stored status
/// being rewritten here.
#[cfg(feature = "redb")]
fn expire_pending(_table: &mut redb::Table<u64, &str>, _now_ms: u64) -> Result<(), PayError> {
    Ok(())
}
1617
1618fn amount_for_rule(
1619    _rule: &SpendLimit,
1620    amount_native: u64,
1621    amount_usd_cents: Option<u64>,
1622    use_usd: bool,
1623) -> Result<u64, PayError> {
1624    if use_usd {
1625        amount_usd_cents.ok_or_else(|| {
1626            PayError::InternalError("missing USD amount for non-native unit rule".to_string())
1627        })
1628    } else {
1629        Ok(amount_native)
1630    }
1631}
1632
1633fn reservation_active_for_window(r: &SpendReservation, now_ms: u64) -> bool {
1634    match r.status {
1635        ReservationStatus::Confirmed => true,
1636        ReservationStatus::Pending => r.expires_at_epoch_ms > now_ms,
1637        ReservationStatus::Cancelled | ReservationStatus::Expired => false,
1638    }
1639}
1640
1641fn rule_matches_context(
1642    rule: &SpendLimit,
1643    network: &str,
1644    wallet: Option<&str>,
1645    token: Option<&str>,
1646) -> bool {
1647    if let Some(rule_token) = &rule.token {
1648        match token {
1649            Some(ctx_token) if ctx_token.eq_ignore_ascii_case(rule_token) => {}
1650            _ => return false,
1651        }
1652    }
1653    match rule.scope {
1654        SpendScope::GlobalUsdCents => true,
1655        SpendScope::Network => rule.network.as_deref() == Some(network),
1656        SpendScope::Wallet => {
1657            rule.network.as_deref() == Some(network) && rule.wallet.as_deref() == wallet
1658        }
1659    }
1660}
1661
1662fn scope_key(rule: &SpendLimit) -> String {
1663    match rule.scope {
1664        SpendScope::GlobalUsdCents => "global-usd-cents".to_string(),
1665        SpendScope::Network => rule.network.clone().unwrap_or_default(),
1666        SpendScope::Wallet => format!(
1667            "{}/{}",
1668            rule.network.clone().unwrap_or_default(),
1669            rule.wallet.clone().unwrap_or_default()
1670        ),
1671    }
1672}
1673
1674fn spent_in_window(
1675    rule: &SpendLimit,
1676    reservations: &[SpendReservation],
1677    now_ms: u64,
1678    use_usd: bool,
1679) -> Result<(u64, Option<u64>), PayError> {
1680    let window_ms = rule.window_s.saturating_mul(1000);
1681    let cutoff = now_ms.saturating_sub(window_ms);
1682
1683    let mut spent = 0u64;
1684    let mut oldest: Option<u64> = None;
1685
1686    for r in reservations {
1687        if !reservation_active_for_window(r, now_ms) {
1688            continue;
1689        }
1690        if r.created_at_epoch_ms < cutoff {
1691            continue;
1692        }
1693        if !rule_matches_context(rule, &r.network, r.wallet.as_deref(), r.token.as_deref()) {
1694            continue;
1695        }
1696
1697        let amount = if use_usd {
1698            r.amount_usd_cents.ok_or_else(|| {
1699                PayError::InternalError("reservation missing USD amount".to_string())
1700            })?
1701        } else {
1702            r.amount_native
1703        };
1704        spent = spent.saturating_add(amount);
1705        oldest = Some(oldest.map_or(r.created_at_epoch_ms, |v| v.min(r.created_at_epoch_ms)));
1706    }
1707
1708    Ok((spent, oldest))
1709}
1710
/// Increments the counter stored under `key` in the `META_COUNTER` table and
/// returns the new value.
///
/// A counter with no stored value is treated as `0`, so the first call for a
/// key returns `1`. The new value is written back inside `write_txn`.
///
/// # Errors
///
/// Returns [`PayError::InternalError`] when the table cannot be opened, read or
/// written, or when the counter has reached `u64::MAX` and cannot advance.
#[cfg(feature = "redb")]
fn next_counter(write_txn: &redb::WriteTransaction, key: &str) -> Result<u64, PayError> {
    let mut meta = write_txn
        .open_table(META_COUNTER)
        .map_err(|e| PayError::InternalError(format!("spend open meta table: {e}")))?;
    let current = match meta
        .get(key)
        .map_err(|e| PayError::InternalError(format!("spend read counter {key}: {e}")))?
    {
        Some(v) => v.value(),
        None => 0,
    };
    // A saturating add would return the same value again once the counter hits
    // the maximum; fail loudly instead of issuing a duplicate.
    let next = current
        .checked_add(1)
        .ok_or_else(|| PayError::InternalError(format!("spend counter {key} exhausted")))?;
    meta.insert(key, next)
        .map_err(|e| PayError::InternalError(format!("spend write counter {key}: {e}")))?;
    Ok(next)
}
1728
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a rule with a 3600-second window, a cap of 1000 and no token or
    /// rule id, for the given scope, network and wallet.
    fn make_limit(scope: SpendScope, network: Option<&str>, wallet: Option<&str>) -> SpendLimit {
        SpendLimit {
            rule_id: None,
            scope,
            network: network.map(str::to_owned),
            wallet: wallet.map(str::to_owned),
            window_s: 3600,
            max_spend: 1000,
            token: None,
        }
    }

    /// Two reservations of 400 fit under a network cap of 1000; a third is rejected.
    #[cfg(feature = "redb")]
    #[tokio::test]
    async fn provider_limit_reserve_and_confirm() {
        let data_dir = tempfile::tempdir().unwrap();
        let ledger = SpendLedger::new(data_dir.path().to_str().unwrap(), None);

        let network_rule = make_limit(SpendScope::Network, Some("cashu"), None);
        ledger.set_limits(&[network_rule]).await.unwrap();

        let ctx = SpendContext {
            network: "cashu".to_string(),
            wallet: Some("w_01".to_string()),
            amount_native: 400,
            token: None,
        };

        let first = ledger.reserve("op_1", &ctx).await.unwrap();
        ledger.confirm(first).await.unwrap();

        let second = ledger.reserve("op_2", &ctx).await.unwrap();

        // A third 400 would bring the total to 1200, above the 1000 cap.
        let third = ledger.reserve("op_3", &ctx).await;
        assert!(matches!(third, Err(PayError::LimitExceeded { .. })));

        ledger.cancel(second).await.unwrap();
    }

    /// With a wallet-scoped rule in place, reserving without a wallet is rejected.
    #[cfg(feature = "redb")]
    #[tokio::test]
    async fn wallet_scope_requires_wallet_context() {
        let data_dir = tempfile::tempdir().unwrap();
        let ledger = SpendLedger::new(data_dir.path().to_str().unwrap(), None);

        let wallet_rule = make_limit(SpendScope::Wallet, Some("cashu"), Some("w_abc"));
        ledger.set_limits(&[wallet_rule]).await.unwrap();

        let ctx = SpendContext {
            network: "cashu".to_string(),
            wallet: None,
            amount_native: 1,
            token: None,
        };
        let outcome = ledger.reserve("op_1", &ctx).await;
        assert!(matches!(outcome, Err(PayError::InvalidAmount(_))));
    }

    /// A global USD rule cannot be installed on a ledger built without exchange-rate config.
    #[tokio::test]
    async fn global_usd_cents_scope_requires_exchange_rate_config() {
        let data_dir = tempfile::tempdir().unwrap();
        let ledger = SpendLedger::new(data_dir.path().to_str().unwrap(), None);

        let usd_rule = SpendLimit {
            max_spend: 100,
            ..make_limit(SpendScope::GlobalUsdCents, None, None)
        };
        let outcome = ledger.set_limits(&[usd_rule]).await;

        assert!(matches!(outcome, Err(PayError::InvalidAmount(_))));
    }

    /// A network-scoped rule without a token installs fine with no exchange-rate config.
    #[cfg(feature = "redb")]
    #[tokio::test]
    async fn network_scope_native_token_ok_without_exchange_rate() {
        let data_dir = tempfile::tempdir().unwrap();
        let ledger = SpendLedger::new(data_dir.path().to_str().unwrap(), None);

        let network_rule = SpendLimit {
            max_spend: 100,
            ..make_limit(SpendScope::Network, Some("cashu"), None)
        };
        ledger
            .set_limits(&[network_rule])
            .await
            .expect("network scope should not require exchange_rate");
    }
}