Skip to main content

ff_engine/scanner/
quota_reconciler.rs

1//! Quota concurrency reconciler.
2//!
3//! Periodically scans quota partitions to correct drift on concurrency
4//! counters and clean expired entries from sliding window ZSETs.
5//!
6//! Concurrency counters drift because INCR (on lease acquire) and DECR
7//! (on lease release) happen on different partitions and are not atomic
8//! with each other.
9//!
//! Cluster-safe: discovers policies via SMEMBERS on the partition's index SET and walks
//! each admitted set with SSCAN; it never issues a keyspace-wide SCAN.
11//!
12//! Reference: RFC-008 §Quota Reconciliation, RFC-010 §6.6
13
14use std::time::Duration;
15
16use ff_core::keys;
17use ff_core::partition::{Partition, PartitionFamily};
18
19use super::{ScanResult, Scanner};
20
/// Scanner that reconciles quota partitions: it trims expired sliding-window entries and
/// corrects drift on concurrency counters.
pub struct QuotaReconciler {
    /// Value reported through [`Scanner::interval`]; presumably the delay between passes.
    interval: Duration,
}

impl QuotaReconciler {
    /// Build a reconciler that reports `interval` as its scan interval.
    pub fn new(interval: Duration) -> Self {
        QuotaReconciler { interval }
    }
}
30
31impl Scanner for QuotaReconciler {
32    fn name(&self) -> &'static str {
33        "quota_reconciler"
34    }
35
36    fn interval(&self) -> Duration {
37        self.interval
38    }
39
40    async fn scan_partition(
41        &self,
42        client: &ferriskey::Client,
43        partition: u16,
44    ) -> ScanResult {
45        let p = Partition {
46            family: PartitionFamily::Quota,
47            index: partition,
48        };
49        let tag = p.hash_tag();
50
51        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
52            Ok(t) => t,
53            Err(e) => {
54                tracing::warn!(partition, error = %e, "quota_reconciler: failed to get server time");
55                return ScanResult { processed: 0, errors: 1 };
56            }
57        };
58
59        // Discover quota policies via partition-level index SET (cluster-safe)
60        let policies_key = keys::quota_policies_index(&tag);
61        let quota_ids: Vec<String> = match client
62            .cmd("SMEMBERS")
63            .arg(&policies_key)
64            .execute()
65            .await
66        {
67            Ok(ids) => ids,
68            Err(e) => {
69                tracing::warn!(partition, error = %e, "quota_reconciler: SMEMBERS failed");
70                return ScanResult { processed: 0, errors: 1 };
71            }
72        };
73
74        if quota_ids.is_empty() {
75            return ScanResult { processed: 0, errors: 0 };
76        }
77
78        let mut processed: u32 = 0;
79        let mut errors: u32 = 0;
80
81        for qid in &quota_ids {
82            match reconcile_one_quota(client, &tag, qid, now_ms).await {
83                Ok(true) => processed += 1,
84                Ok(false) => {} // nothing to do
85                Err(e) => {
86                    tracing::warn!(
87                        partition,
88                        quota_id = qid.as_str(),
89                        error = %e,
90                        "quota_reconciler: reconcile failed"
91                    );
92                    errors += 1;
93                }
94            }
95        }
96
97        ScanResult { processed, errors }
98    }
99}
100
101/// Reconcile one quota policy. Returns Ok(true) if something was cleaned.
102async fn reconcile_one_quota(
103    client: &ferriskey::Client,
104    tag: &str,
105    quota_id: &str,
106    now_ms: u64,
107) -> Result<bool, ferriskey::Error> {
108    let mut did_work = false;
109
110    // 1. Read quota definition to find rate-limit window dimensions.
111    let def_key = format!("ff:quota:{}:{}", tag, quota_id);
112    let window_secs: Option<String> = client
113        .cmd("HGET")
114        .arg(&def_key)
115        .arg("requests_per_window_seconds")
116        .execute()
117        .await?;
118
119    // 2. Clean expired entries from the requests_per_window sliding window ZSET
120    if let Some(ref ws) = window_secs
121        && let Ok(secs) = ws.parse::<u64>()
122        && secs > 0
123    {
124        let window_ms = secs * 1000;
125        let window_key =
126            format!("ff:quota:{}:{}:window:requests_per_window", tag, quota_id);
127        let cutoff = now_ms.saturating_sub(window_ms);
128
129        let removed: u32 = client
130            .cmd("ZREMRANGEBYSCORE")
131            .arg(&window_key)
132            .arg("-inf")
133            .arg(cutoff.to_string().as_str())
134            .execute()
135            .await
136            .unwrap_or(0);
137
138        if removed > 0 {
139            did_work = true;
140            tracing::debug!(
141                quota_id,
142                removed,
143                "quota_reconciler: trimmed expired window entries"
144            );
145        }
146    }
147
148    // 3. Reconcile concurrency counter (if quota has concurrency cap)
149    //
150    // Strategy: read admitted_set (SMEMBERS), check each guard key (EXISTS).
151    // If guard expired → SREM from set. Count live = true concurrency.
152    // SET counter to live count. No SCAN needed (cluster-safe).
153    let concurrency_cap: Option<String> = client
154        .cmd("HGET")
155        .arg(&def_key)
156        .arg("active_concurrency_cap")
157        .execute()
158        .await?;
159
160    if let Some(ref cap_str) = concurrency_cap
161        && let Ok(cap) = cap_str.parse::<u64>()
162        && cap > 0
163    {
164        let counter_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id);
165        let admitted_set_key = format!("ff:quota:{}:{}:admitted_set", tag, quota_id);
166
167        // SSCAN the admitted set in batches (instead of unbounded SMEMBERS)
168        let mut live_count: u64 = 0;
169        let mut cursor = "0".to_string();
170        loop {
171            let result: ferriskey::Value = client
172                .cmd("SSCAN")
173                .arg(&admitted_set_key)
174                .arg(cursor.as_str())
175                .arg("COUNT")
176                .arg("100")
177                .execute()
178                .await?;
179
180            let (next_cursor, members) = parse_sscan_response(&result);
181
182            for eid in &members {
183                let guard_key = format!("ff:quota:{}:{}:admitted:{}", tag, quota_id, eid);
184                let exists: bool = client
185                    .exists(&guard_key)
186                    .await
187                    .unwrap_or(false);
188                if exists {
189                    live_count += 1;
190                } else {
191                    // Guard expired — clean up from admitted set
192                    let _: () = client
193                        .cmd("SREM")
194                        .arg(&admitted_set_key)
195                        .arg(eid.as_str())
196                        .execute()
197                        .await
198                        .unwrap_or_default();
199                }
200            }
201
202            cursor = next_cursor;
203            if cursor == "0" {
204                break;
205            }
206        }
207
208        // Read stored counter
209        let stored: Option<String> = client
210            .cmd("GET")
211            .arg(&counter_key)
212            .execute()
213            .await?;
214        let stored_count: i64 = stored
215            .as_deref()
216            .and_then(|s| s.parse().ok())
217            .unwrap_or(0);
218
219        // Correct if drifted
220        if stored_count != live_count as i64 {
221            let _: () = client
222                .cmd("SET")
223                .arg(&counter_key)
224                .arg(live_count.to_string().as_str())
225                .execute()
226                .await?;
227            tracing::info!(
228                quota_id,
229                stored = stored_count,
230                actual = live_count,
231                "quota_reconciler: corrected concurrency counter drift"
232            );
233            did_work = true;
234        }
235    }
236
237    Ok(did_work)
238}
239
240/// Parse SSCAN response: [cursor, [member1, member2, ...]]
241fn parse_sscan_response(val: &ferriskey::Value) -> (String, Vec<String>) {
242    let arr = match val {
243        ferriskey::Value::Array(a) if a.len() >= 2 => a,
244        _ => return ("0".to_string(), vec![]),
245    };
246
247    let cursor = match &arr[0] {
248        Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
249        Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
250        _ => return ("0".to_string(), vec![]),
251    };
252
253    let mut members = Vec::new();
254    match &arr[1] {
255        Ok(ferriskey::Value::Array(inner)) => {
256            for item in inner {
257                if let Ok(ferriskey::Value::BulkString(b)) = item {
258                    members.push(String::from_utf8_lossy(b).into_owned());
259                }
260            }
261        }
262        Ok(ferriskey::Value::Set(inner)) => {
263            for item in inner {
264                if let ferriskey::Value::BulkString(b) = item {
265                    members.push(String::from_utf8_lossy(b).into_owned());
266                }
267            }
268        }
269        _ => {}
270    }
271
272    (cursor, members)
273}