Skip to main content

ff_engine/scanner/
budget_reconciler.rs

//! Budget counter reconciler.
//!
//! Periodically scans budget partitions to detect and correct drift on
//! lifetime (non-resetting) budgets. Each pass currently re-checks breach
//! status against hard limits and sets or clears the `breached_at` marker in
//! the usage hash to match; the usage counters themselves are not rewritten.
//! Cleanup of stale execution references is not implemented yet (see the TODO
//! in `reconcile_one_budget`).
//!
//! Budgets with a non-empty, non-zero `reset_interval_ms` are SKIPPED —
//! resetting budgets cannot be reconciled by summing all-time usage (the sum
//! would undo the reset).
//!
//! Cluster-safe: pages through a partition-level index SET with SSCAN instead
//! of running a keyspace-wide SCAN.
//!
//! Reference: RFC-008 §Budget Reconciliation, RFC-010 §6.5
13
14use std::time::Duration;
15
16use ff_core::keys;
17use ff_core::partition::{Partition, PartitionFamily};
18
19use super::{ScanResult, Scanner};
20
/// Scanner that reconciles budget breach markers, one budget partition at a
/// time.
///
/// Holds only its scan cadence; the budgets themselves are read from and
/// written to the store through the client handed to `scan_partition`.
#[derive(Debug)]
pub struct BudgetReconciler {
    /// Cadence reported to the scanner framework via `Scanner::interval`.
    interval: Duration,
}
24
25impl BudgetReconciler {
26    pub fn new(interval: Duration) -> Self {
27        Self { interval }
28    }
29}
30
31impl Scanner for BudgetReconciler {
32    fn name(&self) -> &'static str {
33        "budget_reconciler"
34    }
35
36    fn interval(&self) -> Duration {
37        self.interval
38    }
39
40    async fn scan_partition(
41        &self,
42        client: &ferriskey::Client,
43        partition: u16,
44    ) -> ScanResult {
45        let p = Partition {
46            family: PartitionFamily::Budget,
47            index: partition,
48        };
49        let tag = p.hash_tag();
50
51        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
52            Ok(t) => t,
53            Err(e) => {
54                tracing::warn!(partition, error = %e, "budget_reconciler: failed to get server time");
55                return ScanResult { processed: 0, errors: 1 };
56            }
57        };
58
59        // Discover budgets via partition-level index SET (cluster-safe).
60        // Stream in SSCAN batches so partitions with many budgets do not
61        // materialise the entire id list into one allocation and do not
62        // hold Valkey's single-threaded SMEMBERS for a large set.
63        let policies_key = keys::budget_policies_index(&tag);
64        let mut processed: u32 = 0;
65        let mut errors: u32 = 0;
66        let mut cursor = "0".to_string();
67
68        loop {
69            let result: ferriskey::Value = match client
70                .cmd("SSCAN")
71                .arg(&policies_key)
72                .arg(cursor.as_str())
73                .arg("COUNT")
74                .arg("100")
75                .execute()
76                .await
77            {
78                Ok(v) => v,
79                Err(e) => {
80                    tracing::warn!(partition, error = %e, "budget_reconciler: SSCAN failed");
81                    return ScanResult { processed, errors: errors + 1 };
82                }
83            };
84
85            let (next_cursor, budget_ids) = parse_sscan_response(&result);
86
87            for bid in &budget_ids {
88                match reconcile_one_budget(client, &tag, &policies_key, bid, now_ms).await {
89                    Ok(true) => processed += 1,
90                    Ok(false) => {} // skipped (resetting budget or no drift)
91                    Err(e) => {
92                        tracing::warn!(
93                            partition,
94                            budget_id = bid.as_str(),
95                            error = %e,
96                            "budget_reconciler: reconcile failed"
97                        );
98                        errors += 1;
99                    }
100                }
101            }
102
103            cursor = next_cursor;
104            if cursor == "0" {
105                break;
106            }
107        }
108
109        ScanResult { processed, errors }
110    }
111}
112
113/// Reconcile one budget. Returns Ok(true) if corrected, Ok(false) if skipped.
114async fn reconcile_one_budget(
115    client: &ferriskey::Client,
116    tag: &str,
117    policies_key: &str,
118    budget_id: &str,
119    now_ms: u64,
120) -> Result<bool, ferriskey::Error> {
121    let def_key = format!("ff:budget:{}:{}", tag, budget_id);
122    let usage_key = format!("ff:budget:{}:{}:usage", tag, budget_id);
123    let limits_key = format!("ff:budget:{}:{}:limits", tag, budget_id);
124
125    // Defensive prune: index entry for a budget whose definition is gone
126    // (manual delete / retention purge) — drop it so SMEMBERS stays correct.
127    let def_raw: Vec<String> = client
128        .cmd("HGETALL")
129        .arg(&def_key)
130        .execute()
131        .await
132        .unwrap_or_default();
133    if def_raw.is_empty() {
134        let _: Option<i64> = client
135            .cmd("SREM")
136            .arg(policies_key)
137            .arg(budget_id)
138            .execute()
139            .await
140            .unwrap_or(None);
141        return Ok(false);
142    }
143
144    let def_map = pairs_to_map(&def_raw);
145    let reset_interval = def_map.get("reset_interval_ms").copied();
146
147    // Skip resetting budgets — cannot reconcile by summing all-time usage
148    if let Some(ri) = reset_interval
149        && !ri.is_empty()
150        && ri != "0"
151    {
152        return Ok(false);
153    }
154
155    // Read current usage counters
156    let usage_raw: Vec<String> = client
157        .cmd("HGETALL")
158        .arg(&usage_key)
159        .execute()
160        .await
161        .unwrap_or_default();
162
163    // Read hard limits
164    let limits_raw: Vec<String> = client
165        .cmd("HGETALL")
166        .arg(&limits_key)
167        .execute()
168        .await
169        .unwrap_or_default();
170
171    if limits_raw.is_empty() {
172        return Ok(false); // No limits defined
173    }
174
175    // Parse usage and limits into dimension maps
176    let usage = pairs_to_map(&usage_raw);
177    let limits = pairs_to_map(&limits_raw);
178
179    // Check breach status against hard limits.
180    // Limits hash uses prefixed fields: "hard:tokens", "soft:tokens".
181    // Usage hash uses bare fields: "tokens".
182    // Must strip prefix before looking up in usage.
183    let mut any_breached = false;
184    for (field, limit_str) in &limits {
185        // Only check hard limits for breach detection
186        let dim = match field.strip_prefix("hard:") {
187            Some(d) => d,
188            None => continue, // skip soft limits and other fields
189        };
190        let limit: i64 = limit_str.parse().unwrap_or(i64::MAX);
191        if limit <= 0 {
192            continue; // 0 or negative means no limit
193        }
194        let current: i64 = usage
195            .get(dim)
196            .and_then(|v| v.parse().ok())
197            .unwrap_or(0);
198        if current > limit {
199            any_breached = true;
200            break;
201        }
202    }
203
204    // Reconcile breach marker
205    let currently_breached = usage.contains_key("breached_at");
206
207    if any_breached && !currently_breached {
208        // Mark breached
209        let _: () = client
210            .cmd("HSET")
211            .arg(&usage_key)
212            .arg("breached_at")
213            .arg(now_ms.to_string().as_str())
214            .execute()
215            .await?;
216        tracing::info!(budget_id, "budget_reconciler: marked budget as breached");
217    } else if !any_breached && currently_breached {
218        // Clear breach
219        let _: u32 = client
220            .cmd("HDEL")
221            .arg(&usage_key)
222            .arg("breached_at")
223            .execute()
224            .await?;
225        tracing::info!(budget_id, "budget_reconciler: cleared budget breach");
226    }
227
228    // TODO: cross-partition stale-execution cleanup for ff:budget:{b:M}:<id>:executions.
229    // Retention on {p:N} can't reach this {b:M} reverse index, so entries
230    // accumulate after executions are purged. Needs a partition-aware EXISTS
231    // check (parse UUID → execution_partition(config) → core key) — deferred
232    // to a later pass since it requires PartitionConfig plumbing here.
233
234    Ok(any_breached != currently_breached) // true if we corrected something
235}
236
/// Turn a flat `[field, value, field, value, ...]` list, such as an `HGETALL`
/// reply, into a map that borrows its keys and values from `flat`.
///
/// A trailing unpaired element is ignored. If a field appears more than once,
/// its last value wins.
fn pairs_to_map(flat: &[String]) -> std::collections::HashMap<&str, &str> {
    flat.chunks_exact(2)
        .map(|pair| (pair[0].as_str(), pair[1].as_str()))
        .collect()
}
246
247/// Parse an SSCAN reply `[cursor, [member1, member2, ...]]` into
248/// `(cursor, Vec<member>)`. Mirrors the helper in quota_reconciler /
249/// flow_projector so all three scanners agree on the wire shape.
250fn parse_sscan_response(val: &ferriskey::Value) -> (String, Vec<String>) {
251    let arr = match val {
252        ferriskey::Value::Array(a) if a.len() >= 2 => a,
253        _ => return ("0".to_string(), vec![]),
254    };
255
256    let cursor = match &arr[0] {
257        Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
258        Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
259        _ => return ("0".to_string(), vec![]),
260    };
261
262    let mut members = Vec::new();
263    match &arr[1] {
264        Ok(ferriskey::Value::Array(inner)) => {
265            for item in inner {
266                if let Ok(ferriskey::Value::BulkString(b)) = item {
267                    members.push(String::from_utf8_lossy(b).into_owned());
268                }
269            }
270        }
271        Ok(ferriskey::Value::Set(inner)) => {
272            for item in inner {
273                if let ferriskey::Value::BulkString(b) = item {
274                    members.push(String::from_utf8_lossy(b).into_owned());
275                }
276            }
277        }
278        _ => {}
279    }
280
281    (cursor, members)
282}
283