Skip to main content

ff_engine/scanner/
budget_reconciler.rs

1//! Budget counter reconciler.
2//!
3//! Periodically scans budget partitions to detect and correct drift on
4//! lifetime (non-resetting) budget usage counters. Also checks breach
5//! status against hard limits and cleans stale execution references.
6//!
7//! Budgets with `reset_policy` set are SKIPPED — resetting budgets cannot
8//! be reconciled by summing all-time usage (the sum would undo the reset).
9//!
10//! Cluster-safe: uses SSCAN on a partition-level index SET instead of keyspace SCAN.
11//!
12//! Reference: RFC-008 §Budget Reconciliation, RFC-010 §6.5
13
14use std::time::Duration;
15
16use ff_core::backend::ScannerFilter;
17use ff_core::keys;
18use ff_core::partition::{Partition, PartitionFamily};
19
20use super::{ScanResult, Scanner};
21
22pub struct BudgetReconciler {
23    interval: Duration,
24    /// Issue #122: accepted for uniform API; not applied. See
25    /// [`Self::with_filter`] rustdoc.
26    filter: ScannerFilter,
27}
28
29impl BudgetReconciler {
30    pub fn new(interval: Duration) -> Self {
31        Self::with_filter(interval, ScannerFilter::default())
32    }
33
34    /// Accepts a [`ScannerFilter`] for uniform construction across
35    /// all scanners (issue #122) but **does not apply it**. This
36    /// scanner iterates budget IDs — not executions — and the
37    /// `namespace` / `instance_tag` filter dimensions are keyed on
38    /// per-execution fields that do not map onto budget partitions.
39    /// The argument is retained so callers can hand the engine's
40    /// single `EngineConfig.scanner_filter` to every scanner
41    /// constructor without special-casing.
42    pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
43        Self { interval, filter }
44    }
45}
46
47impl Scanner for BudgetReconciler {
48    fn name(&self) -> &'static str {
49        "budget_reconciler"
50    }
51
52    fn interval(&self) -> Duration {
53        self.interval
54    }
55
56    fn filter(&self) -> &ScannerFilter {
57        &self.filter
58    }
59
60    async fn scan_partition(
61        &self,
62        client: &ferriskey::Client,
63        partition: u16,
64    ) -> ScanResult {
65        let p = Partition {
66            family: PartitionFamily::Budget,
67            index: partition,
68        };
69        let tag = p.hash_tag();
70
71        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
72            Ok(t) => t,
73            Err(e) => {
74                tracing::warn!(partition, error = %e, "budget_reconciler: failed to get server time");
75                return ScanResult { processed: 0, errors: 1 };
76            }
77        };
78
79        // Discover budgets via partition-level index SET (cluster-safe).
80        // Stream in SSCAN batches so partitions with many budgets do not
81        // materialise the entire id list into one allocation and do not
82        // hold Valkey's single-threaded SMEMBERS for a large set.
83        let policies_key = keys::budget_policies_index(&tag);
84        let mut processed: u32 = 0;
85        let mut errors: u32 = 0;
86        let mut cursor = "0".to_string();
87
88        loop {
89            let result: ferriskey::Value = match client
90                .cmd("SSCAN")
91                .arg(&policies_key)
92                .arg(cursor.as_str())
93                .arg("COUNT")
94                .arg("100")
95                .execute()
96                .await
97            {
98                Ok(v) => v,
99                Err(e) => {
100                    tracing::warn!(partition, error = %e, "budget_reconciler: SSCAN failed");
101                    return ScanResult { processed, errors: errors + 1 };
102                }
103            };
104
105            let (next_cursor, budget_ids) = parse_sscan_response(&result);
106
107            for bid in &budget_ids {
108                match reconcile_one_budget(client, &tag, &policies_key, bid, now_ms).await {
109                    Ok(true) => processed += 1,
110                    Ok(false) => {} // skipped (resetting budget or no drift)
111                    Err(e) => {
112                        tracing::warn!(
113                            partition,
114                            budget_id = bid.as_str(),
115                            error = %e,
116                            "budget_reconciler: reconcile failed"
117                        );
118                        errors += 1;
119                    }
120                }
121            }
122
123            cursor = next_cursor;
124            if cursor == "0" {
125                break;
126            }
127        }
128
129        ScanResult { processed, errors }
130    }
131}
132
133/// Reconcile one budget. Returns Ok(true) if corrected, Ok(false) if skipped.
134async fn reconcile_one_budget(
135    client: &ferriskey::Client,
136    tag: &str,
137    policies_key: &str,
138    budget_id: &str,
139    now_ms: u64,
140) -> Result<bool, ferriskey::Error> {
141    let def_key = format!("ff:budget:{}:{}", tag, budget_id);
142    let usage_key = format!("ff:budget:{}:{}:usage", tag, budget_id);
143    let limits_key = format!("ff:budget:{}:{}:limits", tag, budget_id);
144
145    // Defensive prune: index entry for a budget whose definition is gone
146    // (manual delete / retention purge) — drop it so SMEMBERS stays correct.
147    //
148    // Fail-closed on transport errors: we MUST distinguish "key absent"
149    // (empty Vec) from "read failed" (Err). Pre-fix the unwrap_or_default
150    // collapsed them, so a transient WRONGTYPE / IoError / ClusterDown
151    // on the def key would SREM the budget from the partition index —
152    // durable metadata loss triggered by a momentary Valkey blip.
153    let def_raw: Vec<String> = client
154        .cmd("HGETALL")
155        .arg(&def_key)
156        .execute()
157        .await?;
158    if def_raw.is_empty() {
159        let _: Option<i64> = client
160            .cmd("SREM")
161            .arg(policies_key)
162            .arg(budget_id)
163            .execute()
164            .await
165            .unwrap_or(None);
166        return Ok(false);
167    }
168
169    let def_map = pairs_to_map(&def_raw);
170    let reset_interval = def_map.get("reset_interval_ms").copied();
171
172    // Skip resetting budgets — cannot reconcile by summing all-time usage
173    if let Some(ri) = reset_interval
174        && !ri.is_empty()
175        && ri != "0"
176    {
177        return Ok(false);
178    }
179
180    // Read current usage counters. Fail-closed on transport errors
181    // (same reasoning as def_key above): an empty Vec from a WRONGTYPE
182    // would make the subsequent breach comparison compute against zero
183    // and silently clear the `breached_at` marker below.
184    let usage_raw: Vec<String> = client
185        .cmd("HGETALL")
186        .arg(&usage_key)
187        .execute()
188        .await?;
189
190    // Read hard limits (same fail-closed treatment).
191    let limits_raw: Vec<String> = client
192        .cmd("HGETALL")
193        .arg(&limits_key)
194        .execute()
195        .await?;
196
197    if limits_raw.is_empty() {
198        return Ok(false); // No limits defined
199    }
200
201    // Parse usage and limits into dimension maps
202    let usage = pairs_to_map(&usage_raw);
203    let limits = pairs_to_map(&limits_raw);
204
205    // Check breach status against hard limits.
206    // Limits hash uses prefixed fields: "hard:tokens", "soft:tokens".
207    // Usage hash uses bare fields: "tokens".
208    // Must strip prefix before looking up in usage.
209    let mut any_breached = false;
210    for (field, limit_str) in &limits {
211        // Only check hard limits for breach detection
212        let dim = match field.strip_prefix("hard:") {
213            Some(d) => d,
214            None => continue, // skip soft limits and other fields
215        };
216        let limit: i64 = limit_str.parse().unwrap_or(i64::MAX);
217        if limit <= 0 {
218            continue; // 0 or negative means no limit
219        }
220        let current: i64 = usage
221            .get(dim)
222            .and_then(|v| v.parse().ok())
223            .unwrap_or(0);
224        if current > limit {
225            any_breached = true;
226            break;
227        }
228    }
229
230    // Reconcile breach marker
231    let currently_breached = usage.contains_key("breached_at");
232
233    if any_breached && !currently_breached {
234        // Mark breached
235        let _: () = client
236            .cmd("HSET")
237            .arg(&usage_key)
238            .arg("breached_at")
239            .arg(now_ms.to_string().as_str())
240            .execute()
241            .await?;
242        tracing::info!(budget_id, "budget_reconciler: marked budget as breached");
243    } else if !any_breached && currently_breached {
244        // Clear breach
245        let _: u32 = client
246            .cmd("HDEL")
247            .arg(&usage_key)
248            .arg("breached_at")
249            .execute()
250            .await?;
251        tracing::info!(budget_id, "budget_reconciler: cleared budget breach");
252    }
253
254    // TODO: cross-partition stale-execution cleanup for ff:budget:{b:M}:<id>:executions.
255    // Retention on {p:N} can't reach this {b:M} reverse index, so entries
256    // accumulate after executions are purged. Needs a partition-aware EXISTS
257    // check (parse UUID → execution_partition(config) → core key) — deferred
258    // to a later pass since it requires PartitionConfig plumbing here.
259
260    Ok(any_breached != currently_breached) // true if we corrected something
261}
262
/// Turn a flat `[field, value, field, value, ...]` list into a lookup map
/// that borrows its keys and values from `flat`.
///
/// A trailing element with no partner is ignored. When a field appears
/// more than once, the last occurrence wins.
fn pairs_to_map(flat: &[String]) -> std::collections::HashMap<&str, &str> {
    flat.chunks_exact(2)
        .map(|pair| (pair[0].as_str(), pair[1].as_str()))
        .collect()
}
272
273/// Parse an SSCAN reply `[cursor, [member1, member2, ...]]` into
274/// `(cursor, Vec<member>)`. Mirrors the helper in quota_reconciler /
275/// flow_projector so all three scanners agree on the wire shape.
276fn parse_sscan_response(val: &ferriskey::Value) -> (String, Vec<String>) {
277    let arr = match val {
278        ferriskey::Value::Array(a) if a.len() >= 2 => a,
279        _ => return ("0".to_string(), vec![]),
280    };
281
282    let cursor = match &arr[0] {
283        Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
284        Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
285        _ => return ("0".to_string(), vec![]),
286    };
287
288    let mut members = Vec::new();
289    match &arr[1] {
290        Ok(ferriskey::Value::Array(inner)) => {
291            for item in inner {
292                if let Ok(ferriskey::Value::BulkString(b)) = item {
293                    members.push(String::from_utf8_lossy(b).into_owned());
294                }
295            }
296        }
297        Ok(ferriskey::Value::Set(inner)) => {
298            for item in inner {
299                if let ferriskey::Value::BulkString(b) = item {
300                    members.push(String::from_utf8_lossy(b).into_owned());
301                }
302            }
303        }
304        _ => {}
305    }
306
307    (cursor, members)
308}
309