Skip to main content

ff_engine/scanner/
budget_reset.rs

1//! Budget reset scanner.
2//!
3//! Scans `ff:idx:{b:M}:budget_resets` per budget partition, finding budgets
4//! whose `next_reset_at` score is <= now. For each, calls
5//! `FCALL ff_reset_budget` which zeroes usage counters, records the reset
6//! event, computes the next reset time, and re-scores in the index.
7//!
8//! Reference: RFC-008 §Budget Reset, RFC-010 §6.11
9
10use std::time::Duration;
11
12use ff_core::backend::ScannerFilter;
13use ff_core::keys::budget_resets_key;
14use ff_core::partition::{Partition, PartitionFamily};
15
16use super::{ScanResult, Scanner};
17
/// Upper bound on how many due budget IDs a single partition scan pulls
/// from the reset index (used as the `LIMIT` count of the range query).
const BATCH_SIZE: u32 = 20;
19
20pub struct BudgetResetScanner {
21    interval: Duration,
22    /// Issue #122: accepted for uniform API; not applied.
23    filter: ScannerFilter,
24}
25
26impl BudgetResetScanner {
27    pub fn new(interval: Duration) -> Self {
28        Self::with_filter(interval, ScannerFilter::default())
29    }
30
31    /// Accepts a [`ScannerFilter`] for uniform construction across
32    /// all scanners (issue #122) but **does not apply it**. This
33    /// scanner iterates budget IDs — not executions — and the
34    /// `namespace` / `instance_tag` filter dimensions do not map
35    /// onto budget partitions.
36    pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
37        Self { interval, filter }
38    }
39}
40
41impl Scanner for BudgetResetScanner {
42    fn name(&self) -> &'static str {
43        "budget_reset"
44    }
45
46    fn interval(&self) -> Duration {
47        self.interval
48    }
49
50    fn filter(&self) -> &ScannerFilter {
51        &self.filter
52    }
53
54    async fn scan_partition(
55        &self,
56        client: &ferriskey::Client,
57        partition: u16,
58    ) -> ScanResult {
59        let p = Partition {
60            family: PartitionFamily::Budget,
61            index: partition,
62        };
63        let tag = p.hash_tag();
64        let resets_key = budget_resets_key(&tag);
65
66        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
67            Ok(t) => t,
68            Err(e) => {
69                tracing::warn!(partition, error = %e, "budget_reset: failed to get server time");
70                return ScanResult { processed: 0, errors: 1 };
71            }
72        };
73
74        // ZRANGEBYSCORE budget_resets -inf now LIMIT 0 batch_size
75        let due: Vec<String> = match client
76            .cmd("ZRANGEBYSCORE")
77            .arg(&resets_key)
78            .arg("-inf")
79            .arg(now_ms.to_string().as_str())
80            .arg("LIMIT")
81            .arg("0")
82            .arg(BATCH_SIZE.to_string().as_str())
83            .execute()
84            .await
85        {
86            Ok(ids) => ids,
87            Err(e) => {
88                tracing::warn!(partition, error = %e, "budget_reset: ZRANGEBYSCORE failed");
89                return ScanResult { processed: 0, errors: 1 };
90            }
91        };
92
93        if due.is_empty() {
94            return ScanResult { processed: 0, errors: 0 };
95        }
96
97        let mut processed: u32 = 0;
98        let mut errors: u32 = 0;
99
100        for budget_id_str in &due {
101            // FCALL ff_reset_budget on {b:M}
102            // KEYS (3): budget_def, budget_usage, budget_resets_zset
103            // ARGV (1): budget_id
104            let budget_def = format!("ff:budget:{}:{}", tag, budget_id_str);
105            let budget_usage = format!("ff:budget:{}:{}:usage", tag, budget_id_str);
106
107            let keys: [&str; 3] = [&budget_def, &budget_usage, &resets_key];
108            let now_s = now_ms.to_string();
109            let argv: [&str; 2] = [budget_id_str.as_str(), &now_s];
110
111            match client
112                .fcall::<ferriskey::Value>("ff_reset_budget", &keys, &argv)
113                .await
114            {
115                Ok(_) => processed += 1,
116                Err(e) => {
117                    tracing::warn!(
118                        partition,
119                        budget_id = budget_id_str.as_str(),
120                        error = %e,
121                        "budget_reset: ff_reset_budget failed"
122                    );
123                    errors += 1;
124                }
125            }
126        }
127
128        ScanResult { processed, errors }
129    }
130}