Skip to main content

ff_engine/scanner/
budget_reset.rs

1//! Budget reset scanner.
2//!
3//! Scans `ff:idx:{b:M}:budget_resets` per budget partition, finding budgets
4//! whose `next_reset_at` score is <= now. For each, calls
5//! `FCALL ff_reset_budget` which zeroes usage counters, records the reset
6//! event, computes the next reset time, and re-scores in the index.
7//!
8//! Reference: RFC-008 §Budget Reset, RFC-010 §6.11
9
10use std::time::Duration;
11
12use ff_core::keys::budget_resets_key;
13use ff_core::partition::{Partition, PartitionFamily};
14
15use super::{ScanResult, Scanner};
16
/// Upper bound on how many due budget ids a single partition scan fetches;
/// passed as the `LIMIT` count of the `ZRANGEBYSCORE` query.
const BATCH_SIZE: u32 = 20;
18
/// Scanner that resets budgets whose scheduled reset time has passed.
///
/// The type only carries configuration; the per-partition work lives in its
/// `Scanner` implementation.
#[derive(Debug)]
pub struct BudgetResetScanner {
    /// Value reported by `Scanner::interval`.
    // NOTE(review): presumably the delay between scan passes — confirm against the scanner driver.
    interval: Duration,
}

impl BudgetResetScanner {
    /// Creates a scanner that reports `interval` from `Scanner::interval`.
    pub fn new(interval: Duration) -> Self {
        Self { interval }
    }
}
28
29impl Scanner for BudgetResetScanner {
30    fn name(&self) -> &'static str {
31        "budget_reset"
32    }
33
34    fn interval(&self) -> Duration {
35        self.interval
36    }
37
38    async fn scan_partition(
39        &self,
40        client: &ferriskey::Client,
41        partition: u16,
42    ) -> ScanResult {
43        let p = Partition {
44            family: PartitionFamily::Budget,
45            index: partition,
46        };
47        let tag = p.hash_tag();
48        let resets_key = budget_resets_key(&tag);
49
50        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
51            Ok(t) => t,
52            Err(e) => {
53                tracing::warn!(partition, error = %e, "budget_reset: failed to get server time");
54                return ScanResult { processed: 0, errors: 1 };
55            }
56        };
57
58        // ZRANGEBYSCORE budget_resets -inf now LIMIT 0 batch_size
59        let due: Vec<String> = match client
60            .cmd("ZRANGEBYSCORE")
61            .arg(&resets_key)
62            .arg("-inf")
63            .arg(now_ms.to_string().as_str())
64            .arg("LIMIT")
65            .arg("0")
66            .arg(BATCH_SIZE.to_string().as_str())
67            .execute()
68            .await
69        {
70            Ok(ids) => ids,
71            Err(e) => {
72                tracing::warn!(partition, error = %e, "budget_reset: ZRANGEBYSCORE failed");
73                return ScanResult { processed: 0, errors: 1 };
74            }
75        };
76
77        if due.is_empty() {
78            return ScanResult { processed: 0, errors: 0 };
79        }
80
81        let mut processed: u32 = 0;
82        let mut errors: u32 = 0;
83
84        for budget_id_str in &due {
85            // FCALL ff_reset_budget on {b:M}
86            // KEYS (3): budget_def, budget_usage, budget_resets_zset
87            // ARGV (1): budget_id
88            let budget_def = format!("ff:budget:{}:{}", tag, budget_id_str);
89            let budget_usage = format!("ff:budget:{}:{}:usage", tag, budget_id_str);
90
91            let keys: [&str; 3] = [&budget_def, &budget_usage, &resets_key];
92            let now_s = now_ms.to_string();
93            let argv: [&str; 2] = [budget_id_str.as_str(), &now_s];
94
95            match client
96                .fcall::<ferriskey::Value>("ff_reset_budget", &keys, &argv)
97                .await
98            {
99                Ok(_) => processed += 1,
100                Err(e) => {
101                    tracing::warn!(
102                        partition,
103                        budget_id = budget_id_str.as_str(),
104                        error = %e,
105                        "budget_reset: ff_reset_budget failed"
106                    );
107                    errors += 1;
108                }
109            }
110        }
111
112        ScanResult { processed, errors }
113    }
114}