Skip to main content

ff_engine/scanner/
budget_reset.rs

1//! Budget reset scanner.
2//!
3//! Scans `ff:idx:{b:M}:budget_resets` per budget partition, finding budgets
4//! whose `next_reset_at` score is <= now. For each, calls
5//! `FCALL ff_reset_budget` which zeroes usage counters, records the reset
6//! event, computes the next reset time, and re-scores in the index.
7//!
8//! Reference: RFC-008 §Budget Reset, RFC-010 §6.11
9
10use std::sync::Arc;
11use std::time::Duration;
12
13use ff_core::backend::ScannerFilter;
14use ff_core::contracts::ResetBudgetArgs;
15use ff_core::engine_backend::EngineBackend;
16use ff_core::keys::budget_resets_key;
17use ff_core::partition::{Partition, PartitionFamily};
18use ff_core::types::{BudgetId, TimestampMs};
19
20use super::{ScanResult, Scanner};
21
22const BATCH_SIZE: u32 = 20;
23
24pub struct BudgetResetScanner {
25    interval: Duration,
26    /// Issue #122: accepted for uniform API; not applied.
27    filter: ScannerFilter,
28    /// PR-7b Cluster 2: trait-dispatch target for the per-budget
29    /// `ff_reset_budget` FCALL. When `None` (legacy test construction)
30    /// the scanner falls back to the pre-trait direct-FCALL path on
31    /// the supplied `ferriskey::Client`.
32    backend: Option<Arc<dyn EngineBackend>>,
33}
34
35impl BudgetResetScanner {
36    pub fn new(interval: Duration) -> Self {
37        Self::with_filter(interval, ScannerFilter::default())
38    }
39
40    /// Accepts a [`ScannerFilter`] for uniform construction across
41    /// all scanners (issue #122) but **does not apply it**. This
42    /// scanner iterates budget IDs — not executions — and the
43    /// `namespace` / `instance_tag` filter dimensions do not map
44    /// onto budget partitions.
45    pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
46        Self {
47            interval,
48            filter,
49            backend: None,
50        }
51    }
52
53    /// PR-7b Cluster 2: wire an `EngineBackend` so the per-budget
54    /// reset routes through the trait (`EngineBackend::reset_budget`)
55    /// rather than issuing a raw FCALL against the scanner client.
56    pub fn with_filter_and_backend(
57        interval: Duration,
58        filter: ScannerFilter,
59        backend: Arc<dyn EngineBackend>,
60    ) -> Self {
61        Self {
62            interval,
63            filter,
64            backend: Some(backend),
65        }
66    }
67}
68
69impl Scanner for BudgetResetScanner {
70    fn name(&self) -> &'static str {
71        "budget_reset"
72    }
73
74    fn interval(&self) -> Duration {
75        self.interval
76    }
77
78    fn filter(&self) -> &ScannerFilter {
79        &self.filter
80    }
81
82    async fn scan_partition(
83        &self,
84        client: &ferriskey::Client,
85        partition: u16,
86    ) -> ScanResult {
87        let p = Partition {
88            family: PartitionFamily::Budget,
89            index: partition,
90        };
91        let tag = p.hash_tag();
92        let resets_key = budget_resets_key(&tag);
93
94        let now_ms_res: Result<u64, String> = if let Some(ref b) = self.backend {
95            b.server_time_ms().await.map_err(|e| e.to_string())
96        } else {
97            crate::scanner::lease_expiry::server_time_ms_legacy(client).await.map_err(|e| e.to_string())
98        };
99        let now_ms = match now_ms_res {
100            Ok(t) => t,
101            Err(e) => {
102                tracing::warn!(partition, error = %e, "budget_reset: failed to get server time");
103                return ScanResult { processed: 0, errors: 1 };
104            }
105        };
106
107        // ZRANGEBYSCORE budget_resets -inf now LIMIT 0 batch_size
108        let due: Vec<String> = match client
109            .cmd("ZRANGEBYSCORE")
110            .arg(&resets_key)
111            .arg("-inf")
112            .arg(now_ms.to_string().as_str())
113            .arg("LIMIT")
114            .arg("0")
115            .arg(BATCH_SIZE.to_string().as_str())
116            .execute()
117            .await
118        {
119            Ok(ids) => ids,
120            Err(e) => {
121                tracing::warn!(partition, error = %e, "budget_reset: ZRANGEBYSCORE failed");
122                return ScanResult { processed: 0, errors: 1 };
123            }
124        };
125
126        if due.is_empty() {
127            return ScanResult { processed: 0, errors: 0 };
128        }
129
130        let mut processed: u32 = 0;
131        let mut errors: u32 = 0;
132
133        for budget_id_str in &due {
134            let res = if let Some(ref backend) = self.backend {
135                // PR-7b Cluster 2: trait-routed reset. Valkey wraps
136                // `ff_reset_budget` (same KEYS/ARGV as the legacy path);
137                // Postgres + SQLite run their per-row reset tx mirroring
138                // the Lua semantic (`budget::reset_budget_impl`).
139                let Ok(bid) = BudgetId::parse(budget_id_str) else {
140                    tracing::warn!(
141                        partition,
142                        budget_id = budget_id_str.as_str(),
143                        "budget_reset: malformed budget id; skipping"
144                    );
145                    errors += 1;
146                    continue;
147                };
148                backend
149                    .reset_budget(ResetBudgetArgs {
150                        budget_id: bid,
151                        now: TimestampMs::from_millis(now_ms as i64),
152                    })
153                    .await
154                    .map(|_| ())
155                    .map_err(|e| e.to_string())
156            } else {
157                // Test-only fallback: direct FCALL on the scanner client.
158                // Mirrors the cluster-1 lease_expiry pattern — preserves
159                // pre-trait-routing unit tests that construct the scanner
160                // without a backend.
161                //
162                // KEYS (3): budget_def, budget_usage, budget_resets_zset
163                // ARGV (2): budget_id, now_ms
164                let budget_def = format!("ff:budget:{}:{}", tag, budget_id_str);
165                let budget_usage = format!("ff:budget:{}:{}:usage", tag, budget_id_str);
166                let keys: [&str; 3] = [&budget_def, &budget_usage, &resets_key];
167                let now_s = now_ms.to_string();
168                let argv: [&str; 2] = [budget_id_str.as_str(), &now_s];
169                client
170                    .fcall::<ferriskey::Value>("ff_reset_budget", &keys, &argv)
171                    .await
172                    .map(|_| ())
173                    .map_err(|e| e.to_string())
174            };
175
176            match res {
177                Ok(()) => processed += 1,
178                Err(e) => {
179                    tracing::warn!(
180                        partition,
181                        budget_id = budget_id_str.as_str(),
182                        error = %e,
183                        "budget_reset: reset_budget failed"
184                    );
185                    errors += 1;
186                }
187            }
188        }
189
190        ScanResult { processed, errors }
191    }
192}