ff_engine/scanner/
quota_reconciler.rs1use std::time::Duration;
15
16use ff_core::keys;
17use ff_core::partition::{Partition, PartitionFamily};
18
19use super::{ScanResult, Scanner};
20
/// Scanner whose `scan_partition` implementation reconciles quota state.
///
/// The only state carried is the interval handed to [`QuotaReconciler::new`];
/// everything else is read from the store on each scan.
#[derive(Debug, Clone)]
pub struct QuotaReconciler {
    /// Value reported by this type's `Scanner::interval` implementation.
    interval: Duration,
}

impl QuotaReconciler {
    /// Creates a reconciler that reports `interval` as its scan interval.
    #[must_use]
    pub fn new(interval: Duration) -> Self {
        Self { interval }
    }
}
30
31impl Scanner for QuotaReconciler {
32 fn name(&self) -> &'static str {
33 "quota_reconciler"
34 }
35
36 fn interval(&self) -> Duration {
37 self.interval
38 }
39
40 async fn scan_partition(
41 &self,
42 client: &ferriskey::Client,
43 partition: u16,
44 ) -> ScanResult {
45 let p = Partition {
46 family: PartitionFamily::Quota,
47 index: partition,
48 };
49 let tag = p.hash_tag();
50
51 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
52 Ok(t) => t,
53 Err(e) => {
54 tracing::warn!(partition, error = %e, "quota_reconciler: failed to get server time");
55 return ScanResult { processed: 0, errors: 1 };
56 }
57 };
58
59 let policies_key = keys::quota_policies_index(&tag);
61 let quota_ids: Vec<String> = match client
62 .cmd("SMEMBERS")
63 .arg(&policies_key)
64 .execute()
65 .await
66 {
67 Ok(ids) => ids,
68 Err(e) => {
69 tracing::warn!(partition, error = %e, "quota_reconciler: SMEMBERS failed");
70 return ScanResult { processed: 0, errors: 1 };
71 }
72 };
73
74 if quota_ids.is_empty() {
75 return ScanResult { processed: 0, errors: 0 };
76 }
77
78 let mut processed: u32 = 0;
79 let mut errors: u32 = 0;
80
81 for qid in "a_ids {
82 match reconcile_one_quota(client, &tag, qid, now_ms).await {
83 Ok(true) => processed += 1,
84 Ok(false) => {} Err(e) => {
86 tracing::warn!(
87 partition,
88 quota_id = qid.as_str(),
89 error = %e,
90 "quota_reconciler: reconcile failed"
91 );
92 errors += 1;
93 }
94 }
95 }
96
97 ScanResult { processed, errors }
98 }
99}
100
101async fn reconcile_one_quota(
103 client: &ferriskey::Client,
104 tag: &str,
105 quota_id: &str,
106 now_ms: u64,
107) -> Result<bool, ferriskey::Error> {
108 let mut did_work = false;
109
110 let def_key = format!("ff:quota:{}:{}", tag, quota_id);
112 let window_secs: Option<String> = client
113 .cmd("HGET")
114 .arg(&def_key)
115 .arg("requests_per_window_seconds")
116 .execute()
117 .await?;
118
119 if let Some(ref ws) = window_secs
121 && let Ok(secs) = ws.parse::<u64>()
122 && secs > 0
123 {
124 let window_ms = secs * 1000;
125 let window_key =
126 format!("ff:quota:{}:{}:window:requests_per_window", tag, quota_id);
127 let cutoff = now_ms.saturating_sub(window_ms);
128
129 let removed: u32 = client
130 .cmd("ZREMRANGEBYSCORE")
131 .arg(&window_key)
132 .arg("-inf")
133 .arg(cutoff.to_string().as_str())
134 .execute()
135 .await
136 .unwrap_or(0);
137
138 if removed > 0 {
139 did_work = true;
140 tracing::debug!(
141 quota_id,
142 removed,
143 "quota_reconciler: trimmed expired window entries"
144 );
145 }
146 }
147
148 let concurrency_cap: Option<String> = client
154 .cmd("HGET")
155 .arg(&def_key)
156 .arg("active_concurrency_cap")
157 .execute()
158 .await?;
159
160 if let Some(ref cap_str) = concurrency_cap
161 && let Ok(cap) = cap_str.parse::<u64>()
162 && cap > 0
163 {
164 let counter_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id);
165 let admitted_set_key = format!("ff:quota:{}:{}:admitted_set", tag, quota_id);
166
167 let mut live_count: u64 = 0;
169 let mut cursor = "0".to_string();
170 loop {
171 let result: ferriskey::Value = client
172 .cmd("SSCAN")
173 .arg(&admitted_set_key)
174 .arg(cursor.as_str())
175 .arg("COUNT")
176 .arg("100")
177 .execute()
178 .await?;
179
180 let (next_cursor, members) = parse_sscan_response(&result);
181
182 for eid in &members {
183 let guard_key = format!("ff:quota:{}:{}:admitted:{}", tag, quota_id, eid);
184 let exists: bool = client
185 .exists(&guard_key)
186 .await
187 .unwrap_or(false);
188 if exists {
189 live_count += 1;
190 } else {
191 let _: () = client
193 .cmd("SREM")
194 .arg(&admitted_set_key)
195 .arg(eid.as_str())
196 .execute()
197 .await
198 .unwrap_or_default();
199 }
200 }
201
202 cursor = next_cursor;
203 if cursor == "0" {
204 break;
205 }
206 }
207
208 let stored: Option<String> = client
210 .cmd("GET")
211 .arg(&counter_key)
212 .execute()
213 .await?;
214 let stored_count: i64 = stored
215 .as_deref()
216 .and_then(|s| s.parse().ok())
217 .unwrap_or(0);
218
219 if stored_count != live_count as i64 {
221 let _: () = client
222 .cmd("SET")
223 .arg(&counter_key)
224 .arg(live_count.to_string().as_str())
225 .execute()
226 .await?;
227 tracing::info!(
228 quota_id,
229 stored = stored_count,
230 actual = live_count,
231 "quota_reconciler: corrected concurrency counter drift"
232 );
233 did_work = true;
234 }
235 }
236
237 Ok(did_work)
238}
239
240fn parse_sscan_response(val: &ferriskey::Value) -> (String, Vec<String>) {
242 let arr = match val {
243 ferriskey::Value::Array(a) if a.len() >= 2 => a,
244 _ => return ("0".to_string(), vec![]),
245 };
246
247 let cursor = match &arr[0] {
248 Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
249 Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
250 _ => return ("0".to_string(), vec![]),
251 };
252
253 let mut members = Vec::new();
254 match &arr[1] {
255 Ok(ferriskey::Value::Array(inner)) => {
256 for item in inner {
257 if let Ok(ferriskey::Value::BulkString(b)) = item {
258 members.push(String::from_utf8_lossy(b).into_owned());
259 }
260 }
261 }
262 Ok(ferriskey::Value::Set(inner)) => {
263 for item in inner {
264 if let ferriskey::Value::BulkString(b) = item {
265 members.push(String::from_utf8_lossy(b).into_owned());
266 }
267 }
268 }
269 _ => {}
270 }
271
272 (cursor, members)
273}