ff_engine/scanner/
budget_reset.rs1use std::time::Duration;
11
12use ff_core::backend::ScannerFilter;
13use ff_core::keys::budget_resets_key;
14use ff_core::partition::{Partition, PartitionFamily};
15
16use super::{ScanResult, Scanner};
17
18const BATCH_SIZE: u32 = 20;
19
20pub struct BudgetResetScanner {
21 interval: Duration,
22 filter: ScannerFilter,
24}
25
26impl BudgetResetScanner {
27 pub fn new(interval: Duration) -> Self {
28 Self::with_filter(interval, ScannerFilter::default())
29 }
30
31 pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
37 Self { interval, filter }
38 }
39}
40
41impl Scanner for BudgetResetScanner {
42 fn name(&self) -> &'static str {
43 "budget_reset"
44 }
45
46 fn interval(&self) -> Duration {
47 self.interval
48 }
49
50 fn filter(&self) -> &ScannerFilter {
51 &self.filter
52 }
53
54 async fn scan_partition(
55 &self,
56 client: &ferriskey::Client,
57 partition: u16,
58 ) -> ScanResult {
59 let p = Partition {
60 family: PartitionFamily::Budget,
61 index: partition,
62 };
63 let tag = p.hash_tag();
64 let resets_key = budget_resets_key(&tag);
65
66 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
67 Ok(t) => t,
68 Err(e) => {
69 tracing::warn!(partition, error = %e, "budget_reset: failed to get server time");
70 return ScanResult { processed: 0, errors: 1 };
71 }
72 };
73
74 let due: Vec<String> = match client
76 .cmd("ZRANGEBYSCORE")
77 .arg(&resets_key)
78 .arg("-inf")
79 .arg(now_ms.to_string().as_str())
80 .arg("LIMIT")
81 .arg("0")
82 .arg(BATCH_SIZE.to_string().as_str())
83 .execute()
84 .await
85 {
86 Ok(ids) => ids,
87 Err(e) => {
88 tracing::warn!(partition, error = %e, "budget_reset: ZRANGEBYSCORE failed");
89 return ScanResult { processed: 0, errors: 1 };
90 }
91 };
92
93 if due.is_empty() {
94 return ScanResult { processed: 0, errors: 0 };
95 }
96
97 let mut processed: u32 = 0;
98 let mut errors: u32 = 0;
99
100 for budget_id_str in &due {
101 let budget_def = format!("ff:budget:{}:{}", tag, budget_id_str);
105 let budget_usage = format!("ff:budget:{}:{}:usage", tag, budget_id_str);
106
107 let keys: [&str; 3] = [&budget_def, &budget_usage, &resets_key];
108 let now_s = now_ms.to_string();
109 let argv: [&str; 2] = [budget_id_str.as_str(), &now_s];
110
111 match client
112 .fcall::<ferriskey::Value>("ff_reset_budget", &keys, &argv)
113 .await
114 {
115 Ok(_) => processed += 1,
116 Err(e) => {
117 tracing::warn!(
118 partition,
119 budget_id = budget_id_str.as_str(),
120 error = %e,
121 "budget_reset: ff_reset_budget failed"
122 );
123 errors += 1;
124 }
125 }
126 }
127
128 ScanResult { processed, errors }
129 }
130}