use std::time::Duration;
use ff_core::keys::IndexKeys;
use ff_core::partition::{Partition, PartitionFamily};
use ff_core::types::LaneId;
use super::{ScanResult, Scanner};
const SCAN_COUNT: u32 = 100;
pub struct IndexReconciler {
interval: Duration,
lanes: Vec<LaneId>,
}
impl IndexReconciler {
pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
Self { interval, lanes }
}
}
impl Scanner for IndexReconciler {
fn name(&self) -> &'static str {
"index_reconciler"
}
fn interval(&self) -> Duration {
self.interval
}
async fn scan_partition(
&self,
client: &ferriskey::Client,
partition: u16,
) -> ScanResult {
let p = Partition {
family: PartitionFamily::Execution,
index: partition,
};
let idx = IndexKeys::new(&p);
let all_exec_key = idx.all_executions();
let mut cursor = "0".to_string();
let mut processed: u32 = 0;
let mut errors: u32 = 0;
loop {
let result: ferriskey::Value = match client
.cmd("SSCAN")
.arg(&all_exec_key)
.arg(cursor.as_str())
.arg("COUNT")
.arg(SCAN_COUNT.to_string().as_str())
.execute()
.await
{
Ok(v) => v,
Err(e) => {
tracing::warn!(partition, error = %e, "index_reconciler: SSCAN failed");
return ScanResult { processed, errors: errors + 1 };
}
};
let (next_cursor, members) = match parse_sscan_response(&result) {
Some(v) => v,
None => {
tracing::warn!(partition, "index_reconciler: unexpected SSCAN response format");
return ScanResult { processed, errors: errors + 1 };
}
};
for eid_str in &members {
match check_execution_index(client, &p, &idx, eid_str, &self.lanes).await {
Ok(true) => {} Ok(false) => {
processed += 1;
}
Err(e) => {
tracing::warn!(
partition,
execution_id = eid_str.as_str(),
error = %e,
"index_reconciler: check failed"
);
errors += 1;
}
}
}
cursor = next_cursor;
if cursor == "0" {
break;
}
}
ScanResult { processed, errors }
}
}
async fn check_execution_index(
client: &ferriskey::Client,
partition: &Partition,
idx: &IndexKeys,
eid_str: &str,
_lanes: &[LaneId],
) -> Result<bool, ferriskey::Error> {
let core_key = format!("ff:exec:{}:{}:core", partition.hash_tag(), eid_str);
let fields: Vec<Option<String>> = client
.cmd("HMGET")
.arg(&core_key)
.arg("lifecycle_phase")
.arg("eligibility_state")
.arg("ownership_state")
.arg("lane_id")
.execute()
.await?;
if fields.is_empty() || fields[0].is_none() {
tracing::warn!(
partition = partition.index,
execution_id = eid_str,
"index_reconciler: execution in all_executions but core hash missing"
);
return Ok(false);
}
let lifecycle = fields[0].as_deref().unwrap_or("");
let eligibility = fields[1].as_deref().unwrap_or("");
let ownership = fields[2].as_deref().unwrap_or("");
let lane_str = fields[3].as_deref().unwrap_or("default");
let expected_index = match (lifecycle, eligibility, ownership) {
("active", _, "leased") => "active",
("runnable", "eligible_now", _) => "eligible",
("runnable", "not_eligible_until_time", _) => "delayed",
("runnable", "blocked_by_dependencies", _) => "blocked:dependencies",
("runnable", "blocked_by_budget", _) => "blocked:budget",
("runnable", "blocked_by_quota", _) => "blocked:quota",
("runnable", "blocked_by_route", _) => "blocked:route",
("runnable", "blocked_by_operator", _) => "blocked:operator",
("suspended", _, _) => "suspended",
("terminal", _, _) => "terminal",
_ => "unknown",
};
if expected_index == "unknown" {
return Ok(true);
}
let lane = LaneId::new(lane_str);
let expected_key = match expected_index {
"active" => idx.lane_active(&lane),
"eligible" => idx.lane_eligible(&lane),
"delayed" => idx.lane_delayed(&lane),
"blocked:dependencies" => idx.lane_blocked_dependencies(&lane),
"blocked:budget" => idx.lane_blocked_budget(&lane),
"blocked:quota" => idx.lane_blocked_quota(&lane),
"blocked:route" => idx.lane_blocked_route(&lane),
"blocked:operator" => idx.lane_blocked_operator(&lane),
"suspended" => idx.lane_suspended(&lane),
"terminal" => idx.lane_terminal(&lane),
_ => return Ok(true),
};
let score: Option<String> = client
.cmd("ZSCORE")
.arg(&expected_key)
.arg(eid_str)
.execute()
.await?;
if score.is_none() {
tracing::warn!(
partition = partition.index,
execution_id = eid_str,
expected_index,
expected_key = expected_key.as_str(),
lifecycle,
eligibility,
ownership,
"index_reconciler: execution missing from expected index"
);
return Ok(false);
}
Ok(true)
}
fn parse_sscan_response(val: &ferriskey::Value) -> Option<(String, Vec<String>)> {
let arr = match val {
ferriskey::Value::Array(a) => a,
_ => return None,
};
if arr.len() < 2 {
return None;
}
let cursor = match &arr[0] {
Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
_ => return None,
};
let mut members = Vec::new();
match &arr[1] {
Ok(ferriskey::Value::Array(inner)) => {
for item in inner {
if let Ok(ferriskey::Value::BulkString(b)) = item {
members.push(String::from_utf8_lossy(b).into_owned());
}
}
}
Ok(ferriskey::Value::Set(inner)) => {
for item in inner {
if let ferriskey::Value::BulkString(b) = item {
members.push(String::from_utf8_lossy(b).into_owned());
}
}
}
_ => return None,
};
Some((cursor, members))
}