ff_engine/scanner/
budget_reset.rs1use std::time::Duration;
11
12use ff_core::keys::budget_resets_key;
13use ff_core::partition::{Partition, PartitionFamily};
14
15use super::{ScanResult, Scanner};
16
17const BATCH_SIZE: u32 = 20;
18
19pub struct BudgetResetScanner {
20 interval: Duration,
21}
22
23impl BudgetResetScanner {
24 pub fn new(interval: Duration) -> Self {
25 Self { interval }
26 }
27}
28
29impl Scanner for BudgetResetScanner {
30 fn name(&self) -> &'static str {
31 "budget_reset"
32 }
33
34 fn interval(&self) -> Duration {
35 self.interval
36 }
37
38 async fn scan_partition(
39 &self,
40 client: &ferriskey::Client,
41 partition: u16,
42 ) -> ScanResult {
43 let p = Partition {
44 family: PartitionFamily::Budget,
45 index: partition,
46 };
47 let tag = p.hash_tag();
48 let resets_key = budget_resets_key(&tag);
49
50 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
51 Ok(t) => t,
52 Err(e) => {
53 tracing::warn!(partition, error = %e, "budget_reset: failed to get server time");
54 return ScanResult { processed: 0, errors: 1 };
55 }
56 };
57
58 let due: Vec<String> = match client
60 .cmd("ZRANGEBYSCORE")
61 .arg(&resets_key)
62 .arg("-inf")
63 .arg(now_ms.to_string().as_str())
64 .arg("LIMIT")
65 .arg("0")
66 .arg(BATCH_SIZE.to_string().as_str())
67 .execute()
68 .await
69 {
70 Ok(ids) => ids,
71 Err(e) => {
72 tracing::warn!(partition, error = %e, "budget_reset: ZRANGEBYSCORE failed");
73 return ScanResult { processed: 0, errors: 1 };
74 }
75 };
76
77 if due.is_empty() {
78 return ScanResult { processed: 0, errors: 0 };
79 }
80
81 let mut processed: u32 = 0;
82 let mut errors: u32 = 0;
83
84 for budget_id_str in &due {
85 let budget_def = format!("ff:budget:{}:{}", tag, budget_id_str);
89 let budget_usage = format!("ff:budget:{}:{}:usage", tag, budget_id_str);
90
91 let keys: [&str; 3] = [&budget_def, &budget_usage, &resets_key];
92 let now_s = now_ms.to_string();
93 let argv: [&str; 2] = [budget_id_str.as_str(), &now_s];
94
95 match client
96 .fcall::<ferriskey::Value>("ff_reset_budget", &keys, &argv)
97 .await
98 {
99 Ok(_) => processed += 1,
100 Err(e) => {
101 tracing::warn!(
102 partition,
103 budget_id = budget_id_str.as_str(),
104 error = %e,
105 "budget_reset: ff_reset_budget failed"
106 );
107 errors += 1;
108 }
109 }
110 }
111
112 ScanResult { processed, errors }
113 }
114}