Skip to main content

ff_engine/scanner/
quota_reconciler.rs

1//! Quota concurrency reconciler.
2//!
3//! Periodically scans quota partitions to correct drift on concurrency
4//! counters and clean expired entries from sliding window ZSETs.
5//!
6//! Concurrency counters drift because INCR (on lease acquire) and DECR
7//! (on lease release) happen on different partitions and are not atomic
8//! with each other.
9//!
10//! Cluster-safe: uses SMEMBERS on indexed SETs instead of SCAN.
11//!
12//! Reference: RFC-008 §Quota Reconciliation, RFC-010 §6.6
13
14use std::time::Duration;
15
16use ff_core::backend::ScannerFilter;
17use ff_core::keys;
18use ff_core::partition::{Partition, PartitionFamily};
19
20use super::{ScanResult, Scanner};
21
22pub struct QuotaReconciler {
23    interval: Duration,
24    /// Issue #122: accepted for uniform API; not applied.
25    filter: ScannerFilter,
26}
27
28impl QuotaReconciler {
29    pub fn new(interval: Duration) -> Self {
30        Self::with_filter(interval, ScannerFilter::default())
31    }
32
33    /// Accepts a [`ScannerFilter`] for uniform construction across
34    /// all scanners (issue #122) but **does not apply it**. This
35    /// scanner iterates quota policies — not executions — and the
36    /// `namespace` / `instance_tag` filter dimensions do not map
37    /// onto quota partitions.
38    pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
39        Self { interval, filter }
40    }
41}
42
43impl Scanner for QuotaReconciler {
44    fn name(&self) -> &'static str {
45        "quota_reconciler"
46    }
47
48    fn interval(&self) -> Duration {
49        self.interval
50    }
51
52    fn filter(&self) -> &ScannerFilter {
53        &self.filter
54    }
55
56    async fn scan_partition(
57        &self,
58        client: &ferriskey::Client,
59        partition: u16,
60    ) -> ScanResult {
61        let p = Partition {
62            family: PartitionFamily::Quota,
63            index: partition,
64        };
65        let tag = p.hash_tag();
66
67        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
68            Ok(t) => t,
69            Err(e) => {
70                tracing::warn!(partition, error = %e, "quota_reconciler: failed to get server time");
71                return ScanResult { processed: 0, errors: 1 };
72            }
73        };
74
75        // Discover quota policies via partition-level index SET (cluster-safe)
76        let policies_key = keys::quota_policies_index(&tag);
77        let quota_ids: Vec<String> = match client
78            .cmd("SMEMBERS")
79            .arg(&policies_key)
80            .execute()
81            .await
82        {
83            Ok(ids) => ids,
84            Err(e) => {
85                tracing::warn!(partition, error = %e, "quota_reconciler: SMEMBERS failed");
86                return ScanResult { processed: 0, errors: 1 };
87            }
88        };
89
90        if quota_ids.is_empty() {
91            return ScanResult { processed: 0, errors: 0 };
92        }
93
94        let mut processed: u32 = 0;
95        let mut errors: u32 = 0;
96
97        for qid in &quota_ids {
98            match reconcile_one_quota(client, &tag, qid, now_ms).await {
99                Ok(true) => processed += 1,
100                Ok(false) => {} // nothing to do
101                Err(e) => {
102                    tracing::warn!(
103                        partition,
104                        quota_id = qid.as_str(),
105                        error = %e,
106                        "quota_reconciler: reconcile failed"
107                    );
108                    errors += 1;
109                }
110            }
111        }
112
113        ScanResult { processed, errors }
114    }
115}
116
117/// Reconcile one quota policy. Returns Ok(true) if something was cleaned.
118async fn reconcile_one_quota(
119    client: &ferriskey::Client,
120    tag: &str,
121    quota_id: &str,
122    now_ms: u64,
123) -> Result<bool, ferriskey::Error> {
124    let mut did_work = false;
125
126    // 1. Read quota definition to find rate-limit window dimensions.
127    let def_key = format!("ff:quota:{}:{}", tag, quota_id);
128    let window_secs: Option<String> = client
129        .cmd("HGET")
130        .arg(&def_key)
131        .arg("requests_per_window_seconds")
132        .execute()
133        .await?;
134
135    // 2. Clean expired entries from the requests_per_window sliding window ZSET
136    if let Some(ref ws) = window_secs
137        && let Ok(secs) = ws.parse::<u64>()
138        && secs > 0
139    {
140        let window_ms = secs * 1000;
141        let window_key =
142            format!("ff:quota:{}:{}:window:requests_per_window", tag, quota_id);
143        let cutoff = now_ms.saturating_sub(window_ms);
144
145        let removed: u32 = client
146            .cmd("ZREMRANGEBYSCORE")
147            .arg(&window_key)
148            .arg("-inf")
149            .arg(cutoff.to_string().as_str())
150            .execute()
151            .await
152            .unwrap_or(0);
153
154        if removed > 0 {
155            did_work = true;
156            tracing::debug!(
157                quota_id,
158                removed,
159                "quota_reconciler: trimmed expired window entries"
160            );
161        }
162    }
163
164    // 3. Reconcile concurrency counter (if quota has concurrency cap)
165    //
166    // Strategy: read admitted_set (SMEMBERS), check each guard key (EXISTS).
167    // If guard expired → SREM from set. Count live = true concurrency.
168    // SET counter to live count. No SCAN needed (cluster-safe).
169    let concurrency_cap: Option<String> = client
170        .cmd("HGET")
171        .arg(&def_key)
172        .arg("active_concurrency_cap")
173        .execute()
174        .await?;
175
176    if let Some(ref cap_str) = concurrency_cap
177        && let Ok(cap) = cap_str.parse::<u64>()
178        && cap > 0
179    {
180        let counter_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id);
181        let admitted_set_key = format!("ff:quota:{}:{}:admitted_set", tag, quota_id);
182
183        // SSCAN the admitted set in batches (instead of unbounded SMEMBERS)
184        let mut live_count: u64 = 0;
185        let mut cursor = "0".to_string();
186        loop {
187            let result: ferriskey::Value = client
188                .cmd("SSCAN")
189                .arg(&admitted_set_key)
190                .arg(cursor.as_str())
191                .arg("COUNT")
192                .arg("100")
193                .execute()
194                .await?;
195
196            let (next_cursor, members) = parse_sscan_response(&result);
197
198            for eid in &members {
199                let guard_key = format!("ff:quota:{}:{}:admitted:{}", tag, quota_id, eid);
200                let exists: bool = client
201                    .exists(&guard_key)
202                    .await
203                    .unwrap_or(false);
204                if exists {
205                    live_count += 1;
206                } else {
207                    // Guard expired — clean up from admitted set
208                    let _: () = client
209                        .cmd("SREM")
210                        .arg(&admitted_set_key)
211                        .arg(eid.as_str())
212                        .execute()
213                        .await
214                        .unwrap_or_default();
215                }
216            }
217
218            cursor = next_cursor;
219            if cursor == "0" {
220                break;
221            }
222        }
223
224        // Read stored counter
225        let stored: Option<String> = client
226            .cmd("GET")
227            .arg(&counter_key)
228            .execute()
229            .await?;
230        let stored_count: i64 = stored
231            .as_deref()
232            .and_then(|s| s.parse().ok())
233            .unwrap_or(0);
234
235        // Correct if drifted
236        if stored_count != live_count as i64 {
237            let _: () = client
238                .cmd("SET")
239                .arg(&counter_key)
240                .arg(live_count.to_string().as_str())
241                .execute()
242                .await?;
243            tracing::info!(
244                quota_id,
245                stored = stored_count,
246                actual = live_count,
247                "quota_reconciler: corrected concurrency counter drift"
248            );
249            did_work = true;
250        }
251    }
252
253    Ok(did_work)
254}
255
256/// Parse SSCAN response: [cursor, [member1, member2, ...]]
257fn parse_sscan_response(val: &ferriskey::Value) -> (String, Vec<String>) {
258    let arr = match val {
259        ferriskey::Value::Array(a) if a.len() >= 2 => a,
260        _ => return ("0".to_string(), vec![]),
261    };
262
263    let cursor = match &arr[0] {
264        Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
265        Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
266        _ => return ("0".to_string(), vec![]),
267    };
268
269    let mut members = Vec::new();
270    match &arr[1] {
271        Ok(ferriskey::Value::Array(inner)) => {
272            for item in inner {
273                if let Ok(ferriskey::Value::BulkString(b)) = item {
274                    members.push(String::from_utf8_lossy(b).into_owned());
275                }
276            }
277        }
278        Ok(ferriskey::Value::Set(inner)) => {
279            for item in inner {
280                if let ferriskey::Value::BulkString(b) = item {
281                    members.push(String::from_utf8_lossy(b).into_owned());
282                }
283            }
284        }
285        _ => {}
286    }
287
288    (cursor, members)
289}