ff_engine/scanner/
budget_reconciler.rs1use std::time::Duration;
15
16use ff_core::keys;
17use ff_core::partition::{Partition, PartitionFamily};
18
19use super::{ScanResult, Scanner};
20
/// Scanner that reconciles the breach marker of budgets, one budget partition at a time.
///
/// The reconciliation itself is implemented in this type's `Scanner` impl.
#[derive(Debug, Clone)]
pub struct BudgetReconciler {
    /// Value reported by `Scanner::interval` for this scanner.
    interval: Duration,
}
24
25impl BudgetReconciler {
26 pub fn new(interval: Duration) -> Self {
27 Self { interval }
28 }
29}
30
31impl Scanner for BudgetReconciler {
32 fn name(&self) -> &'static str {
33 "budget_reconciler"
34 }
35
36 fn interval(&self) -> Duration {
37 self.interval
38 }
39
40 async fn scan_partition(
41 &self,
42 client: &ferriskey::Client,
43 partition: u16,
44 ) -> ScanResult {
45 let p = Partition {
46 family: PartitionFamily::Budget,
47 index: partition,
48 };
49 let tag = p.hash_tag();
50
51 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
52 Ok(t) => t,
53 Err(e) => {
54 tracing::warn!(partition, error = %e, "budget_reconciler: failed to get server time");
55 return ScanResult { processed: 0, errors: 1 };
56 }
57 };
58
59 let policies_key = keys::budget_policies_index(&tag);
64 let mut processed: u32 = 0;
65 let mut errors: u32 = 0;
66 let mut cursor = "0".to_string();
67
68 loop {
69 let result: ferriskey::Value = match client
70 .cmd("SSCAN")
71 .arg(&policies_key)
72 .arg(cursor.as_str())
73 .arg("COUNT")
74 .arg("100")
75 .execute()
76 .await
77 {
78 Ok(v) => v,
79 Err(e) => {
80 tracing::warn!(partition, error = %e, "budget_reconciler: SSCAN failed");
81 return ScanResult { processed, errors: errors + 1 };
82 }
83 };
84
85 let (next_cursor, budget_ids) = parse_sscan_response(&result);
86
87 for bid in &budget_ids {
88 match reconcile_one_budget(client, &tag, &policies_key, bid, now_ms).await {
89 Ok(true) => processed += 1,
90 Ok(false) => {} Err(e) => {
92 tracing::warn!(
93 partition,
94 budget_id = bid.as_str(),
95 error = %e,
96 "budget_reconciler: reconcile failed"
97 );
98 errors += 1;
99 }
100 }
101 }
102
103 cursor = next_cursor;
104 if cursor == "0" {
105 break;
106 }
107 }
108
109 ScanResult { processed, errors }
110 }
111}
112
113async fn reconcile_one_budget(
115 client: &ferriskey::Client,
116 tag: &str,
117 policies_key: &str,
118 budget_id: &str,
119 now_ms: u64,
120) -> Result<bool, ferriskey::Error> {
121 let def_key = format!("ff:budget:{}:{}", tag, budget_id);
122 let usage_key = format!("ff:budget:{}:{}:usage", tag, budget_id);
123 let limits_key = format!("ff:budget:{}:{}:limits", tag, budget_id);
124
125 let def_raw: Vec<String> = client
128 .cmd("HGETALL")
129 .arg(&def_key)
130 .execute()
131 .await
132 .unwrap_or_default();
133 if def_raw.is_empty() {
134 let _: Option<i64> = client
135 .cmd("SREM")
136 .arg(policies_key)
137 .arg(budget_id)
138 .execute()
139 .await
140 .unwrap_or(None);
141 return Ok(false);
142 }
143
144 let def_map = pairs_to_map(&def_raw);
145 let reset_interval = def_map.get("reset_interval_ms").copied();
146
147 if let Some(ri) = reset_interval
149 && !ri.is_empty()
150 && ri != "0"
151 {
152 return Ok(false);
153 }
154
155 let usage_raw: Vec<String> = client
157 .cmd("HGETALL")
158 .arg(&usage_key)
159 .execute()
160 .await
161 .unwrap_or_default();
162
163 let limits_raw: Vec<String> = client
165 .cmd("HGETALL")
166 .arg(&limits_key)
167 .execute()
168 .await
169 .unwrap_or_default();
170
171 if limits_raw.is_empty() {
172 return Ok(false); }
174
175 let usage = pairs_to_map(&usage_raw);
177 let limits = pairs_to_map(&limits_raw);
178
179 let mut any_breached = false;
184 for (field, limit_str) in &limits {
185 let dim = match field.strip_prefix("hard:") {
187 Some(d) => d,
188 None => continue, };
190 let limit: i64 = limit_str.parse().unwrap_or(i64::MAX);
191 if limit <= 0 {
192 continue; }
194 let current: i64 = usage
195 .get(dim)
196 .and_then(|v| v.parse().ok())
197 .unwrap_or(0);
198 if current > limit {
199 any_breached = true;
200 break;
201 }
202 }
203
204 let currently_breached = usage.contains_key("breached_at");
206
207 if any_breached && !currently_breached {
208 let _: () = client
210 .cmd("HSET")
211 .arg(&usage_key)
212 .arg("breached_at")
213 .arg(now_ms.to_string().as_str())
214 .execute()
215 .await?;
216 tracing::info!(budget_id, "budget_reconciler: marked budget as breached");
217 } else if !any_breached && currently_breached {
218 let _: u32 = client
220 .cmd("HDEL")
221 .arg(&usage_key)
222 .arg("breached_at")
223 .execute()
224 .await?;
225 tracing::info!(budget_id, "budget_reconciler: cleared budget breach");
226 }
227
228 Ok(any_breached != currently_breached) }
236
/// Turns a flat `[field, value, field, value, ...]` slice into a borrowed lookup map.
///
/// A trailing element without a partner is ignored. When a field occurs more than once,
/// the last occurrence wins.
fn pairs_to_map(flat: &[String]) -> std::collections::HashMap<&str, &str> {
    flat.chunks_exact(2)
        .map(|pair| (pair[0].as_str(), pair[1].as_str()))
        .collect()
}
246
247fn parse_sscan_response(val: &ferriskey::Value) -> (String, Vec<String>) {
251 let arr = match val {
252 ferriskey::Value::Array(a) if a.len() >= 2 => a,
253 _ => return ("0".to_string(), vec![]),
254 };
255
256 let cursor = match &arr[0] {
257 Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
258 Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
259 _ => return ("0".to_string(), vec![]),
260 };
261
262 let mut members = Vec::new();
263 match &arr[1] {
264 Ok(ferriskey::Value::Array(inner)) => {
265 for item in inner {
266 if let Ok(ferriskey::Value::BulkString(b)) = item {
267 members.push(String::from_utf8_lossy(b).into_owned());
268 }
269 }
270 }
271 Ok(ferriskey::Value::Set(inner)) => {
272 for item in inner {
273 if let ferriskey::Value::BulkString(b) = item {
274 members.push(String::from_utf8_lossy(b).into_owned());
275 }
276 }
277 }
278 _ => {}
279 }
280
281 (cursor, members)
282}
283