use std::time::Duration;
use ff_core::backend::ScannerFilter;
use ff_core::keys::IndexKeys;
use ff_core::partition::{Partition, PartitionFamily};
use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
/// Maximum number of members requested from the suspension-timeout index per
/// partition scan; passed as the `LIMIT` count of the `ZRANGEBYSCORE` query.
const BATCH_SIZE: u32 = 50;
/// Runs one suspension-timeout scan tick against the Postgres backend.
///
/// This is a pass-through to
/// `ff_backend_postgres::reconcilers::suspension_timeout::scan_tick`; the
/// arguments are forwarded unchanged and its result is returned as-is.
/// Only compiled when the `postgres` feature is enabled.
#[cfg(feature = "postgres")]
pub async fn scan_tick_pg(
    pool: &ff_backend_postgres::PgPool,
    partition_key: i16,
    filter: &ff_core::backend::ScannerFilter,
) -> Result<ff_backend_postgres::reconcilers::ScanReport, ff_core::engine_error::EngineError> {
    use ff_backend_postgres::reconcilers::suspension_timeout as pg_reconciler;

    pg_reconciler::scan_tick(pool, partition_key, filter).await
}
/// Scanner that reads due entries from each execution partition's
/// suspension-timeout index and calls `ff_expire_suspension` for them.
pub struct SuspensionTimeoutScanner {
/// Value returned by `Scanner::interval`.
interval: Duration,
/// Per-execution failure bookkeeping: consulted to skip candidates and
/// updated with the outcome of every expiry attempt.
failures: FailureTracker,
/// Candidate filter returned by `Scanner::filter` and passed to
/// `should_skip_candidate` for every due execution.
filter: ScannerFilter,
}
impl SuspensionTimeoutScanner {
    /// Creates a scanner with the given interval and the default
    /// [`ScannerFilter`].
    pub fn new(interval: Duration) -> Self {
        let filter = ScannerFilter::default();
        Self::with_filter(interval, filter)
    }

    /// Creates a scanner with the given interval and an explicit candidate
    /// filter, starting from a fresh [`FailureTracker`].
    pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
        let failures = FailureTracker::new();
        Self {
            interval,
            failures,
            filter,
        }
    }
}
impl Scanner for SuspensionTimeoutScanner {
fn name(&self) -> &'static str {
"suspension_timeout"
}
fn interval(&self) -> Duration {
self.interval
}
fn filter(&self) -> &ScannerFilter {
&self.filter
}
async fn scan_partition(
&self,
client: &ferriskey::Client,
partition: u16,
) -> ScanResult {
let p = Partition {
family: PartitionFamily::Execution,
index: partition,
};
let idx = IndexKeys::new(&p);
let timeout_key = idx.suspension_timeout();
let tag = p.hash_tag();
let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
Ok(t) => t,
Err(e) => {
tracing::warn!(partition, error = %e, "suspension_timeout: failed to get server time");
return ScanResult { processed: 0, errors: 1 };
}
};
let timed_out: Vec<String> = match client
.cmd("ZRANGEBYSCORE")
.arg(&timeout_key)
.arg("-inf")
.arg(now_ms.to_string().as_str())
.arg("LIMIT")
.arg("0")
.arg(BATCH_SIZE.to_string().as_str())
.execute()
.await
{
Ok(ids) => ids,
Err(e) => {
tracing::warn!(partition, error = %e, "suspension_timeout: ZRANGEBYSCORE failed");
return ScanResult { processed: 0, errors: 1 };
}
};
if partition == 0 {
self.failures.advance_cycle();
}
if timed_out.is_empty() {
return ScanResult { processed: 0, errors: 0 };
}
let mut processed: u32 = 0;
let mut errors: u32 = 0;
for eid_str in &timed_out {
if self.failures.should_skip(eid_str) {
continue;
}
if should_skip_candidate(client, &self.filter, partition, eid_str).await {
continue;
}
match expire_suspension(client, &tag, &idx, eid_str).await {
Ok(()) => {
self.failures.record_success(eid_str);
processed += 1;
}
Err(e) => {
tracing::warn!(
partition,
execution_id = eid_str.as_str(),
error = %e,
"suspension_timeout: ff_expire_suspension failed"
);
self.failures.record_failure(eid_str, "suspension_timeout");
errors += 1;
}
}
}
ScanResult { processed, errors }
}
}
async fn expire_suspension(
client: &ferriskey::Client,
tag: &str,
idx: &IndexKeys,
eid_str: &str,
) -> Result<(), ferriskey::Error> {
let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
let suspension_current = format!("ff:exec:{}:{}:suspension:current", tag, eid_str);
let wp_id: Option<String> = client
.cmd("HGET")
.arg(&exec_core)
.arg("current_waitpoint_id")
.execute()
.await?;
let att_idx: Option<String> = client
.cmd("HGET")
.arg(&exec_core)
.arg("current_attempt_index")
.execute()
.await?;
let wp_id = wp_id.unwrap_or_default();
let att_idx = att_idx.unwrap_or_else(|| "0".to_string());
let waitpoint_hash = format!("ff:wp:{}:{}", tag, wp_id);
let wp_condition = format!("ff:wp:{}:{}:condition", tag, wp_id);
let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
let suspension_timeout = idx.suspension_timeout();
let lane: Option<String> = client
.cmd("HGET")
.arg(&exec_core)
.arg("lane_id")
.execute()
.await?;
let lane_str = lane.unwrap_or_else(|| "default".to_string());
let lane_id = ff_core::types::LaneId::new(&lane_str);
let suspended_zset = idx.lane_suspended(&lane_id);
let terminal_zset = idx.lane_terminal(&lane_id);
let eligible_zset = idx.lane_eligible(&lane_id);
let delayed_zset = idx.lane_delayed(&lane_id);
let lease_history = format!("ff:exec:{}:{}:lease:history", tag, eid_str);
let keys: [&str; 12] = [
&exec_core, &suspension_current, &waitpoint_hash, &wp_condition, &attempt_hash, &stream_meta, &suspension_timeout, &suspended_zset, &terminal_zset, &eligible_zset, &delayed_zset, &lease_history, ];
let argv: [&str; 1] = [eid_str];
let _: ferriskey::Value = client
.fcall("ff_expire_suspension", &keys, &argv)
.await?;
Ok(())
}