ff_engine/scanner/
budget_reconciler.rs1use std::time::Duration;
15
16use ff_core::backend::ScannerFilter;
17use ff_core::keys;
18use ff_core::partition::{Partition, PartitionFamily};
19
20use super::{ScanResult, Scanner};
21
22pub struct BudgetReconciler {
23 interval: Duration,
24 filter: ScannerFilter,
27}
28
29impl BudgetReconciler {
30 pub fn new(interval: Duration) -> Self {
31 Self::with_filter(interval, ScannerFilter::default())
32 }
33
34 pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
43 Self { interval, filter }
44 }
45}
46
47impl Scanner for BudgetReconciler {
48 fn name(&self) -> &'static str {
49 "budget_reconciler"
50 }
51
52 fn interval(&self) -> Duration {
53 self.interval
54 }
55
56 fn filter(&self) -> &ScannerFilter {
57 &self.filter
58 }
59
60 async fn scan_partition(
61 &self,
62 client: &ferriskey::Client,
63 partition: u16,
64 ) -> ScanResult {
65 let p = Partition {
66 family: PartitionFamily::Budget,
67 index: partition,
68 };
69 let tag = p.hash_tag();
70
71 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
72 Ok(t) => t,
73 Err(e) => {
74 tracing::warn!(partition, error = %e, "budget_reconciler: failed to get server time");
75 return ScanResult { processed: 0, errors: 1 };
76 }
77 };
78
79 let policies_key = keys::budget_policies_index(&tag);
84 let mut processed: u32 = 0;
85 let mut errors: u32 = 0;
86 let mut cursor = "0".to_string();
87
88 loop {
89 let result: ferriskey::Value = match client
90 .cmd("SSCAN")
91 .arg(&policies_key)
92 .arg(cursor.as_str())
93 .arg("COUNT")
94 .arg("100")
95 .execute()
96 .await
97 {
98 Ok(v) => v,
99 Err(e) => {
100 tracing::warn!(partition, error = %e, "budget_reconciler: SSCAN failed");
101 return ScanResult { processed, errors: errors + 1 };
102 }
103 };
104
105 let (next_cursor, budget_ids) = parse_sscan_response(&result);
106
107 for bid in &budget_ids {
108 match reconcile_one_budget(client, &tag, &policies_key, bid, now_ms).await {
109 Ok(true) => processed += 1,
110 Ok(false) => {} Err(e) => {
112 tracing::warn!(
113 partition,
114 budget_id = bid.as_str(),
115 error = %e,
116 "budget_reconciler: reconcile failed"
117 );
118 errors += 1;
119 }
120 }
121 }
122
123 cursor = next_cursor;
124 if cursor == "0" {
125 break;
126 }
127 }
128
129 ScanResult { processed, errors }
130 }
131}
132
133async fn reconcile_one_budget(
135 client: &ferriskey::Client,
136 tag: &str,
137 policies_key: &str,
138 budget_id: &str,
139 now_ms: u64,
140) -> Result<bool, ferriskey::Error> {
141 let def_key = format!("ff:budget:{}:{}", tag, budget_id);
142 let usage_key = format!("ff:budget:{}:{}:usage", tag, budget_id);
143 let limits_key = format!("ff:budget:{}:{}:limits", tag, budget_id);
144
145 let def_raw: Vec<String> = client
154 .cmd("HGETALL")
155 .arg(&def_key)
156 .execute()
157 .await?;
158 if def_raw.is_empty() {
159 let _: Option<i64> = client
160 .cmd("SREM")
161 .arg(policies_key)
162 .arg(budget_id)
163 .execute()
164 .await
165 .unwrap_or(None);
166 return Ok(false);
167 }
168
169 let def_map = pairs_to_map(&def_raw);
170 let reset_interval = def_map.get("reset_interval_ms").copied();
171
172 if let Some(ri) = reset_interval
174 && !ri.is_empty()
175 && ri != "0"
176 {
177 return Ok(false);
178 }
179
180 let usage_raw: Vec<String> = client
185 .cmd("HGETALL")
186 .arg(&usage_key)
187 .execute()
188 .await?;
189
190 let limits_raw: Vec<String> = client
192 .cmd("HGETALL")
193 .arg(&limits_key)
194 .execute()
195 .await?;
196
197 if limits_raw.is_empty() {
198 return Ok(false); }
200
201 let usage = pairs_to_map(&usage_raw);
203 let limits = pairs_to_map(&limits_raw);
204
205 let mut any_breached = false;
210 for (field, limit_str) in &limits {
211 let dim = match field.strip_prefix("hard:") {
213 Some(d) => d,
214 None => continue, };
216 let limit: i64 = limit_str.parse().unwrap_or(i64::MAX);
217 if limit <= 0 {
218 continue; }
220 let current: i64 = usage
221 .get(dim)
222 .and_then(|v| v.parse().ok())
223 .unwrap_or(0);
224 if current > limit {
225 any_breached = true;
226 break;
227 }
228 }
229
230 let currently_breached = usage.contains_key("breached_at");
232
233 if any_breached && !currently_breached {
234 let _: () = client
236 .cmd("HSET")
237 .arg(&usage_key)
238 .arg("breached_at")
239 .arg(now_ms.to_string().as_str())
240 .execute()
241 .await?;
242 tracing::info!(budget_id, "budget_reconciler: marked budget as breached");
243 } else if !any_breached && currently_breached {
244 let _: u32 = client
246 .cmd("HDEL")
247 .arg(&usage_key)
248 .arg("breached_at")
249 .execute()
250 .await?;
251 tracing::info!(budget_id, "budget_reconciler: cleared budget breach");
252 }
253
254 Ok(any_breached != currently_breached) }
262
263fn pairs_to_map(flat: &[String]) -> std::collections::HashMap<&str, &str> {
264 let mut map = std::collections::HashMap::new();
265 let mut i = 0;
266 while i + 1 < flat.len() {
267 map.insert(flat[i].as_str(), flat[i + 1].as_str());
268 i += 2;
269 }
270 map
271}
272
273fn parse_sscan_response(val: &ferriskey::Value) -> (String, Vec<String>) {
277 let arr = match val {
278 ferriskey::Value::Array(a) if a.len() >= 2 => a,
279 _ => return ("0".to_string(), vec![]),
280 };
281
282 let cursor = match &arr[0] {
283 Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
284 Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
285 _ => return ("0".to_string(), vec![]),
286 };
287
288 let mut members = Vec::new();
289 match &arr[1] {
290 Ok(ferriskey::Value::Array(inner)) => {
291 for item in inner {
292 if let Ok(ferriskey::Value::BulkString(b)) = item {
293 members.push(String::from_utf8_lossy(b).into_owned());
294 }
295 }
296 }
297 Ok(ferriskey::Value::Set(inner)) => {
298 for item in inner {
299 if let ferriskey::Value::BulkString(b) = item {
300 members.push(String::from_utf8_lossy(b).into_owned());
301 }
302 }
303 }
304 _ => {}
305 }
306
307 (cursor, members)
308}
309