use std::time::Duration;
use ff_core::keys::IndexKeys;
use ff_core::partition::{Partition, PartitionFamily};
use ff_core::types::LaneId;
use super::{FailureTracker, ScanResult, Scanner};
/// Upper bound on how many timed-out executions are fetched (ZRANGEBYSCORE
/// `LIMIT` count) from a partition's attempt-timeout index in one scan.
const BATCH_SIZE: u32 = 50;

/// Scanner that looks for executions in the attempt-timeout index whose score
/// has passed the server's current time and expires them via
/// `ff_expire_execution`.
pub struct AttemptTimeoutScanner {
    /// Scan period reported to the scheduler through `Scanner::interval`.
    interval: Duration,
    /// Per-execution success/failure bookkeeping consulted before each expiry
    /// attempt (see `should_skip` / `record_failure` in `scan_partition`).
    failures: FailureTracker,
}

impl AttemptTimeoutScanner {
    /// Builds a scanner that runs every `interval`.
    ///
    /// The lane list is accepted but ignored: the attempt-timeout index is
    /// keyed per partition (`IndexKeys::attempt_timeout` takes no lane).
    pub fn new(interval: Duration, _lanes: Vec<LaneId>) -> Self {
        let failures = FailureTracker::new();
        Self { failures, interval }
    }
}
impl Scanner for AttemptTimeoutScanner {
    fn name(&self) -> &'static str {
        "attempt_timeout"
    }

    fn interval(&self) -> Duration {
        self.interval
    }

    /// Expires up to `BATCH_SIZE` executions on `partition` whose
    /// attempt-timeout score is at or before the server's current time
    /// (inclusive `ZRANGEBYSCORE -inf now_ms`).
    ///
    /// Returns how many executions were expired (`processed`) and how many
    /// operations failed (`errors`). A failure to read server time or the
    /// index yields `{ processed: 0, errors: 1 }`. Executions the failure
    /// tracker asks to skip are counted in neither field.
    async fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> ScanResult {
        // Partition 0 is treated as the start of a sweep (assumes the
        // scheduler visits partition 0 once per sweep). Advance the
        // failure-tracker cycle before any fallible I/O so that a transient
        // error on partition 0 (server time or ZRANGEBYSCORE) cannot
        // silently skip the cycle advance for that sweep.
        if partition == 0 {
            self.failures.advance_cycle();
        }

        let p = Partition {
            family: PartitionFamily::Execution,
            index: partition,
        };
        let idx = IndexKeys::new(&p);
        let timeout_key = idx.attempt_timeout();

        // Use the server's clock (not the local one) so the cutoff agrees with
        // the scores written into the index.
        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
            Ok(t) => t,
            Err(e) => {
                tracing::warn!(partition, error = %e, "attempt_timeout: failed to get server time");
                return ScanResult { processed: 0, errors: 1 };
            }
        };

        let timed_out: Vec<String> = match client
            .cmd("ZRANGEBYSCORE")
            .arg(&timeout_key)
            .arg("-inf")
            .arg(now_ms.to_string().as_str())
            .arg("LIMIT")
            .arg("0")
            .arg(BATCH_SIZE.to_string().as_str())
            .execute()
            .await
        {
            Ok(ids) => ids,
            Err(e) => {
                tracing::warn!(partition, error = %e, "attempt_timeout: ZRANGEBYSCORE failed");
                return ScanResult { processed: 0, errors: 1 };
            }
        };

        // An empty batch falls straight through and yields { 0, 0 }.
        let mut processed: u32 = 0;
        let mut errors: u32 = 0;
        for eid_str in &timed_out {
            if self.failures.should_skip(eid_str) {
                continue;
            }
            match expire_execution_raw(client, &p, &idx, eid_str, "attempt_timeout").await {
                Ok(()) => {
                    self.failures.record_success(eid_str);
                    processed += 1;
                }
                Err(e) => {
                    tracing::warn!(
                        partition,
                        execution_id = eid_str.as_str(),
                        error = %e,
                        "attempt_timeout: ff_expire_execution failed"
                    );
                    self.failures.record_failure(eid_str, "attempt_timeout");
                    errors += 1;
                }
            }
        }
        ScanResult { processed, errors }
    }
}
/// Runs the `ff_expire_execution` server function for a single execution.
///
/// The function first reads `lane_id` and `current_attempt_index` from the
/// execution's core hash, because several KEYS depend on them (per-lane
/// index sets, the attempt hash and the stream metadata key). If a field is
/// absent, it falls back to lane `"default"` and attempt index `"0"`.
///
/// ARGV is `[eid_str, reason]`. The FCALL reply is discarded.
///
/// NOTE(review): the HMGET and the FCALL are separate round trips, so the
/// lane/attempt values may be stale by the time the function runs. Confirm
/// the server-side function tolerates that.
///
/// NOTE(review): the worker-leases key is built from an empty
/// `WorkerInstanceId`. Verify this placeholder matches what the function
/// expects in that KEYS slot.
///
/// # Errors
/// Propagates any `ferriskey::Error` from the HMGET or the FCALL.
pub async fn expire_execution_raw(
    client: &ferriskey::Client,
    partition: &Partition,
    idx: &IndexKeys,
    eid_str: &str,
    reason: &str,
) -> Result<(), ferriskey::Error> {
    let tag = partition.hash_tag();
    // All per-execution keys share the `ff:exec:{tag}:{eid}:` prefix.
    let exec_key = |suffix: &str| format!("ff:exec:{}:{}:{}", tag, eid_str, suffix);

    let exec_core = exec_key("core");
    let lease_current = exec_key("lease:current");
    let lease_history = exec_key("lease:history");
    let susp_current = exec_key("suspension:current");

    let fields: Vec<Option<String>> = client
        .cmd("HMGET")
        .arg(&exec_core)
        .arg("lane_id")
        .arg("current_attempt_index")
        .execute()
        .await?;
    let mut field_iter = fields.into_iter();
    let lane_raw: Option<String> = field_iter.next().flatten();
    let attempt_raw: Option<String> = field_iter.next().flatten();

    let lane = ff_core::types::LaneId::new(lane_raw.as_deref().unwrap_or("default"));
    let att_idx = attempt_raw.as_deref().unwrap_or("0");

    let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
    let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
    let lease_expiry = idx.lease_expiry();
    let worker_leases = idx.worker_leases(&ff_core::types::WorkerInstanceId::new(""));
    let active = idx.lane_active(&lane);
    let terminal = idx.lane_terminal(&lane);
    let attempt_timeout = idx.attempt_timeout();
    let execution_deadline = idx.execution_deadline();
    let suspended = idx.lane_suspended(&lane);
    let suspension_timeout = idx.suspension_timeout();

    // Order is significant: it must match the KEYS layout expected by
    // `ff_expire_execution`.
    let keys: [&str; 14] = [
        &exec_core,          // KEYS[1]
        &attempt_hash,       // KEYS[2]
        &stream_meta,        // KEYS[3]
        &lease_current,      // KEYS[4]
        &lease_history,      // KEYS[5]
        &lease_expiry,       // KEYS[6]
        &worker_leases,      // KEYS[7]
        &active,             // KEYS[8]
        &terminal,           // KEYS[9]
        &attempt_timeout,    // KEYS[10]
        &execution_deadline, // KEYS[11]
        &suspended,          // KEYS[12]
        &suspension_timeout, // KEYS[13]
        &susp_current,       // KEYS[14]
    ];
    let argv: [&str; 2] = [eid_str, reason];

    let _: ferriskey::Value = client
        .fcall("ff_expire_execution", &keys, &argv)
        .await?;
    Ok(())
}