Skip to main content

agent_first_pay/spend/
mod.rs

1pub mod tokens;
2
3use crate::provider::PayError;
4use crate::types::{
5    ExchangeRateConfig, ExchangeRateSourceType, SpendLimit, SpendLimitStatus, SpendScope,
6};
7use serde::de::DeserializeOwned;
8use serde::{Deserialize, Serialize};
9use tokio::sync::Mutex;
10
11#[cfg(feature = "redb")]
12use crate::store::db;
13#[cfg(feature = "redb")]
14use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
15#[cfg(feature = "redb")]
16use std::path::{Path, PathBuf};
17
18#[cfg(feature = "redb")]
19const META_COUNTER: TableDefinition<&str, u64> = TableDefinition::new("meta_counter");
20#[cfg(feature = "redb")]
21const RULE_BY_ID: TableDefinition<&str, &str> = TableDefinition::new("rule_by_id_v3");
22#[cfg(feature = "redb")]
23const RESERVATION_BY_ID: TableDefinition<u64, &str> = TableDefinition::new("reservation_by_id");
24#[cfg(feature = "redb")]
25const RESERVATION_ID_BY_OP_ID: TableDefinition<&str, u64> =
26    TableDefinition::new("reservation_id_by_op_id");
27#[cfg(feature = "redb")]
28const SPEND_EVENT_BY_ID: TableDefinition<u64, &str> = TableDefinition::new("spend_event_by_id");
29#[cfg(feature = "redb")]
30const FX_QUOTE_BY_PAIR: TableDefinition<&str, &str> = TableDefinition::new("quote_by_pair");
31#[cfg(feature = "redb")]
32const NEXT_RESERVATION_ID_KEY: &str = "next_reservation_id";
33#[cfg(feature = "redb")]
34const NEXT_EVENT_ID_KEY: &str = "next_event_id";
35#[cfg(feature = "redb")]
36const SPEND_VERSION: u64 = 1;
37#[cfg(feature = "redb")]
38const FX_CACHE_VERSION: u64 = 1;
39
40#[derive(Debug, Clone)]
41pub struct SpendContext {
42    pub network: String,
43    pub wallet: Option<String>,
44    pub amount_native: u64,
45    pub token: Option<String>,
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
49enum ReservationStatus {
50    Pending,
51    Confirmed,
52    Cancelled,
53    Expired,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
57struct SpendReservation {
58    reservation_id: u64,
59    op_id: String,
60    network: String,
61    wallet: Option<String>,
62    #[serde(default)]
63    token: Option<String>,
64    amount_native: u64,
65    amount_usd_cents: Option<u64>,
66    status: ReservationStatus,
67    created_at_epoch_ms: u64,
68    expires_at_epoch_ms: u64,
69    finalized_at_epoch_ms: Option<u64>,
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
73struct SpendEvent {
74    event_id: u64,
75    reservation_id: u64,
76    op_id: String,
77    network: String,
78    wallet: Option<String>,
79    #[serde(default)]
80    token: Option<String>,
81    amount_native: u64,
82    amount_usd_cents: Option<u64>,
83    created_at_epoch_ms: u64,
84    confirmed_at_epoch_ms: u64,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88struct ExchangeRateQuote {
89    pair: String,
90    source: String,
91    price: f64,
92    fetched_at_epoch_ms: u64,
93    expires_at_epoch_ms: u64,
94}
95
96// ═══════════════════════════════════════════
97// SpendBackend
98// ═══════════════════════════════════════════
99
100#[allow(dead_code)] // None variant used when neither redb nor postgres features are enabled
101enum SpendBackend {
102    #[cfg(feature = "redb")]
103    Redb {
104        data_dir: String,
105    },
106    #[cfg(feature = "postgres")]
107    Postgres {
108        pool: sqlx::PgPool,
109    },
110    None,
111}
112
113// ═══════════════════════════════════════════
114// SpendLedger
115// ═══════════════════════════════════════════
116
117pub struct SpendLedger {
118    backend: SpendBackend,
119    exchange_rate: Option<ExchangeRateConfig>,
120    mu: Mutex<()>,
121}
122
123impl SpendLedger {
124    pub fn new(data_dir: &str, exchange_rate: Option<ExchangeRateConfig>) -> Self {
125        #[cfg(feature = "redb")]
126        let backend = SpendBackend::Redb {
127            data_dir: data_dir.to_string(),
128        };
129        #[cfg(not(feature = "redb"))]
130        let backend = {
131            let _ = data_dir;
132            SpendBackend::None
133        };
134        Self {
135            backend,
136            exchange_rate,
137            mu: Mutex::new(()),
138        }
139    }
140
141    #[cfg(feature = "postgres")]
142    pub fn new_postgres(pool: sqlx::PgPool, exchange_rate: Option<ExchangeRateConfig>) -> Self {
143        Self {
144            backend: SpendBackend::Postgres { pool },
145            exchange_rate,
146            mu: Mutex::new(()),
147        }
148    }
149
150    /// Add a single spend limit rule. Generates and assigns a rule_id, returns it.
151    pub async fn add_limit(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
152        validate_limit(limit, self.exchange_rate.as_ref())?;
153
154        let _guard = self.mu.lock().await;
155
156        match &self.backend {
157            #[cfg(feature = "redb")]
158            SpendBackend::Redb { .. } => self.add_limit_redb(limit),
159            #[cfg(feature = "postgres")]
160            SpendBackend::Postgres { .. } => self.add_limit_postgres(limit).await,
161            SpendBackend::None => Err(PayError::NotImplemented(
162                "no storage backend for spend limits".to_string(),
163            )),
164        }
165    }
166
167    /// Remove a spend limit rule by its rule_id.
168    pub async fn remove_limit(&self, rule_id: &str) -> Result<(), PayError> {
169        let _guard = self.mu.lock().await;
170
171        match &self.backend {
172            #[cfg(feature = "redb")]
173            SpendBackend::Redb { .. } => self.remove_limit_redb(rule_id),
174            #[cfg(feature = "postgres")]
175            SpendBackend::Postgres { .. } => self.remove_limit_postgres(rule_id).await,
176            SpendBackend::None => Err(PayError::NotImplemented(
177                "no storage backend for spend limits".to_string(),
178            )),
179        }
180    }
181
182    /// Replace all spend limits (used by config patch / pipe mode).
183    pub async fn set_limits(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
184        for limit in limits {
185            validate_limit(limit, self.exchange_rate.as_ref())?;
186        }
187
188        let _guard = self.mu.lock().await;
189
190        match &self.backend {
191            #[cfg(feature = "redb")]
192            SpendBackend::Redb { .. } => self.set_limits_redb(limits),
193            #[cfg(feature = "postgres")]
194            SpendBackend::Postgres { .. } => self.set_limits_postgres(limits).await,
195            SpendBackend::None => Err(PayError::NotImplemented(
196                "no storage backend for spend limits".to_string(),
197            )),
198        }
199    }
200
201    /// Compute current status for all limits.
202    pub async fn get_status(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
203        let _guard = self.mu.lock().await;
204
205        match &self.backend {
206            #[cfg(feature = "redb")]
207            SpendBackend::Redb { .. } => self.get_status_redb(),
208            #[cfg(feature = "postgres")]
209            SpendBackend::Postgres { .. } => self.get_status_postgres().await,
210            SpendBackend::None => Ok(Vec::new()),
211        }
212    }
213
214    /// Reserve spend against all matching limits, returns reservation id.
215    pub async fn reserve(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
216        if op_id.trim().is_empty() {
217            return Err(PayError::InvalidAmount("op_id cannot be empty".to_string()));
218        }
219        if ctx.network.trim().is_empty() {
220            return Err(PayError::InvalidAmount(
221                "network cannot be empty for spend check".to_string(),
222            ));
223        }
224
225        let _guard = self.mu.lock().await;
226
227        match &self.backend {
228            #[cfg(feature = "redb")]
229            SpendBackend::Redb { .. } => self.reserve_redb(op_id, ctx).await,
230            #[cfg(feature = "postgres")]
231            SpendBackend::Postgres { .. } => self.reserve_postgres(op_id, ctx).await,
232            SpendBackend::None => Err(PayError::NotImplemented(
233                "no storage backend for spend limits".to_string(),
234            )),
235        }
236    }
237
238    pub async fn confirm(&self, reservation_id: u64) -> Result<(), PayError> {
239        let _guard = self.mu.lock().await;
240
241        match &self.backend {
242            #[cfg(feature = "redb")]
243            SpendBackend::Redb { .. } => self.confirm_redb(reservation_id),
244            #[cfg(feature = "postgres")]
245            SpendBackend::Postgres { .. } => self.confirm_postgres(reservation_id).await,
246            SpendBackend::None => Err(PayError::NotImplemented(
247                "no storage backend for spend limits".to_string(),
248            )),
249        }
250    }
251
252    pub async fn cancel(&self, reservation_id: u64) -> Result<(), PayError> {
253        let _guard = self.mu.lock().await;
254
255        match &self.backend {
256            #[cfg(feature = "redb")]
257            SpendBackend::Redb { .. } => self.cancel_redb(reservation_id),
258            #[cfg(feature = "postgres")]
259            SpendBackend::Postgres { .. } => self.cancel_postgres(reservation_id).await,
260            SpendBackend::None => Ok(()),
261        }
262    }
263}
264
265// ═══════════════════════════════════════════
266// Redb backend implementation
267// ═══════════════════════════════════════════
268
269#[cfg(feature = "redb")]
270impl SpendLedger {
271    fn spend_db_path(&self) -> PathBuf {
272        match &self.backend {
273            SpendBackend::Redb { data_dir } => Path::new(data_dir).join("spend").join("spend.redb"),
274            #[allow(unreachable_patterns)]
275            _ => PathBuf::new(),
276        }
277    }
278
279    fn exchange_rate_db_path(&self) -> PathBuf {
280        match &self.backend {
281            SpendBackend::Redb { data_dir } => Path::new(data_dir)
282                .join("spend")
283                .join("exchange-rate-cache.redb"),
284            #[allow(unreachable_patterns)]
285            _ => PathBuf::new(),
286        }
287    }
288
289    fn open_spend_db(&self) -> Result<Database, PayError> {
290        db::open_and_migrate(
291            &self.spend_db_path(),
292            SPEND_VERSION,
293            &[
294                // v0 → v1: no data migration, just stamp version
295                &|_db: &Database| Ok(()),
296            ],
297        )
298    }
299
300    fn open_exchange_rate_db(&self) -> Result<Database, PayError> {
301        db::open_and_migrate(
302            &self.exchange_rate_db_path(),
303            FX_CACHE_VERSION,
304            &[
305                // v0 → v1: no data migration, just stamp version
306                &|_db: &Database| Ok(()),
307            ],
308        )
309    }
310
311    fn add_limit_redb(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
312        let db = self.open_spend_db()?;
313        let rule_id = generate_rule_identifier()?;
314        limit.rule_id = Some(rule_id.clone());
315        let encoded = encode(limit)?;
316        let write_txn = db
317            .begin_write()
318            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
319        {
320            let mut rule_table = write_txn
321                .open_table(RULE_BY_ID)
322                .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
323            rule_table
324                .insert(rule_id.as_str(), encoded.as_str())
325                .map_err(|e| PayError::InternalError(format!("spend insert rule: {e}")))?;
326        }
327        write_txn
328            .commit()
329            .map_err(|e| PayError::InternalError(format!("spend commit add_limit: {e}")))?;
330        Ok(rule_id)
331    }
332
333    fn remove_limit_redb(&self, rule_id: &str) -> Result<(), PayError> {
334        let db = self.open_spend_db()?;
335        let write_txn = db
336            .begin_write()
337            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
338        {
339            let mut rule_table = write_txn
340                .open_table(RULE_BY_ID)
341                .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
342            let existed = rule_table
343                .remove(rule_id)
344                .map_err(|e| PayError::InternalError(format!("spend remove rule: {e}")))?;
345            if existed.is_none() {
346                return Err(PayError::InvalidAmount(format!(
347                    "rule_id '{rule_id}' not found"
348                )));
349            }
350        }
351        write_txn
352            .commit()
353            .map_err(|e| PayError::InternalError(format!("spend commit remove_limit: {e}")))
354    }
355
356    fn set_limits_redb(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
357        let db = self.open_spend_db()?;
358        let write_txn = db
359            .begin_write()
360            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
361        {
362            let mut rule_table = write_txn
363                .open_table(RULE_BY_ID)
364                .map_err(|e| PayError::InternalError(format!("spend open rule table: {e}")))?;
365            // Clear existing rules
366            let existing_ids = rule_table
367                .iter()
368                .map_err(|e| PayError::InternalError(format!("spend iterate rules: {e}")))?
369                .map(|entry| {
370                    entry
371                        .map(|(k, _)| k.value().to_string())
372                        .map_err(|e| PayError::InternalError(format!("spend read rule key: {e}")))
373                })
374                .collect::<Result<Vec<_>, _>>()?;
375            for rid in existing_ids {
376                rule_table
377                    .remove(rid.as_str())
378                    .map_err(|e| PayError::InternalError(format!("spend remove rule: {e}")))?;
379            }
380
381            // Insert new rules with generated IDs
382            for limit in limits {
383                let mut rule = limit.clone();
384                let rid = generate_rule_identifier()?;
385                rule.rule_id = Some(rid.clone());
386                let encoded = encode(&rule)?;
387                rule_table
388                    .insert(rid.as_str(), encoded.as_str())
389                    .map_err(|e| PayError::InternalError(format!("spend insert rule: {e}")))?;
390            }
391        }
392        write_txn
393            .commit()
394            .map_err(|e| PayError::InternalError(format!("spend commit set_limits: {e}")))
395    }
396
397    fn get_status_redb(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
398        let db = self.open_spend_db()?;
399        let read_txn = db
400            .begin_read()
401            .map_err(|e| PayError::InternalError(format!("spend begin_read: {e}")))?;
402        let rules = load_rules(&read_txn)?;
403        let reservations = load_reservations(&read_txn)?;
404        let now = now_epoch_ms();
405        let mut out = Vec::with_capacity(rules.len());
406        for rule in rules {
407            let use_usd = rule.scope == SpendScope::GlobalUsdCents;
408            let (spent, oldest_ts) = spent_in_window(&rule, &reservations, now, use_usd)?;
409            let remaining = rule.max_spend.saturating_sub(spent);
410            let window_ms = rule.window_s.saturating_mul(1000);
411            let window_reset_s = oldest_ts
412                .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
413                .unwrap_or(0);
414            out.push(SpendLimitStatus {
415                rule_id: rule.rule_id.clone().unwrap_or_default(),
416                scope: rule.scope,
417                network: rule.network.clone(),
418                wallet: rule.wallet.clone(),
419                window_s: rule.window_s,
420                max_spend: rule.max_spend,
421                spent,
422                remaining,
423                token: rule.token.clone(),
424                window_reset_s,
425            });
426        }
427        Ok(out)
428    }
429
430    async fn reserve_redb(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
431        let now = now_epoch_ms();
432        let db = self.open_spend_db()?;
433
434        let read_txn = db
435            .begin_read()
436            .map_err(|e| PayError::InternalError(format!("spend begin_read: {e}")))?;
437        let rules = load_rules(&read_txn)?;
438
439        if rules.iter().any(|r| {
440            r.scope == SpendScope::Wallet
441                && r.network.as_deref() == Some(ctx.network.as_str())
442                && ctx.wallet.is_none()
443        }) {
444            return Err(PayError::InvalidAmount(
445                "wallet-scoped limits require an explicit wallet".to_string(),
446            ));
447        }
448
449        // GlobalUsdCents scope needs USD conversion
450        let needs_usd = rules.iter().any(|r| r.scope == SpendScope::GlobalUsdCents);
451        let amount_usd_cents = if needs_usd {
452            Some(
453                self.amount_to_usd_cents(&ctx.network, ctx.token.as_deref(), ctx.amount_native)
454                    .await?,
455            )
456        } else {
457            None
458        };
459
460        let write_txn = db
461            .begin_write()
462            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
463
464        let mut encoded_blobs: Vec<String> = Vec::new();
465        let reservation_id = {
466            let mut reservation_index =
467                write_txn.open_table(RESERVATION_ID_BY_OP_ID).map_err(|e| {
468                    PayError::InternalError(format!("spend open reservation op index: {e}"))
469                })?;
470            if let Some(existing) = reservation_index
471                .get(op_id)
472                .map_err(|e| PayError::InternalError(format!("spend read op index: {e}")))?
473            {
474                let existing_id = existing.value();
475                return Ok(existing_id);
476            }
477
478            let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
479                PayError::InternalError(format!("spend open reservation table: {e}"))
480            })?;
481
482            expire_pending(&mut reservation_table, now)?;
483
484            let reservations = reservation_table
485                .iter()
486                .map_err(|e| PayError::InternalError(format!("spend iterate reservations: {e}")))?
487                .map(|entry| {
488                    let (_k, v) = entry.map_err(|e| {
489                        PayError::InternalError(format!("spend read reservation: {e}"))
490                    })?;
491                    decode::<SpendReservation>(v.value())
492                        .map_err(|e| prepend_err("spend decode reservation", e))
493                })
494                .collect::<Result<Vec<_>, _>>()?;
495
496            for rule in rules.iter() {
497                if !rule_matches_context(
498                    rule,
499                    &ctx.network,
500                    ctx.wallet.as_deref(),
501                    ctx.token.as_deref(),
502                ) {
503                    continue;
504                }
505
506                let use_usd = rule.scope == SpendScope::GlobalUsdCents;
507                let candidate_amount =
508                    amount_for_rule(rule, ctx.amount_native, amount_usd_cents, use_usd)?;
509                let (spent, oldest_ts) = spent_in_window(rule, &reservations, now, use_usd)?;
510                if spent.saturating_add(candidate_amount) > rule.max_spend {
511                    let window_ms = rule.window_s.saturating_mul(1000);
512                    let remaining_s = oldest_ts
513                        .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
514                        .unwrap_or(0);
515
516                    return Err(PayError::LimitExceeded {
517                        rule_id: rule.rule_id.clone().unwrap_or_default(),
518                        scope: rule.scope,
519                        scope_key: scope_key(rule),
520                        spent,
521                        max_spend: rule.max_spend,
522                        token: rule.token.clone(),
523                        remaining_s,
524                        origin: None,
525                    });
526                }
527            }
528
529            let reservation_id = next_counter(&write_txn, NEXT_RESERVATION_ID_KEY)?;
530            let reservation = SpendReservation {
531                reservation_id,
532                op_id: op_id.to_string(),
533                network: ctx.network.clone(),
534                wallet: ctx.wallet.clone(),
535                token: ctx.token.clone(),
536                amount_native: ctx.amount_native,
537                amount_usd_cents,
538                status: ReservationStatus::Pending,
539                created_at_epoch_ms: now,
540                expires_at_epoch_ms: now.saturating_add(300_000),
541                finalized_at_epoch_ms: None,
542            };
543            encoded_blobs.push(encode(&reservation)?);
544            let encoded = encoded_blobs
545                .last()
546                .ok_or_else(|| PayError::InternalError("missing reservation blob".to_string()))?;
547            reservation_table
548                .insert(reservation_id, encoded.as_str())
549                .map_err(|e| PayError::InternalError(format!("spend insert reservation: {e}")))?;
550            reservation_index
551                .insert(op_id, reservation_id)
552                .map_err(|e| PayError::InternalError(format!("spend insert op index: {e}")))?;
553            reservation_id
554        };
555
556        write_txn
557            .commit()
558            .map_err(|e| PayError::InternalError(format!("spend commit reserve: {e}")))?;
559        Ok(reservation_id)
560    }
561
562    fn confirm_redb(&self, reservation_id: u64) -> Result<(), PayError> {
563        let db = self.open_spend_db()?;
564        let now = now_epoch_ms();
565
566        let write_txn = db
567            .begin_write()
568            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
569
570        let mut encoded_blobs: Vec<String> = Vec::new();
571        {
572            let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
573                PayError::InternalError(format!("spend open reservation table: {e}"))
574            })?;
575            let Some(existing_bytes) = reservation_table
576                .get(reservation_id)
577                .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?
578                .map(|g| g.value().to_string())
579            else {
580                return Err(PayError::InternalError(format!(
581                    "reservation {reservation_id} not found"
582                )));
583            };
584
585            let mut reservation: SpendReservation = decode(&existing_bytes)?;
586            if !matches!(reservation.status, ReservationStatus::Pending) {
587                return Ok(());
588            }
589
590            reservation.status = ReservationStatus::Confirmed;
591            reservation.finalized_at_epoch_ms = Some(now);
592            encoded_blobs.push(encode(&reservation)?);
593            let encoded = encoded_blobs
594                .last()
595                .ok_or_else(|| PayError::InternalError("missing reservation blob".to_string()))?;
596            reservation_table
597                .insert(reservation_id, encoded.as_str())
598                .map_err(|e| PayError::InternalError(format!("spend update reservation: {e}")))?;
599
600            let mut events = write_txn
601                .open_table(SPEND_EVENT_BY_ID)
602                .map_err(|e| PayError::InternalError(format!("spend open event table: {e}")))?;
603            let event_id = next_counter(&write_txn, NEXT_EVENT_ID_KEY)?;
604            let event = SpendEvent {
605                event_id,
606                reservation_id,
607                op_id: reservation.op_id,
608                network: reservation.network,
609                wallet: reservation.wallet,
610                token: reservation.token,
611                amount_native: reservation.amount_native,
612                amount_usd_cents: reservation.amount_usd_cents,
613                created_at_epoch_ms: reservation.created_at_epoch_ms,
614                confirmed_at_epoch_ms: now,
615            };
616            encoded_blobs.push(encode(&event)?);
617            let encoded_event = encoded_blobs
618                .last()
619                .ok_or_else(|| PayError::InternalError("missing event blob".to_string()))?;
620            events
621                .insert(event_id, encoded_event.as_str())
622                .map_err(|e| PayError::InternalError(format!("spend insert event: {e}")))?;
623        }
624
625        write_txn
626            .commit()
627            .map_err(|e| PayError::InternalError(format!("spend commit confirm: {e}")))
628    }
629
630    fn cancel_redb(&self, reservation_id: u64) -> Result<(), PayError> {
631        let db = self.open_spend_db()?;
632        let now = now_epoch_ms();
633
634        let write_txn = db
635            .begin_write()
636            .map_err(|e| PayError::InternalError(format!("spend begin_write: {e}")))?;
637
638        let mut encoded_blobs: Vec<String> = Vec::new();
639        {
640            let mut reservation_table = write_txn.open_table(RESERVATION_BY_ID).map_err(|e| {
641                PayError::InternalError(format!("spend open reservation table: {e}"))
642            })?;
643            let existing = reservation_table
644                .get(reservation_id)
645                .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?;
646            let existing_bytes = existing.map(|g| g.value().to_string());
647            if let Some(existing_bytes) = existing_bytes {
648                let mut reservation: SpendReservation = decode(&existing_bytes)?;
649                if matches!(reservation.status, ReservationStatus::Pending) {
650                    reservation.status = ReservationStatus::Cancelled;
651                    reservation.finalized_at_epoch_ms = Some(now);
652                    encoded_blobs.push(encode(&reservation)?);
653                    let encoded = encoded_blobs.last().ok_or_else(|| {
654                        PayError::InternalError("missing reservation blob".to_string())
655                    })?;
656                    reservation_table
657                        .insert(reservation_id, encoded.as_str())
658                        .map_err(|e| {
659                            PayError::InternalError(format!("spend update reservation: {e}"))
660                        })?;
661                }
662            }
663        }
664
665        write_txn
666            .commit()
667            .map_err(|e| PayError::InternalError(format!("spend commit cancel: {e}")))
668    }
669}
670
671// ═══════════════════════════════════════════
672// Postgres backend implementation
673// ═══════════════════════════════════════════
674
675#[cfg(feature = "postgres")]
676impl SpendLedger {
677    fn pg_pool(&self) -> Result<&sqlx::PgPool, PayError> {
678        match &self.backend {
679            SpendBackend::Postgres { pool } => Ok(pool),
680            _ => Err(PayError::InternalError(
681                "expected postgres spend backend".to_string(),
682            )),
683        }
684    }
685
686    async fn add_limit_postgres(&self, limit: &mut SpendLimit) -> Result<String, PayError> {
687        let pool = self.pg_pool()?;
688        let rule_id = generate_rule_identifier()?;
689        limit.rule_id = Some(rule_id.clone());
690        let rule_json = serde_json::to_value(limit)
691            .map_err(|e| PayError::InternalError(format!("serialize spend rule: {e}")))?;
692
693        sqlx::query("INSERT INTO spend_rules (rule_id, rule) VALUES ($1, $2)")
694            .bind(&rule_id)
695            .bind(&rule_json)
696            .execute(pool)
697            .await
698            .map_err(|e| PayError::InternalError(format!("pg insert spend rule: {e}")))?;
699
700        Ok(rule_id)
701    }
702
703    async fn remove_limit_postgres(&self, rule_id: &str) -> Result<(), PayError> {
704        let pool = self.pg_pool()?;
705        let result = sqlx::query("DELETE FROM spend_rules WHERE rule_id = $1")
706            .bind(rule_id)
707            .execute(pool)
708            .await
709            .map_err(|e| PayError::InternalError(format!("pg delete spend rule: {e}")))?;
710
711        if result.rows_affected() == 0 {
712            return Err(PayError::InvalidAmount(format!(
713                "rule_id '{rule_id}' not found"
714            )));
715        }
716        Ok(())
717    }
718
719    async fn set_limits_postgres(&self, limits: &[SpendLimit]) -> Result<(), PayError> {
720        let pool = self.pg_pool()?;
721        let mut tx = pool
722            .begin()
723            .await
724            .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
725
726        sqlx::query("DELETE FROM spend_rules")
727            .execute(&mut *tx)
728            .await
729            .map_err(|e| PayError::InternalError(format!("pg clear spend rules: {e}")))?;
730
731        for limit in limits {
732            let mut rule = limit.clone();
733            let rid = generate_rule_identifier()?;
734            rule.rule_id = Some(rid.clone());
735            let rule_json = serde_json::to_value(&rule)
736                .map_err(|e| PayError::InternalError(format!("serialize spend rule: {e}")))?;
737            sqlx::query("INSERT INTO spend_rules (rule_id, rule) VALUES ($1, $2)")
738                .bind(&rid)
739                .bind(&rule_json)
740                .execute(&mut *tx)
741                .await
742                .map_err(|e| PayError::InternalError(format!("pg insert spend rule: {e}")))?;
743        }
744
745        tx.commit()
746            .await
747            .map_err(|e| PayError::InternalError(format!("pg commit set_limits: {e}")))
748    }
749
750    async fn get_status_postgres(&self) -> Result<Vec<SpendLimitStatus>, PayError> {
751        let pool = self.pg_pool()?;
752        let rules = pg_load_rules(pool).await?;
753        let reservations = pg_load_reservations(pool).await?;
754        let now = now_epoch_ms();
755
756        let mut out = Vec::with_capacity(rules.len());
757        for rule in rules {
758            let use_usd = rule.scope == SpendScope::GlobalUsdCents;
759            let (spent, oldest_ts) = spent_in_window(&rule, &reservations, now, use_usd)?;
760            let remaining = rule.max_spend.saturating_sub(spent);
761            let window_ms = rule.window_s.saturating_mul(1000);
762            let window_reset_s = oldest_ts
763                .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
764                .unwrap_or(0);
765            out.push(SpendLimitStatus {
766                rule_id: rule.rule_id.clone().unwrap_or_default(),
767                scope: rule.scope,
768                network: rule.network.clone(),
769                wallet: rule.wallet.clone(),
770                window_s: rule.window_s,
771                max_spend: rule.max_spend,
772                spent,
773                remaining,
774                token: rule.token.clone(),
775                window_reset_s,
776            });
777        }
778        Ok(out)
779    }
780
781    async fn reserve_postgres(&self, op_id: &str, ctx: &SpendContext) -> Result<u64, PayError> {
782        use crate::store::postgres_store::SPEND_ADVISORY_LOCK_KEY;
783
784        let pool = self.pg_pool()?;
785        let now = now_epoch_ms();
786
787        // Pre-flight: load rules outside the transaction for USD conversion
788        let rules = pg_load_rules(pool).await?;
789        if rules.iter().any(|r| {
790            r.scope == SpendScope::Wallet
791                && r.network.as_deref() == Some(ctx.network.as_str())
792                && ctx.wallet.is_none()
793        }) {
794            return Err(PayError::InvalidAmount(
795                "wallet-scoped limits require an explicit wallet".to_string(),
796            ));
797        }
798
799        let needs_usd = rules.iter().any(|r| r.scope == SpendScope::GlobalUsdCents);
800        let amount_usd_cents = if needs_usd {
801            Some(
802                self.amount_to_usd_cents(&ctx.network, ctx.token.as_deref(), ctx.amount_native)
803                    .await?,
804            )
805        } else {
806            None
807        };
808
809        // Begin serializable transaction with advisory lock
810        let mut tx = pool
811            .begin()
812            .await
813            .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
814
815        sqlx::query("SELECT pg_advisory_xact_lock($1)")
816            .bind(SPEND_ADVISORY_LOCK_KEY)
817            .execute(&mut *tx)
818            .await
819            .map_err(|e| PayError::InternalError(format!("pg advisory lock: {e}")))?;
820
821        // Check for existing reservation with same op_id (idempotency)
822        let existing: Option<(i64,)> =
823            sqlx::query_as("SELECT reservation_id FROM spend_reservations WHERE op_id = $1")
824                .bind(op_id)
825                .fetch_optional(&mut *tx)
826                .await
827                .map_err(|e| PayError::InternalError(format!("pg check op_id: {e}")))?;
828
829        if let Some((rid,)) = existing {
830            return Ok(rid as u64);
831        }
832
833        // Expire pending reservations
834        pg_expire_pending(&mut tx, now).await?;
835
836        // Load all reservations within the lock
837        let reservations = pg_load_reservations_tx(&mut tx).await?;
838
839        // Re-load rules within the lock (could have changed)
840        let rules = pg_load_rules_tx(&mut tx).await?;
841
842        // Check limits
843        for rule in rules.iter() {
844            if !rule_matches_context(
845                rule,
846                &ctx.network,
847                ctx.wallet.as_deref(),
848                ctx.token.as_deref(),
849            ) {
850                continue;
851            }
852
853            let use_usd = rule.scope == SpendScope::GlobalUsdCents;
854            let candidate_amount =
855                amount_for_rule(rule, ctx.amount_native, amount_usd_cents, use_usd)?;
856            let (spent, oldest_ts) = spent_in_window(rule, &reservations, now, use_usd)?;
857            if spent.saturating_add(candidate_amount) > rule.max_spend {
858                let window_ms = rule.window_s.saturating_mul(1000);
859                let remaining_s = oldest_ts
860                    .map(|oldest| (oldest.saturating_add(window_ms)).saturating_sub(now) / 1000)
861                    .unwrap_or(0);
862
863                return Err(PayError::LimitExceeded {
864                    rule_id: rule.rule_id.clone().unwrap_or_default(),
865                    scope: rule.scope,
866                    scope_key: scope_key(rule),
867                    spent,
868                    max_spend: rule.max_spend,
869                    token: rule.token.clone(),
870                    remaining_s,
871                    origin: None,
872                });
873            }
874        }
875
876        // Insert reservation
877        let reservation = SpendReservation {
878            reservation_id: 0, // will be assigned by BIGSERIAL
879            op_id: op_id.to_string(),
880            network: ctx.network.clone(),
881            wallet: ctx.wallet.clone(),
882            token: ctx.token.clone(),
883            amount_native: ctx.amount_native,
884            amount_usd_cents,
885            status: ReservationStatus::Pending,
886            created_at_epoch_ms: now,
887            expires_at_epoch_ms: now.saturating_add(300_000),
888            finalized_at_epoch_ms: None,
889        };
890        let reservation_json = serde_json::to_value(&reservation)
891            .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
892
893        let row: (i64,) = sqlx::query_as(
894            "INSERT INTO spend_reservations (op_id, reservation) \
895             VALUES ($1, $2) RETURNING reservation_id",
896        )
897        .bind(op_id)
898        .bind(&reservation_json)
899        .fetch_one(&mut *tx)
900        .await
901        .map_err(|e| PayError::InternalError(format!("pg insert reservation: {e}")))?;
902
903        let reservation_id = row.0 as u64;
904
905        // Update the reservation JSON with the assigned ID
906        let mut updated_json = reservation_json;
907        updated_json["reservation_id"] = serde_json::json!(reservation_id);
908        sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
909            .bind(&updated_json)
910            .bind(row.0)
911            .execute(&mut *tx)
912            .await
913            .map_err(|e| PayError::InternalError(format!("pg update reservation id: {e}")))?;
914
915        tx.commit()
916            .await
917            .map_err(|e| PayError::InternalError(format!("pg commit reserve: {e}")))?;
918
919        Ok(reservation_id)
920    }
921
922    async fn confirm_postgres(&self, reservation_id: u64) -> Result<(), PayError> {
923        let pool = self.pg_pool()?;
924        let now = now_epoch_ms();
925        let rid = reservation_id as i64;
926
927        let row: Option<(serde_json::Value,)> =
928            sqlx::query_as("SELECT reservation FROM spend_reservations WHERE reservation_id = $1")
929                .bind(rid)
930                .fetch_optional(pool)
931                .await
932                .map_err(|e| PayError::InternalError(format!("pg read reservation: {e}")))?;
933
934        let Some((res_json,)) = row else {
935            return Err(PayError::InternalError(format!(
936                "reservation {reservation_id} not found"
937            )));
938        };
939
940        let mut reservation: SpendReservation = serde_json::from_value(res_json)
941            .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
942
943        if !matches!(reservation.status, ReservationStatus::Pending) {
944            return Ok(());
945        }
946
947        reservation.status = ReservationStatus::Confirmed;
948        reservation.finalized_at_epoch_ms = Some(now);
949        let updated_json = serde_json::to_value(&reservation)
950            .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
951
952        let event = SpendEvent {
953            event_id: 0, // assigned by BIGSERIAL
954            reservation_id,
955            op_id: reservation.op_id,
956            network: reservation.network,
957            wallet: reservation.wallet,
958            token: reservation.token,
959            amount_native: reservation.amount_native,
960            amount_usd_cents: reservation.amount_usd_cents,
961            created_at_epoch_ms: reservation.created_at_epoch_ms,
962            confirmed_at_epoch_ms: now,
963        };
964        let event_json = serde_json::to_value(&event)
965            .map_err(|e| PayError::InternalError(format!("serialize spend event: {e}")))?;
966
967        let mut tx = pool
968            .begin()
969            .await
970            .map_err(|e| PayError::InternalError(format!("pg begin tx: {e}")))?;
971
972        sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
973            .bind(&updated_json)
974            .bind(rid)
975            .execute(&mut *tx)
976            .await
977            .map_err(|e| PayError::InternalError(format!("pg update reservation: {e}")))?;
978
979        sqlx::query("INSERT INTO spend_events (reservation_id, event) VALUES ($1, $2)")
980            .bind(rid)
981            .bind(&event_json)
982            .execute(&mut *tx)
983            .await
984            .map_err(|e| PayError::InternalError(format!("pg insert spend event: {e}")))?;
985
986        tx.commit()
987            .await
988            .map_err(|e| PayError::InternalError(format!("pg commit confirm: {e}")))
989    }
990
991    async fn cancel_postgres(&self, reservation_id: u64) -> Result<(), PayError> {
992        let pool = self.pg_pool()?;
993        let now = now_epoch_ms();
994        let rid = reservation_id as i64;
995
996        let row: Option<(serde_json::Value,)> =
997            sqlx::query_as("SELECT reservation FROM spend_reservations WHERE reservation_id = $1")
998                .bind(rid)
999                .fetch_optional(pool)
1000                .await
1001                .map_err(|e| PayError::InternalError(format!("pg read reservation: {e}")))?;
1002
1003        if let Some((res_json,)) = row {
1004            let mut reservation: SpendReservation = serde_json::from_value(res_json)
1005                .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
1006
1007            if matches!(reservation.status, ReservationStatus::Pending) {
1008                reservation.status = ReservationStatus::Cancelled;
1009                reservation.finalized_at_epoch_ms = Some(now);
1010                let updated_json = serde_json::to_value(&reservation)
1011                    .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
1012
1013                sqlx::query(
1014                    "UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2",
1015                )
1016                .bind(&updated_json)
1017                .bind(rid)
1018                .execute(pool)
1019                .await
1020                .map_err(|e| PayError::InternalError(format!("pg update reservation: {e}")))?;
1021            }
1022        }
1023
1024        Ok(())
1025    }
1026}
1027
1028#[cfg(feature = "postgres")]
1029async fn pg_load_rules(pool: &sqlx::PgPool) -> Result<Vec<SpendLimit>, PayError> {
1030    let rows: Vec<(serde_json::Value,)> =
1031        sqlx::query_as("SELECT rule FROM spend_rules ORDER BY rule_id")
1032            .fetch_all(pool)
1033            .await
1034            .map_err(|e| PayError::InternalError(format!("pg load spend rules: {e}")))?;
1035    rows.into_iter()
1036        .map(|(v,)| {
1037            serde_json::from_value(v)
1038                .map_err(|e| PayError::InternalError(format!("pg parse spend rule: {e}")))
1039        })
1040        .collect()
1041}
1042
1043#[cfg(feature = "postgres")]
1044async fn pg_load_rules_tx(
1045    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1046) -> Result<Vec<SpendLimit>, PayError> {
1047    let rows: Vec<(serde_json::Value,)> =
1048        sqlx::query_as("SELECT rule FROM spend_rules ORDER BY rule_id")
1049            .fetch_all(&mut **tx)
1050            .await
1051            .map_err(|e| PayError::InternalError(format!("pg load spend rules: {e}")))?;
1052    rows.into_iter()
1053        .map(|(v,)| {
1054            serde_json::from_value(v)
1055                .map_err(|e| PayError::InternalError(format!("pg parse spend rule: {e}")))
1056        })
1057        .collect()
1058}
1059
1060#[cfg(feature = "postgres")]
1061async fn pg_load_reservations(pool: &sqlx::PgPool) -> Result<Vec<SpendReservation>, PayError> {
1062    let rows: Vec<(serde_json::Value,)> =
1063        sqlx::query_as("SELECT reservation FROM spend_reservations ORDER BY reservation_id")
1064            .fetch_all(pool)
1065            .await
1066            .map_err(|e| PayError::InternalError(format!("pg load reservations: {e}")))?;
1067    rows.into_iter()
1068        .map(|(v,)| {
1069            serde_json::from_value(v)
1070                .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))
1071        })
1072        .collect()
1073}
1074
1075#[cfg(feature = "postgres")]
1076async fn pg_load_reservations_tx(
1077    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1078) -> Result<Vec<SpendReservation>, PayError> {
1079    let rows: Vec<(serde_json::Value,)> =
1080        sqlx::query_as("SELECT reservation FROM spend_reservations ORDER BY reservation_id")
1081            .fetch_all(&mut **tx)
1082            .await
1083            .map_err(|e| PayError::InternalError(format!("pg load reservations: {e}")))?;
1084    rows.into_iter()
1085        .map(|(v,)| {
1086            serde_json::from_value(v)
1087                .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))
1088        })
1089        .collect()
1090}
1091
1092#[cfg(feature = "postgres")]
1093async fn pg_expire_pending(
1094    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1095    now_ms: u64,
1096) -> Result<(), PayError> {
1097    // Load pending reservations and expire those past their deadline
1098    let rows: Vec<(i64, serde_json::Value)> =
1099        sqlx::query_as("SELECT reservation_id, reservation FROM spend_reservations")
1100            .fetch_all(&mut **tx)
1101            .await
1102            .map_err(|e| {
1103                PayError::InternalError(format!("pg load reservations for expire: {e}"))
1104            })?;
1105
1106    for (rid, res_json) in rows {
1107        let mut reservation: SpendReservation = serde_json::from_value(res_json)
1108            .map_err(|e| PayError::InternalError(format!("pg parse reservation: {e}")))?;
1109        if matches!(reservation.status, ReservationStatus::Pending)
1110            && reservation.expires_at_epoch_ms <= now_ms
1111        {
1112            reservation.status = ReservationStatus::Expired;
1113            reservation.finalized_at_epoch_ms = Some(now_ms);
1114            let updated = serde_json::to_value(&reservation)
1115                .map_err(|e| PayError::InternalError(format!("serialize reservation: {e}")))?;
1116            sqlx::query("UPDATE spend_reservations SET reservation = $1 WHERE reservation_id = $2")
1117                .bind(&updated)
1118                .bind(rid)
1119                .execute(&mut **tx)
1120                .await
1121                .map_err(|e| PayError::InternalError(format!("pg expire reservation: {e}")))?;
1122        }
1123    }
1124    Ok(())
1125}
1126
1127// ═══════════════════════════════════════════
1128// Exchange rate (shared, delegates to backend for caching)
1129// ═══════════════════════════════════════════
1130
1131impl SpendLedger {
1132    async fn amount_to_usd_cents(
1133        &self,
1134        network: &str,
1135        token: Option<&str>,
1136        amount_native: u64,
1137    ) -> Result<u64, PayError> {
1138        let (symbol, divisor) = token_asset(network, token).ok_or_else(|| {
1139            PayError::InvalidAmount(format!(
1140                "network '{network}' token '{token:?}' is unsupported for global-usd-cents limits"
1141            ))
1142        })?;
1143
1144        let quote = self.get_or_fetch_quote(symbol, "USD").await?;
1145        let usd = (amount_native as f64 / divisor) * quote.price;
1146        if !usd.is_finite() || usd < 0f64 {
1147            return Err(PayError::InternalError(
1148                "invalid exchange-rate conversion result".to_string(),
1149            ));
1150        }
1151        Ok((usd * 100f64).round() as u64)
1152    }
1153
1154    async fn get_or_fetch_quote(
1155        &self,
1156        base: &str,
1157        quote: &str,
1158    ) -> Result<ExchangeRateQuote, PayError> {
1159        let pair = format!(
1160            "{}/{}",
1161            base.to_ascii_uppercase(),
1162            quote.to_ascii_uppercase()
1163        );
1164        let now = now_epoch_ms();
1165
1166        // Try cache — redb
1167        #[cfg(feature = "redb")]
1168        if let SpendBackend::Redb { .. } = &self.backend {
1169            let fx_db = self.open_exchange_rate_db()?;
1170            let read_txn = fx_db
1171                .begin_read()
1172                .map_err(|e| PayError::InternalError(format!("fx begin_read: {e}")))?;
1173            if let Ok(table) = read_txn.open_table(FX_QUOTE_BY_PAIR) {
1174                if let Some(entry) = table
1175                    .get(pair.as_str())
1176                    .map_err(|e| PayError::InternalError(format!("fx read quote: {e}")))?
1177                {
1178                    let cached: ExchangeRateQuote = decode(entry.value())?;
1179                    if cached.expires_at_epoch_ms > now {
1180                        return Ok(cached);
1181                    }
1182                }
1183            }
1184        }
1185
1186        // Try cache — postgres
1187        #[cfg(feature = "postgres")]
1188        if let SpendBackend::Postgres { pool } = &self.backend {
1189            let row: Option<(serde_json::Value,)> =
1190                sqlx::query_as("SELECT quote FROM exchange_rate_cache WHERE pair = $1")
1191                    .bind(&pair)
1192                    .fetch_optional(pool)
1193                    .await
1194                    .map_err(|e| PayError::InternalError(format!("pg fx read cache: {e}")))?;
1195            if let Some((quote_json,)) = row {
1196                let cached: ExchangeRateQuote = serde_json::from_value(quote_json)
1197                    .map_err(|e| PayError::InternalError(format!("pg fx parse cache: {e}")))?;
1198                if cached.expires_at_epoch_ms > now {
1199                    return Ok(cached);
1200                }
1201            }
1202        }
1203
1204        let (fetched_price, source_name) = self.fetch_exchange_rate_http(base, quote).await?;
1205        let ttl_s = self
1206            .exchange_rate
1207            .as_ref()
1208            .map(|cfg| cfg.ttl_s)
1209            .unwrap_or(300)
1210            .max(1);
1211        let new_quote = ExchangeRateQuote {
1212            pair: pair.clone(),
1213            source: source_name,
1214            price: fetched_price,
1215            fetched_at_epoch_ms: now,
1216            expires_at_epoch_ms: now.saturating_add(ttl_s.saturating_mul(1000)),
1217        };
1218
1219        // Write cache — redb
1220        #[cfg(feature = "redb")]
1221        if let SpendBackend::Redb { .. } = &self.backend {
1222            let fx_db = self.open_exchange_rate_db()?;
1223            let write_txn = fx_db
1224                .begin_write()
1225                .map_err(|e| PayError::InternalError(format!("fx begin_write: {e}")))?;
1226            let mut encoded_blobs: Vec<String> = Vec::new();
1227            {
1228                let mut table = write_txn
1229                    .open_table(FX_QUOTE_BY_PAIR)
1230                    .map_err(|e| PayError::InternalError(format!("fx open quote table: {e}")))?;
1231                encoded_blobs.push(encode(&new_quote)?);
1232                let encoded = encoded_blobs
1233                    .last()
1234                    .ok_or_else(|| PayError::InternalError("missing quote blob".to_string()))?;
1235                table
1236                    .insert(pair.as_str(), encoded.as_str())
1237                    .map_err(|e| PayError::InternalError(format!("fx insert quote: {e}")))?;
1238            }
1239            write_txn
1240                .commit()
1241                .map_err(|e| PayError::InternalError(format!("fx commit write: {e}")))?;
1242        }
1243
1244        // Write cache — postgres
1245        #[cfg(feature = "postgres")]
1246        if let SpendBackend::Postgres { pool } = &self.backend {
1247            let quote_json = serde_json::to_value(&new_quote)
1248                .map_err(|e| PayError::InternalError(format!("serialize fx quote: {e}")))?;
1249            let _ = sqlx::query(
1250                "INSERT INTO exchange_rate_cache (pair, quote) VALUES ($1, $2) \
1251                 ON CONFLICT (pair) DO UPDATE SET quote = $2",
1252            )
1253            .bind(&pair)
1254            .bind(&quote_json)
1255            .execute(pool)
1256            .await;
1257        }
1258
1259        Ok(new_quote)
1260    }
1261
1262    #[cfg(feature = "exchange-rate")]
1263    async fn fetch_exchange_rate_http(
1264        &self,
1265        base: &str,
1266        quote_currency: &str,
1267    ) -> Result<(f64, String), PayError> {
1268        let cfg = self.exchange_rate.as_ref().cloned().unwrap_or_default();
1269
1270        if cfg.sources.is_empty() {
1271            return Err(PayError::InvalidAmount(
1272                "exchange_rate.sources is empty — no exchange-rate API configured".to_string(),
1273            ));
1274        }
1275
1276        let client = reqwest::Client::new();
1277        let mut last_err = String::new();
1278
1279        for source in &cfg.sources {
1280            match fetch_from_source(&client, source, base, quote_currency).await {
1281                Ok(price) => return Ok((price, source.endpoint.clone())),
1282                Err(e) => {
1283                    last_err =
1284                        format!("{} ({}): {e}", source.endpoint, source.source_type.as_str());
1285                }
1286            }
1287        }
1288
1289        Err(PayError::NetworkError(format!(
1290            "all exchange-rate sources failed; last: {last_err}"
1291        )))
1292    }
1293
1294    #[cfg(not(feature = "exchange-rate"))]
1295    async fn fetch_exchange_rate_http(
1296        &self,
1297        _base: &str,
1298        _quote_currency: &str,
1299    ) -> Result<(f64, String), PayError> {
1300        Err(PayError::NotImplemented(
1301            "exchange-rate HTTP support is not built in this feature set".to_string(),
1302        ))
1303    }
1304}
1305
1306// ═══════════════════════════════════════════
1307// Helpers
1308// ═══════════════════════════════════════════
1309
1310fn now_epoch_ms() -> u64 {
1311    std::time::SystemTime::now()
1312        .duration_since(std::time::UNIX_EPOCH)
1313        .map(|d| d.as_millis() as u64)
1314        .unwrap_or(0)
1315}
1316
1317fn token_asset(network: &str, token: Option<&str>) -> Option<(&'static str, f64)> {
1318    match token.map(|t| t.to_ascii_lowercase()).as_deref() {
1319        Some("sol") => Some(("SOL", 1e9)),
1320        Some("eth") => Some(("ETH", 1e18)),
1321        Some("usdc" | "usdt") => Some(("USD", 1e6)),
1322        Some(_) => None,
1323        None => {
1324            let p = network.to_ascii_lowercase();
1325            if p.starts_with("ln") || p == "cashu" || p == "btc" {
1326                Some(("BTC", 1e8))
1327            } else {
1328                None
1329            }
1330        }
1331    }
1332}
1333
1334fn extract_price_generic(value: &serde_json::Value) -> Option<f64> {
1335    value
1336        .get("price")
1337        .and_then(|v| v.as_f64())
1338        .or_else(|| value.get("rate").and_then(|v| v.as_f64()))
1339        .or_else(|| value.get("usd_per_base").and_then(|v| v.as_f64()))
1340        .or_else(|| {
1341            value
1342                .get("data")
1343                .and_then(|d| d.get("price"))
1344                .and_then(|v| v.as_f64())
1345        })
1346}
1347
1348impl ExchangeRateSourceType {
1349    fn as_str(self) -> &'static str {
1350        match self {
1351            Self::Generic => "generic",
1352            Self::CoinGecko => "coingecko",
1353            Self::Kraken => "kraken",
1354        }
1355    }
1356}
1357
1358fn coingecko_coin_id(symbol: &str) -> Option<&'static str> {
1359    match symbol.to_ascii_uppercase().as_str() {
1360        "BTC" => Some("bitcoin"),
1361        "SOL" => Some("solana"),
1362        "ETH" => Some("ethereum"),
1363        _ => None,
1364    }
1365}
1366
1367fn kraken_pair(symbol: &str) -> Option<&'static str> {
1368    match symbol.to_ascii_uppercase().as_str() {
1369        "BTC" => Some("XBTUSD"),
1370        "SOL" => Some("SOLUSD"),
1371        "ETH" => Some("ETHUSD"),
1372        _ => None,
1373    }
1374}
1375
1376#[cfg(feature = "exchange-rate")]
1377async fn fetch_from_source(
1378    client: &reqwest::Client,
1379    source: &crate::types::ExchangeRateSource,
1380    base: &str,
1381    quote_currency: &str,
1382) -> Result<f64, String> {
1383    type PriceExtractor = Box<dyn Fn(&serde_json::Value) -> Option<f64> + Send>;
1384    let (url, extract_fn): (String, PriceExtractor) = match source.source_type {
1385        ExchangeRateSourceType::Kraken => {
1386            let pair = kraken_pair(base)
1387                .ok_or_else(|| format!("kraken: unsupported base asset '{base}'"))?;
1388            let url = format!("{}/0/public/Ticker?pair={pair}", source.endpoint);
1389            let pair_owned = pair.to_string();
1390            (
1391                url,
1392                Box::new(move |v: &serde_json::Value| {
1393                    let result = v.get("result")?;
1394                    let ticker = result
1395                        .get(&pair_owned)
1396                        .or_else(|| result.as_object().and_then(|m| m.values().next()))?;
1397                    let price_str = ticker.get("c")?.as_array()?.first()?.as_str()?;
1398                    price_str.parse::<f64>().ok()
1399                }),
1400            )
1401        }
1402        ExchangeRateSourceType::CoinGecko => {
1403            let coin_id = coingecko_coin_id(base)
1404                .ok_or_else(|| format!("coingecko: unsupported base asset '{base}'"))?;
1405            let vs = quote_currency.to_ascii_lowercase();
1406            let url = format!(
1407                "{}/simple/price?ids={coin_id}&vs_currencies={vs}",
1408                source.endpoint
1409            );
1410            let coin_id_owned = coin_id.to_string();
1411            let vs_owned = vs.clone();
1412            (
1413                url,
1414                Box::new(move |v: &serde_json::Value| {
1415                    v.get(&coin_id_owned)?.get(&vs_owned)?.as_f64()
1416                }),
1417            )
1418        }
1419        ExchangeRateSourceType::Generic => {
1420            let sep = if source.endpoint.contains('?') {
1421                '&'
1422            } else {
1423                '?'
1424            };
1425            let url = format!(
1426                "{}{sep}base={}&quote={}",
1427                source.endpoint,
1428                base.to_ascii_uppercase(),
1429                quote_currency.to_ascii_uppercase()
1430            );
1431            (url, Box::new(extract_price_generic))
1432        }
1433    };
1434
1435    let mut req = client.get(&url);
1436    if let Some(key) = &source.api_key {
1437        req = req.header("Authorization", format!("Bearer {key}"));
1438        req = req.header("X-Api-Key", key);
1439    }
1440
1441    let resp = req
1442        .send()
1443        .await
1444        .map_err(|e| format!("request failed: {e}"))?;
1445    if !resp.status().is_success() {
1446        return Err(format!("status {}", resp.status()));
1447    }
1448
1449    let value: serde_json::Value = resp
1450        .json()
1451        .await
1452        .map_err(|e| format!("parse failed: {e}"))?;
1453
1454    extract_fn(&value).ok_or_else(|| "could not extract price from response".to_string())
1455}
1456
1457fn encode<T: Serialize>(value: &T) -> Result<String, PayError> {
1458    serde_json::to_string(value)
1459        .map_err(|e| PayError::InternalError(format!("spend encode failed: {e}")))
1460}
1461
1462fn decode<T: DeserializeOwned>(encoded: &str) -> Result<T, PayError> {
1463    serde_json::from_str(encoded).map_err(|e| {
1464        let preview_len = encoded.len().min(48);
1465        let preview = &encoded[..preview_len];
1466        PayError::InternalError(format!(
1467            "spend decode failed (len={}, preview={}): {e}",
1468            encoded.len(),
1469            preview
1470        ))
1471    })
1472}
1473
1474fn prepend_err(prefix: &str, err: PayError) -> PayError {
1475    match err {
1476        PayError::InternalError(msg) => PayError::InternalError(format!("{prefix}: {msg}")),
1477        other => other,
1478    }
1479}
1480
1481fn generate_rule_identifier() -> Result<String, PayError> {
1482    let mut buf = [0u8; 4];
1483    getrandom::fill(&mut buf).map_err(|e| PayError::InternalError(format!("rng failed: {e}")))?;
1484    Ok(format!("r_{}", hex::encode(buf)))
1485}
1486
1487fn validate_limit(
1488    rule: &SpendLimit,
1489    exchange_rate: Option<&ExchangeRateConfig>,
1490) -> Result<(), PayError> {
1491    if rule.window_s == 0 {
1492        return Err(PayError::InvalidAmount(
1493            "limit rule has zero window_s".to_string(),
1494        ));
1495    }
1496    if rule.max_spend == 0 {
1497        return Err(PayError::InvalidAmount(
1498            "limit rule has zero max_spend".to_string(),
1499        ));
1500    }
1501
1502    match rule.scope {
1503        SpendScope::GlobalUsdCents => {
1504            if rule.network.is_some() || rule.wallet.is_some() {
1505                return Err(PayError::InvalidAmount(
1506                    "scope=global-usd-cents cannot set network/wallet".to_string(),
1507                ));
1508            }
1509            if rule.token.is_some() {
1510                return Err(PayError::InvalidAmount(
1511                    "scope=global-usd-cents cannot set token".to_string(),
1512                ));
1513            }
1514        }
1515        SpendScope::Network => {
1516            if rule.network.as_deref().unwrap_or("").trim().is_empty() {
1517                return Err(PayError::InvalidAmount(
1518                    "scope=network requires network".to_string(),
1519                ));
1520            }
1521            if rule.wallet.is_some() {
1522                return Err(PayError::InvalidAmount(
1523                    "scope=network cannot set wallet".to_string(),
1524                ));
1525            }
1526        }
1527        SpendScope::Wallet => {
1528            if rule.network.as_deref().unwrap_or("").trim().is_empty() {
1529                return Err(PayError::InvalidAmount(
1530                    "scope=wallet requires network".to_string(),
1531                ));
1532            }
1533            if rule.wallet.as_deref().unwrap_or("").trim().is_empty() {
1534                return Err(PayError::InvalidAmount(
1535                    "scope=wallet requires wallet".to_string(),
1536                ));
1537            }
1538        }
1539    }
1540
1541    if rule.scope == SpendScope::GlobalUsdCents && exchange_rate.is_none() {
1542        return Err(PayError::InvalidAmount(
1543            "scope=global-usd-cents requires config.exchange_rate".to_string(),
1544        ));
1545    }
1546    Ok(())
1547}
1548
1549#[cfg(feature = "redb")]
1550fn load_rules(read_txn: &redb::ReadTransaction) -> Result<Vec<SpendLimit>, PayError> {
1551    let Ok(rule_table) = read_txn.open_table(RULE_BY_ID) else {
1552        return Ok(vec![]);
1553    };
1554    rule_table
1555        .iter()
1556        .map_err(|e| PayError::InternalError(format!("spend iterate rules: {e}")))?
1557        .map(|entry| {
1558            let (_k, v) = entry
1559                .map_err(|e| PayError::InternalError(format!("spend read rule entry: {e}")))?;
1560            decode::<SpendLimit>(v.value()).map_err(|e| prepend_err("spend decode rule", e))
1561        })
1562        .collect()
1563}
1564
1565#[cfg(feature = "redb")]
1566fn load_reservations(read_txn: &redb::ReadTransaction) -> Result<Vec<SpendReservation>, PayError> {
1567    let Ok(table) = read_txn.open_table(RESERVATION_BY_ID) else {
1568        return Ok(vec![]);
1569    };
1570    table
1571        .iter()
1572        .map_err(|e| PayError::InternalError(format!("spend iterate reservations: {e}")))?
1573        .map(|entry| {
1574            let (_k, v) = entry
1575                .map_err(|e| PayError::InternalError(format!("spend read reservation: {e}")))?;
1576            decode::<SpendReservation>(v.value())
1577                .map_err(|e| prepend_err("spend decode reservation", e))
1578        })
1579        .collect()
1580}
1581
1582#[cfg(feature = "redb")]
1583fn expire_pending(_table: &mut redb::Table<u64, &str>, _now_ms: u64) -> Result<(), PayError> {
1584    Ok(())
1585}
1586
1587fn amount_for_rule(
1588    _rule: &SpendLimit,
1589    amount_native: u64,
1590    amount_usd_cents: Option<u64>,
1591    use_usd: bool,
1592) -> Result<u64, PayError> {
1593    if use_usd {
1594        amount_usd_cents.ok_or_else(|| {
1595            PayError::InternalError("missing USD amount for non-native unit rule".to_string())
1596        })
1597    } else {
1598        Ok(amount_native)
1599    }
1600}
1601
1602fn reservation_active_for_window(r: &SpendReservation, now_ms: u64) -> bool {
1603    match r.status {
1604        ReservationStatus::Confirmed => true,
1605        ReservationStatus::Pending => r.expires_at_epoch_ms > now_ms,
1606        ReservationStatus::Cancelled | ReservationStatus::Expired => false,
1607    }
1608}
1609
1610fn rule_matches_context(
1611    rule: &SpendLimit,
1612    network: &str,
1613    wallet: Option<&str>,
1614    token: Option<&str>,
1615) -> bool {
1616    if let Some(rule_token) = &rule.token {
1617        match token {
1618            Some(ctx_token) if ctx_token.eq_ignore_ascii_case(rule_token) => {}
1619            _ => return false,
1620        }
1621    }
1622    match rule.scope {
1623        SpendScope::GlobalUsdCents => true,
1624        SpendScope::Network => rule.network.as_deref() == Some(network),
1625        SpendScope::Wallet => {
1626            rule.network.as_deref() == Some(network) && rule.wallet.as_deref() == wallet
1627        }
1628    }
1629}
1630
1631fn scope_key(rule: &SpendLimit) -> String {
1632    match rule.scope {
1633        SpendScope::GlobalUsdCents => "global-usd-cents".to_string(),
1634        SpendScope::Network => rule.network.clone().unwrap_or_default(),
1635        SpendScope::Wallet => format!(
1636            "{}/{}",
1637            rule.network.clone().unwrap_or_default(),
1638            rule.wallet.clone().unwrap_or_default()
1639        ),
1640    }
1641}
1642
1643fn spent_in_window(
1644    rule: &SpendLimit,
1645    reservations: &[SpendReservation],
1646    now_ms: u64,
1647    use_usd: bool,
1648) -> Result<(u64, Option<u64>), PayError> {
1649    let window_ms = rule.window_s.saturating_mul(1000);
1650    let cutoff = now_ms.saturating_sub(window_ms);
1651
1652    let mut spent = 0u64;
1653    let mut oldest: Option<u64> = None;
1654
1655    for r in reservations {
1656        if !reservation_active_for_window(r, now_ms) {
1657            continue;
1658        }
1659        if r.created_at_epoch_ms < cutoff {
1660            continue;
1661        }
1662        if !rule_matches_context(rule, &r.network, r.wallet.as_deref(), r.token.as_deref()) {
1663            continue;
1664        }
1665
1666        let amount = if use_usd {
1667            r.amount_usd_cents.ok_or_else(|| {
1668                PayError::InternalError("reservation missing USD amount".to_string())
1669            })?
1670        } else {
1671            r.amount_native
1672        };
1673        spent = spent.saturating_add(amount);
1674        oldest = Some(oldest.map_or(r.created_at_epoch_ms, |v| v.min(r.created_at_epoch_ms)));
1675    }
1676
1677    Ok((spent, oldest))
1678}
1679
1680#[cfg(feature = "redb")]
1681fn next_counter(write_txn: &redb::WriteTransaction, key: &str) -> Result<u64, PayError> {
1682    let mut meta = write_txn
1683        .open_table(META_COUNTER)
1684        .map_err(|e| PayError::InternalError(format!("spend open meta table: {e}")))?;
1685    let current = match meta
1686        .get(key)
1687        .map_err(|e| PayError::InternalError(format!("spend read counter {key}: {e}")))?
1688    {
1689        Some(v) => v.value(),
1690        None => 0,
1691    };
1692    let next = current.saturating_add(1);
1693    meta.insert(key, next)
1694        .map_err(|e| PayError::InternalError(format!("spend write counter {key}: {e}")))?;
1695    Ok(next)
1696}
1697
1698#[cfg(test)]
1699mod tests {
1700    use super::*;
1701
1702    fn make_limit(scope: SpendScope, network: Option<&str>, wallet: Option<&str>) -> SpendLimit {
1703        SpendLimit {
1704            rule_id: None,
1705            scope,
1706            network: network.map(|s| s.to_string()),
1707            wallet: wallet.map(|s| s.to_string()),
1708            window_s: 3600,
1709            max_spend: 1000,
1710            token: None,
1711        }
1712    }
1713
1714    #[cfg(feature = "redb")]
1715    #[tokio::test]
1716    async fn provider_limit_reserve_and_confirm() {
1717        let tmp = tempfile::tempdir().unwrap();
1718        let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);
1719
1720        ledger
1721            .set_limits(&[make_limit(SpendScope::Network, Some("cashu"), None)])
1722            .await
1723            .unwrap();
1724
1725        let ctx = SpendContext {
1726            network: "cashu".to_string(),
1727            wallet: Some("w_01".to_string()),
1728            amount_native: 400,
1729            token: None,
1730        };
1731        let r1 = ledger.reserve("op_1", &ctx).await.unwrap();
1732        ledger.confirm(r1).await.unwrap();
1733
1734        let r2 = ledger.reserve("op_2", &ctx).await.unwrap();
1735        let err = ledger.reserve("op_3", &ctx).await.unwrap_err();
1736        assert!(matches!(err, PayError::LimitExceeded { .. }));
1737
1738        ledger.cancel(r2).await.unwrap();
1739    }
1740
1741    #[cfg(feature = "redb")]
1742    #[tokio::test]
1743    async fn wallet_scope_requires_wallet_context() {
1744        let tmp = tempfile::tempdir().unwrap();
1745        let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);
1746
1747        ledger
1748            .set_limits(&[make_limit(SpendScope::Wallet, Some("cashu"), Some("w_abc"))])
1749            .await
1750            .unwrap();
1751
1752        let ctx = SpendContext {
1753            network: "cashu".to_string(),
1754            wallet: None,
1755            amount_native: 1,
1756            token: None,
1757        };
1758        let err = ledger.reserve("op_1", &ctx).await.unwrap_err();
1759        assert!(matches!(err, PayError::InvalidAmount(_)));
1760    }
1761
1762    #[tokio::test]
1763    async fn global_usd_cents_scope_requires_exchange_rate_config() {
1764        let tmp = tempfile::tempdir().unwrap();
1765        let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);
1766
1767        let err = ledger
1768            .set_limits(&[SpendLimit {
1769                rule_id: None,
1770                scope: SpendScope::GlobalUsdCents,
1771                network: None,
1772                wallet: None,
1773                window_s: 3600,
1774                max_spend: 100,
1775                token: None,
1776            }])
1777            .await
1778            .unwrap_err();
1779
1780        assert!(matches!(err, PayError::InvalidAmount(_)));
1781    }
1782
1783    #[cfg(feature = "redb")]
1784    #[tokio::test]
1785    async fn network_scope_native_token_ok_without_exchange_rate() {
1786        let tmp = tempfile::tempdir().unwrap();
1787        let ledger = SpendLedger::new(tmp.path().to_str().unwrap(), None);
1788
1789        ledger
1790            .set_limits(&[SpendLimit {
1791                rule_id: None,
1792                scope: SpendScope::Network,
1793                network: Some("cashu".to_string()),
1794                wallet: None,
1795                window_s: 3600,
1796                max_spend: 100,
1797                token: None,
1798            }])
1799            .await
1800            .expect("network scope should not require exchange_rate");
1801    }
1802}