use std::time::Duration;
use ff_core::keys;
use ff_core::partition::{Partition, PartitionFamily};
use super::{ScanResult, Scanner};
/// Periodic scanner for budget partitions.
///
/// The only state it carries is the interval it reports through its
/// `Scanner::interval` implementation.
pub struct BudgetReconciler {
    /// Interval returned verbatim by `Scanner::interval`.
    interval: Duration,
}

impl BudgetReconciler {
    /// Builds a reconciler that reports `interval` as its scan interval.
    pub fn new(interval: Duration) -> Self {
        BudgetReconciler { interval }
    }
}
impl Scanner for BudgetReconciler {
fn name(&self) -> &'static str {
"budget_reconciler"
}
fn interval(&self) -> Duration {
self.interval
}
async fn scan_partition(
&self,
client: &ferriskey::Client,
partition: u16,
) -> ScanResult {
let p = Partition {
family: PartitionFamily::Budget,
index: partition,
};
let tag = p.hash_tag();
let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
Ok(t) => t,
Err(e) => {
tracing::warn!(partition, error = %e, "budget_reconciler: failed to get server time");
return ScanResult { processed: 0, errors: 1 };
}
};
let policies_key = keys::budget_policies_index(&tag);
let mut processed: u32 = 0;
let mut errors: u32 = 0;
let mut cursor = "0".to_string();
loop {
let result: ferriskey::Value = match client
.cmd("SSCAN")
.arg(&policies_key)
.arg(cursor.as_str())
.arg("COUNT")
.arg("100")
.execute()
.await
{
Ok(v) => v,
Err(e) => {
tracing::warn!(partition, error = %e, "budget_reconciler: SSCAN failed");
return ScanResult { processed, errors: errors + 1 };
}
};
let (next_cursor, budget_ids) = parse_sscan_response(&result);
for bid in &budget_ids {
match reconcile_one_budget(client, &tag, &policies_key, bid, now_ms).await {
Ok(true) => processed += 1,
Ok(false) => {} Err(e) => {
tracing::warn!(
partition,
budget_id = bid.as_str(),
error = %e,
"budget_reconciler: reconcile failed"
);
errors += 1;
}
}
}
cursor = next_cursor;
if cursor == "0" {
break;
}
}
ScanResult { processed, errors }
}
}
/// Reconciles the `breached_at` marker of one budget against its hard limits.
///
/// Steps, in order:
/// 1. Read the definition hash `ff:budget:{tag}:{budget_id}`. If it is empty,
///    remove `budget_id` from the `policies_key` set and return `Ok(false)`.
/// 2. If the definition has a `reset_interval_ms` that is neither empty nor
///    `"0"`, return `Ok(false)` without touching the budget.
/// 3. Read the `:usage` and `:limits` hashes. With no limits, return
///    `Ok(false)`.
/// 4. For every `hard:<dim>` limit with a positive value, compare the usage
///    of `<dim>` (missing or unparseable usage counts as 0). The budget is
///    breached when any usage is strictly greater than its limit.
/// 5. Set `breached_at` to `now_ms` on the usage hash when newly breached, or
///    delete it when no longer breached.
///
/// Returns `Ok(true)` when the marker was added or removed, `Ok(false)`
/// otherwise.
///
/// # Errors
///
/// Returns the client error when any `HGETALL`, the `HSET`, or the `HDEL`
/// fails. Read failures are propagated rather than treated as an empty hash:
/// an unreadable definition must not be mistaken for a deleted budget and
/// removed from the index.
async fn reconcile_one_budget(
    client: &ferriskey::Client,
    tag: &str,
    policies_key: &str,
    budget_id: &str,
    now_ms: u64,
) -> Result<bool, ferriskey::Error> {
    let def_key = format!("ff:budget:{}:{}", tag, budget_id);
    let usage_key = format!("ff:budget:{}:{}:usage", tag, budget_id);
    let limits_key = format!("ff:budget:{}:{}:limits", tag, budget_id);

    // A failed read is an error, not "definition missing"; only a successful
    // read that returns nothing may trigger the index cleanup below.
    let def_raw: Vec<String> = client
        .cmd("HGETALL")
        .arg(&def_key)
        .execute()
        .await?;
    if def_raw.is_empty() {
        // Best-effort cleanup: the id stays in the index if this fails and is
        // picked up again on the next scan, so only log the failure.
        let removal: Result<Option<i64>, _> = client
            .cmd("SREM")
            .arg(policies_key)
            .arg(budget_id)
            .execute()
            .await;
        if let Err(e) = removal {
            tracing::warn!(
                budget_id,
                error = %e,
                "budget_reconciler: failed to remove stale budget from index"
            );
        }
        return Ok(false);
    }

    let def_map = pairs_to_map(&def_raw);
    let reset_interval = def_map.get("reset_interval_ms").copied();
    // Budgets with a non-zero reset interval are left alone by this function.
    if let Some(ri) = reset_interval
        && !ri.is_empty()
        && ri != "0"
    {
        return Ok(false);
    }

    let usage_raw: Vec<String> = client
        .cmd("HGETALL")
        .arg(&usage_key)
        .execute()
        .await?;
    let limits_raw: Vec<String> = client
        .cmd("HGETALL")
        .arg(&limits_key)
        .execute()
        .await?;
    if limits_raw.is_empty() {
        return Ok(false);
    }

    let usage = pairs_to_map(&usage_raw);
    let limits = pairs_to_map(&limits_raw);

    let mut any_breached = false;
    for (field, limit_str) in &limits {
        // Only `hard:<dimension>` fields take part in breach detection.
        let Some(dim) = field.strip_prefix("hard:") else {
            continue;
        };
        // An unparseable limit becomes i64::MAX, which can never be exceeded.
        let limit: i64 = limit_str.parse().unwrap_or(i64::MAX);
        if limit <= 0 {
            continue;
        }
        let current: i64 = usage
            .get(dim)
            .and_then(|v| v.parse().ok())
            .unwrap_or(0);
        if current > limit {
            any_breached = true;
            break;
        }
    }

    let currently_breached = usage.contains_key("breached_at");
    if any_breached && !currently_breached {
        let _: () = client
            .cmd("HSET")
            .arg(&usage_key)
            .arg("breached_at")
            .arg(now_ms.to_string().as_str())
            .execute()
            .await?;
        tracing::info!(budget_id, "budget_reconciler: marked budget as breached");
    } else if !any_breached && currently_breached {
        let _: u32 = client
            .cmd("HDEL")
            .arg(&usage_key)
            .arg("breached_at")
            .execute()
            .await?;
        tracing::info!(budget_id, "budget_reconciler: cleared budget breach");
    }

    // True exactly when one of the two branches above changed the marker.
    Ok(any_breached != currently_breached)
}
/// Turns a flat `[field, value, field, value, ...]` list into a borrowed
/// field -> value map.
///
/// A trailing element without a partner is ignored. When a field appears more
/// than once, the last value wins.
fn pairs_to_map(flat: &[String]) -> std::collections::HashMap<&str, &str> {
    flat.chunks_exact(2)
        .map(|pair| (pair[0].as_str(), pair[1].as_str()))
        .collect()
}
/// Splits an `SSCAN` reply into `(next_cursor, members)`.
///
/// The reply must be an array of at least two elements: the cursor (bulk or
/// simple string) followed by the members (an array or a set). Only
/// bulk-string members are kept; bytes are decoded as UTF-8 with lossy
/// replacement.
///
/// If the reply has any other shape, or the cursor is of another type, the
/// result is `("0", [])`.
fn parse_sscan_response(val: &ferriskey::Value) -> (String, Vec<String>) {
    let ferriskey::Value::Array(reply) = val else {
        return (String::from("0"), Vec::new());
    };
    if reply.len() < 2 {
        return (String::from("0"), Vec::new());
    }

    let next_cursor = match &reply[0] {
        Ok(ferriskey::Value::SimpleString(text)) => text.clone(),
        Ok(ferriskey::Value::BulkString(raw)) => String::from_utf8_lossy(raw).into_owned(),
        _ => return (String::from("0"), Vec::new()),
    };

    let members: Vec<String> = match &reply[1] {
        // Array entries are themselves results; keep only successful bulk strings.
        Ok(ferriskey::Value::Array(entries)) => entries
            .iter()
            .filter_map(|entry| match entry {
                Ok(ferriskey::Value::BulkString(raw)) => {
                    Some(String::from_utf8_lossy(raw).into_owned())
                }
                _ => None,
            })
            .collect(),
        // Set entries are plain values.
        Ok(ferriskey::Value::Set(entries)) => entries
            .iter()
            .filter_map(|entry| match entry {
                ferriskey::Value::BulkString(raw) => {
                    Some(String::from_utf8_lossy(raw).into_owned())
                }
                _ => None,
            })
            .collect(),
        _ => Vec::new(),
    };

    (next_cursor, members)
}