ff_engine/scanner/
quota_reconciler.rs1use std::time::Duration;
15
16use ff_core::backend::ScannerFilter;
17use ff_core::keys;
18use ff_core::partition::{Partition, PartitionFamily};
19
20use super::{ScanResult, Scanner};
21
22pub struct QuotaReconciler {
23 interval: Duration,
24 filter: ScannerFilter,
26}
27
28impl QuotaReconciler {
29 pub fn new(interval: Duration) -> Self {
30 Self::with_filter(interval, ScannerFilter::default())
31 }
32
33 pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
39 Self { interval, filter }
40 }
41}
42
43impl Scanner for QuotaReconciler {
44 fn name(&self) -> &'static str {
45 "quota_reconciler"
46 }
47
48 fn interval(&self) -> Duration {
49 self.interval
50 }
51
52 fn filter(&self) -> &ScannerFilter {
53 &self.filter
54 }
55
56 async fn scan_partition(
57 &self,
58 client: &ferriskey::Client,
59 partition: u16,
60 ) -> ScanResult {
61 let p = Partition {
62 family: PartitionFamily::Quota,
63 index: partition,
64 };
65 let tag = p.hash_tag();
66
67 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
68 Ok(t) => t,
69 Err(e) => {
70 tracing::warn!(partition, error = %e, "quota_reconciler: failed to get server time");
71 return ScanResult { processed: 0, errors: 1 };
72 }
73 };
74
75 let policies_key = keys::quota_policies_index(&tag);
77 let quota_ids: Vec<String> = match client
78 .cmd("SMEMBERS")
79 .arg(&policies_key)
80 .execute()
81 .await
82 {
83 Ok(ids) => ids,
84 Err(e) => {
85 tracing::warn!(partition, error = %e, "quota_reconciler: SMEMBERS failed");
86 return ScanResult { processed: 0, errors: 1 };
87 }
88 };
89
90 if quota_ids.is_empty() {
91 return ScanResult { processed: 0, errors: 0 };
92 }
93
94 let mut processed: u32 = 0;
95 let mut errors: u32 = 0;
96
97 for qid in "a_ids {
98 match reconcile_one_quota(client, &tag, qid, now_ms).await {
99 Ok(true) => processed += 1,
100 Ok(false) => {} Err(e) => {
102 tracing::warn!(
103 partition,
104 quota_id = qid.as_str(),
105 error = %e,
106 "quota_reconciler: reconcile failed"
107 );
108 errors += 1;
109 }
110 }
111 }
112
113 ScanResult { processed, errors }
114 }
115}
116
117async fn reconcile_one_quota(
119 client: &ferriskey::Client,
120 tag: &str,
121 quota_id: &str,
122 now_ms: u64,
123) -> Result<bool, ferriskey::Error> {
124 let mut did_work = false;
125
126 let def_key = format!("ff:quota:{}:{}", tag, quota_id);
128 let window_secs: Option<String> = client
129 .cmd("HGET")
130 .arg(&def_key)
131 .arg("requests_per_window_seconds")
132 .execute()
133 .await?;
134
135 if let Some(ref ws) = window_secs
137 && let Ok(secs) = ws.parse::<u64>()
138 && secs > 0
139 {
140 let window_ms = secs * 1000;
141 let window_key =
142 format!("ff:quota:{}:{}:window:requests_per_window", tag, quota_id);
143 let cutoff = now_ms.saturating_sub(window_ms);
144
145 let removed: u32 = client
146 .cmd("ZREMRANGEBYSCORE")
147 .arg(&window_key)
148 .arg("-inf")
149 .arg(cutoff.to_string().as_str())
150 .execute()
151 .await
152 .unwrap_or(0);
153
154 if removed > 0 {
155 did_work = true;
156 tracing::debug!(
157 quota_id,
158 removed,
159 "quota_reconciler: trimmed expired window entries"
160 );
161 }
162 }
163
164 let concurrency_cap: Option<String> = client
170 .cmd("HGET")
171 .arg(&def_key)
172 .arg("active_concurrency_cap")
173 .execute()
174 .await?;
175
176 if let Some(ref cap_str) = concurrency_cap
177 && let Ok(cap) = cap_str.parse::<u64>()
178 && cap > 0
179 {
180 let counter_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id);
181 let admitted_set_key = format!("ff:quota:{}:{}:admitted_set", tag, quota_id);
182
183 let mut live_count: u64 = 0;
185 let mut cursor = "0".to_string();
186 loop {
187 let result: ferriskey::Value = client
188 .cmd("SSCAN")
189 .arg(&admitted_set_key)
190 .arg(cursor.as_str())
191 .arg("COUNT")
192 .arg("100")
193 .execute()
194 .await?;
195
196 let (next_cursor, members) = parse_sscan_response(&result);
197
198 for eid in &members {
199 let guard_key = format!("ff:quota:{}:{}:admitted:{}", tag, quota_id, eid);
200 let exists: bool = client
201 .exists(&guard_key)
202 .await
203 .unwrap_or(false);
204 if exists {
205 live_count += 1;
206 } else {
207 let _: () = client
209 .cmd("SREM")
210 .arg(&admitted_set_key)
211 .arg(eid.as_str())
212 .execute()
213 .await
214 .unwrap_or_default();
215 }
216 }
217
218 cursor = next_cursor;
219 if cursor == "0" {
220 break;
221 }
222 }
223
224 let stored: Option<String> = client
226 .cmd("GET")
227 .arg(&counter_key)
228 .execute()
229 .await?;
230 let stored_count: i64 = stored
231 .as_deref()
232 .and_then(|s| s.parse().ok())
233 .unwrap_or(0);
234
235 if stored_count != live_count as i64 {
237 let _: () = client
238 .cmd("SET")
239 .arg(&counter_key)
240 .arg(live_count.to_string().as_str())
241 .execute()
242 .await?;
243 tracing::info!(
244 quota_id,
245 stored = stored_count,
246 actual = live_count,
247 "quota_reconciler: corrected concurrency counter drift"
248 );
249 did_work = true;
250 }
251 }
252
253 Ok(did_work)
254}
255
256fn parse_sscan_response(val: &ferriskey::Value) -> (String, Vec<String>) {
258 let arr = match val {
259 ferriskey::Value::Array(a) if a.len() >= 2 => a,
260 _ => return ("0".to_string(), vec![]),
261 };
262
263 let cursor = match &arr[0] {
264 Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
265 Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
266 _ => return ("0".to_string(), vec![]),
267 };
268
269 let mut members = Vec::new();
270 match &arr[1] {
271 Ok(ferriskey::Value::Array(inner)) => {
272 for item in inner {
273 if let Ok(ferriskey::Value::BulkString(b)) = item {
274 members.push(String::from_utf8_lossy(b).into_owned());
275 }
276 }
277 }
278 Ok(ferriskey::Value::Set(inner)) => {
279 for item in inner {
280 if let ferriskey::Value::BulkString(b) = item {
281 members.push(String::from_utf8_lossy(b).into_owned());
282 }
283 }
284 }
285 _ => {}
286 }
287
288 (cursor, members)
289}