ff_engine/scanner/
budget_reset.rs1use std::sync::Arc;
11use std::time::Duration;
12
13use ff_core::backend::ScannerFilter;
14use ff_core::contracts::ResetBudgetArgs;
15use ff_core::engine_backend::EngineBackend;
16use ff_core::keys::budget_resets_key;
17use ff_core::partition::{Partition, PartitionFamily};
18use ff_core::types::{BudgetId, TimestampMs};
19
20use super::{ScanResult, Scanner};
21
/// Upper bound on the number of due budget ids fetched per partition in a
/// single scan; passed as the `LIMIT` count of the `ZRANGEBYSCORE` query.
const BATCH_SIZE: u32 = 20;
23
24pub struct BudgetResetScanner {
25 interval: Duration,
26 filter: ScannerFilter,
28 backend: Option<Arc<dyn EngineBackend>>,
33}
34
35impl BudgetResetScanner {
36 pub fn new(interval: Duration) -> Self {
37 Self::with_filter(interval, ScannerFilter::default())
38 }
39
40 pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
46 Self {
47 interval,
48 filter,
49 backend: None,
50 }
51 }
52
53 pub fn with_filter_and_backend(
57 interval: Duration,
58 filter: ScannerFilter,
59 backend: Arc<dyn EngineBackend>,
60 ) -> Self {
61 Self {
62 interval,
63 filter,
64 backend: Some(backend),
65 }
66 }
67}
68
69impl Scanner for BudgetResetScanner {
70 fn name(&self) -> &'static str {
71 "budget_reset"
72 }
73
74 fn interval(&self) -> Duration {
75 self.interval
76 }
77
78 fn filter(&self) -> &ScannerFilter {
79 &self.filter
80 }
81
82 async fn scan_partition(
83 &self,
84 client: &ferriskey::Client,
85 partition: u16,
86 ) -> ScanResult {
87 let p = Partition {
88 family: PartitionFamily::Budget,
89 index: partition,
90 };
91 let tag = p.hash_tag();
92 let resets_key = budget_resets_key(&tag);
93
94 let now_ms_res: Result<u64, String> = if let Some(ref b) = self.backend {
95 b.server_time_ms().await.map_err(|e| e.to_string())
96 } else {
97 crate::scanner::lease_expiry::server_time_ms_legacy(client).await.map_err(|e| e.to_string())
98 };
99 let now_ms = match now_ms_res {
100 Ok(t) => t,
101 Err(e) => {
102 tracing::warn!(partition, error = %e, "budget_reset: failed to get server time");
103 return ScanResult { processed: 0, errors: 1 };
104 }
105 };
106
107 let due: Vec<String> = match client
109 .cmd("ZRANGEBYSCORE")
110 .arg(&resets_key)
111 .arg("-inf")
112 .arg(now_ms.to_string().as_str())
113 .arg("LIMIT")
114 .arg("0")
115 .arg(BATCH_SIZE.to_string().as_str())
116 .execute()
117 .await
118 {
119 Ok(ids) => ids,
120 Err(e) => {
121 tracing::warn!(partition, error = %e, "budget_reset: ZRANGEBYSCORE failed");
122 return ScanResult { processed: 0, errors: 1 };
123 }
124 };
125
126 if due.is_empty() {
127 return ScanResult { processed: 0, errors: 0 };
128 }
129
130 let mut processed: u32 = 0;
131 let mut errors: u32 = 0;
132
133 for budget_id_str in &due {
134 let res = if let Some(ref backend) = self.backend {
135 let Ok(bid) = BudgetId::parse(budget_id_str) else {
140 tracing::warn!(
141 partition,
142 budget_id = budget_id_str.as_str(),
143 "budget_reset: malformed budget id; skipping"
144 );
145 errors += 1;
146 continue;
147 };
148 backend
149 .reset_budget(ResetBudgetArgs {
150 budget_id: bid,
151 now: TimestampMs::from_millis(now_ms as i64),
152 })
153 .await
154 .map(|_| ())
155 .map_err(|e| e.to_string())
156 } else {
157 let budget_def = format!("ff:budget:{}:{}", tag, budget_id_str);
165 let budget_usage = format!("ff:budget:{}:{}:usage", tag, budget_id_str);
166 let keys: [&str; 3] = [&budget_def, &budget_usage, &resets_key];
167 let now_s = now_ms.to_string();
168 let argv: [&str; 2] = [budget_id_str.as_str(), &now_s];
169 client
170 .fcall::<ferriskey::Value>("ff_reset_budget", &keys, &argv)
171 .await
172 .map(|_| ())
173 .map_err(|e| e.to_string())
174 };
175
176 match res {
177 Ok(()) => processed += 1,
178 Err(e) => {
179 tracing::warn!(
180 partition,
181 budget_id = budget_id_str.as_str(),
182 error = %e,
183 "budget_reset: reset_budget failed"
184 );
185 errors += 1;
186 }
187 }
188 }
189
190 ScanResult { processed, errors }
191 }
192}