use std::time::Duration;
use ff_core::backend::ScannerFilter;
use ff_core::keys;
use ff_core::partition::{Partition, PartitionFamily};
use super::{ScanResult, Scanner};
pub struct QuotaReconciler {
interval: Duration,
filter: ScannerFilter,
}
impl QuotaReconciler {
pub fn new(interval: Duration) -> Self {
Self::with_filter(interval, ScannerFilter::default())
}
pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
Self { interval, filter }
}
}
impl Scanner for QuotaReconciler {
fn name(&self) -> &'static str {
"quota_reconciler"
}
fn interval(&self) -> Duration {
self.interval
}
fn filter(&self) -> &ScannerFilter {
&self.filter
}
async fn scan_partition(
&self,
client: &ferriskey::Client,
partition: u16,
) -> ScanResult {
let p = Partition {
family: PartitionFamily::Quota,
index: partition,
};
let tag = p.hash_tag();
let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
Ok(t) => t,
Err(e) => {
tracing::warn!(partition, error = %e, "quota_reconciler: failed to get server time");
return ScanResult { processed: 0, errors: 1 };
}
};
let policies_key = keys::quota_policies_index(&tag);
let quota_ids: Vec<String> = match client
.cmd("SMEMBERS")
.arg(&policies_key)
.execute()
.await
{
Ok(ids) => ids,
Err(e) => {
tracing::warn!(partition, error = %e, "quota_reconciler: SMEMBERS failed");
return ScanResult { processed: 0, errors: 1 };
}
};
if quota_ids.is_empty() {
return ScanResult { processed: 0, errors: 0 };
}
let mut processed: u32 = 0;
let mut errors: u32 = 0;
for qid in "a_ids {
match reconcile_one_quota(client, &tag, qid, now_ms).await {
Ok(true) => processed += 1,
Ok(false) => {} Err(e) => {
tracing::warn!(
partition,
quota_id = qid.as_str(),
error = %e,
"quota_reconciler: reconcile failed"
);
errors += 1;
}
}
}
ScanResult { processed, errors }
}
}
async fn reconcile_one_quota(
client: &ferriskey::Client,
tag: &str,
quota_id: &str,
now_ms: u64,
) -> Result<bool, ferriskey::Error> {
let mut did_work = false;
let def_key = format!("ff:quota:{}:{}", tag, quota_id);
let window_secs: Option<String> = client
.cmd("HGET")
.arg(&def_key)
.arg("requests_per_window_seconds")
.execute()
.await?;
if let Some(ref ws) = window_secs
&& let Ok(secs) = ws.parse::<u64>()
&& secs > 0
{
let window_ms = secs * 1000;
let window_key =
format!("ff:quota:{}:{}:window:requests_per_window", tag, quota_id);
let cutoff = now_ms.saturating_sub(window_ms);
let removed: u32 = client
.cmd("ZREMRANGEBYSCORE")
.arg(&window_key)
.arg("-inf")
.arg(cutoff.to_string().as_str())
.execute()
.await
.unwrap_or(0);
if removed > 0 {
did_work = true;
tracing::debug!(
quota_id,
removed,
"quota_reconciler: trimmed expired window entries"
);
}
}
let concurrency_cap: Option<String> = client
.cmd("HGET")
.arg(&def_key)
.arg("active_concurrency_cap")
.execute()
.await?;
if let Some(ref cap_str) = concurrency_cap
&& let Ok(cap) = cap_str.parse::<u64>()
&& cap > 0
{
let counter_key = format!("ff:quota:{}:{}:concurrency", tag, quota_id);
let admitted_set_key = format!("ff:quota:{}:{}:admitted_set", tag, quota_id);
let mut live_count: u64 = 0;
let mut cursor = "0".to_string();
loop {
let result: ferriskey::Value = client
.cmd("SSCAN")
.arg(&admitted_set_key)
.arg(cursor.as_str())
.arg("COUNT")
.arg("100")
.execute()
.await?;
let (next_cursor, members) = parse_sscan_response(&result);
for eid in &members {
let guard_key = format!("ff:quota:{}:{}:admitted:{}", tag, quota_id, eid);
let exists: bool = client
.exists(&guard_key)
.await
.unwrap_or(false);
if exists {
live_count += 1;
} else {
let _: () = client
.cmd("SREM")
.arg(&admitted_set_key)
.arg(eid.as_str())
.execute()
.await
.unwrap_or_default();
}
}
cursor = next_cursor;
if cursor == "0" {
break;
}
}
let stored: Option<String> = client
.cmd("GET")
.arg(&counter_key)
.execute()
.await?;
let stored_count: i64 = stored
.as_deref()
.and_then(|s| s.parse().ok())
.unwrap_or(0);
if stored_count != live_count as i64 {
let _: () = client
.cmd("SET")
.arg(&counter_key)
.arg(live_count.to_string().as_str())
.execute()
.await?;
tracing::info!(
quota_id,
stored = stored_count,
actual = live_count,
"quota_reconciler: corrected concurrency counter drift"
);
did_work = true;
}
}
Ok(did_work)
}
fn parse_sscan_response(val: &ferriskey::Value) -> (String, Vec<String>) {
let arr = match val {
ferriskey::Value::Array(a) if a.len() >= 2 => a,
_ => return ("0".to_string(), vec![]),
};
let cursor = match &arr[0] {
Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
_ => return ("0".to_string(), vec![]),
};
let mut members = Vec::new();
match &arr[1] {
Ok(ferriskey::Value::Array(inner)) => {
for item in inner {
if let Ok(ferriskey::Value::BulkString(b)) = item {
members.push(String::from_utf8_lossy(b).into_owned());
}
}
}
Ok(ferriskey::Value::Set(inner)) => {
for item in inner {
if let ferriskey::Value::BulkString(b) = item {
members.push(String::from_utf8_lossy(b).into_owned());
}
}
}
_ => {}
}
(cursor, members)
}