use std::time::Duration;
use ff_core::backend::ScannerFilter;
use ff_core::keys::{ExecKeyContext, FlowIndexKeys, FlowKeyContext, IndexKeys};
use ff_core::partition::{
execution_partition, Partition, PartitionConfig, PartitionFamily,
};
use ff_core::types::{AttemptIndex, ExecutionId, FlowId, LaneId, WaitpointId, WorkerInstanceId};
use super::{should_skip_candidate, ScanResult, Scanner};
/// Maximum number of flow ids read from a partition's cancel backlog in one
/// `scan_partition` call (passed as the `LIMIT` count of `ZRANGEBYSCORE`).
const BATCH_SIZE: u32 = 50;
/// Maximum number of pending execution ids sampled per flow in one
/// `scan_partition` call (passed as the `SRANDMEMBER` count).
const MAX_MEMBERS_PER_FLOW_PER_CYCLE: usize = 500;
/// Scanner that walks each flow partition's cancel backlog and cancels the
/// executions still listed in the flows' pending-cancels sets.
pub struct CancelReconciler {
/// Value returned by `Scanner::interval` (presumably the delay between scan
/// cycles; the scheduling itself is not visible in this file).
interval: Duration,
/// Partition layout used to map an execution id to its execution partition.
partition_config: PartitionConfig,
/// Filter handed to `should_skip_candidate` before each member is cancelled.
filter: ScannerFilter,
}
impl CancelReconciler {
pub fn new(interval: Duration, partition_config: PartitionConfig) -> Self {
Self::with_filter(interval, partition_config, ScannerFilter::default())
}
pub fn with_filter(
interval: Duration,
partition_config: PartitionConfig,
filter: ScannerFilter,
) -> Self {
Self {
interval,
partition_config,
filter,
}
}
}
impl Scanner for CancelReconciler {
/// Returns this scanner's static name, `"cancel_reconciler"`.
fn name(&self) -> &'static str {
"cancel_reconciler"
}
/// Returns the interval this reconciler was constructed with.
fn interval(&self) -> Duration {
self.interval
}
/// Returns the candidate filter this reconciler was constructed with.
fn filter(&self) -> &ScannerFilter {
&self.filter
}
/// Reports the number of entries in the cancel backlog of flow partition
/// `partition`, read with `ZCARD`.
///
/// Returns `None` when the command fails or the reply carries no value.
async fn sample_backlog_depth(
    &self,
    client: &ferriskey::Client,
    partition: u16,
) -> Option<u64> {
    let flow_partition = Partition {
        family: PartitionFamily::Flow,
        index: partition,
    };
    let index_keys = FlowIndexKeys::new(&flow_partition);
    let key = index_keys.cancel_backlog();

    match client
        .cmd("ZCARD")
        .arg(&key)
        .execute::<Option<u64>>()
        .await
    {
        Ok(depth) => depth,
        // Errors are not surfaced: the sample is simply reported as unavailable.
        Err(_) => None,
    }
}
/// Runs one reconciliation pass over flow partition `partition`.
///
/// Reads up to `BATCH_SIZE` flow ids from the partition's cancel backlog whose
/// score is at or below the server time. For each flow it samples up to
/// `MAX_MEMBERS_PER_FLOW_PER_CYCLE` execution ids from the flow's
/// pending-cancels set, calls `cancel_member` on each, and acks every member
/// for which that call returned `true` through the `ff_ack_cancel_member`
/// function.
///
/// In the returned `ScanResult`, `processed` counts successfully acked members.
/// `errors` counts failed commands, malformed ids, members `cancel_member`
/// declined to ack, and failed acks.
async fn scan_partition(
&self,
client: &ferriskey::Client,
partition: u16,
) -> ScanResult {
let p = Partition {
family: PartitionFamily::Flow,
index: partition,
};
let fidx = FlowIndexKeys::new(&p);
let backlog_key = fidx.cancel_backlog();
// The upper score bound comes from the server clock, not the local one.
let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
Ok(t) => t,
Err(e) => {
tracing::warn!(partition, error = %e, "cancel_reconciler: TIME failed");
return ScanResult { processed: 0, errors: 1 };
}
};
// Fetch at most BATCH_SIZE backlog entries with score <= now_ms.
let flow_ids: Vec<String> = match client
.cmd("ZRANGEBYSCORE")
.arg(&backlog_key)
.arg("-inf")
.arg(now_ms.to_string().as_str())
.arg("LIMIT")
.arg("0")
.arg(BATCH_SIZE.to_string().as_str())
.execute()
.await
{
Ok(ids) => ids,
Err(e) => {
tracing::warn!(
partition,
error = %e,
"cancel_reconciler: ZRANGEBYSCORE cancel_backlog failed"
);
return ScanResult { processed: 0, errors: 1 };
}
};
if flow_ids.is_empty() {
return ScanResult { processed: 0, errors: 0 };
}
let mut processed: u32 = 0;
let mut errors: u32 = 0;
for flow_id_str in flow_ids {
let flow_id = match FlowId::parse(&flow_id_str) {
Ok(id) => id,
Err(e) => {
tracing::warn!(
partition,
raw = %flow_id_str,
error = %e,
"cancel_reconciler: malformed flow_id in cancel_backlog; ZREM"
);
// Best-effort removal of the unparseable entry; the ZREM result is ignored.
let _: Result<i64, _> = client
.cmd("ZREM")
.arg(&backlog_key)
.arg(flow_id_str.as_str())
.execute()
.await;
errors += 1;
continue;
}
};
let fctx = FlowKeyContext::new(&p, &flow_id);
let pending_key = fctx.pending_cancels();
// A backlog entry whose flow core key no longer exists is cleaned up
// instead of being processed.
let core_exists: bool = match client
.cmd("EXISTS")
.arg(fctx.core().as_str())
.execute()
.await
{
Ok(v) => v,
Err(e) => {
tracing::warn!(
flow_id = %flow_id,
error = %e,
"cancel_reconciler: EXISTS flow_core failed"
);
errors += 1;
continue;
}
};
if !core_exists {
// Best-effort cleanup: delete the pending set and drop the flow from the
// backlog. Neither result is checked and neither counter is touched.
let _: Result<i64, _> = client
.cmd("DEL")
.arg(pending_key.as_str())
.execute()
.await;
let _: Result<i64, _> = client
.cmd("ZREM")
.arg(&backlog_key)
.arg(flow_id.to_string().as_str())
.execute()
.await;
continue;
}
// SRANDMEMBER with a positive count returns up to that many distinct
// members and does not remove them from the set.
let member_strs: Vec<String> = match client
.cmd("SRANDMEMBER")
.arg(pending_key.as_str())
.arg(MAX_MEMBERS_PER_FLOW_PER_CYCLE.to_string().as_str())
.execute()
.await
{
Ok(m) => m,
Err(e) => {
tracing::warn!(
flow_id = %flow_id,
error = %e,
"cancel_reconciler: SRANDMEMBER pending_cancels failed"
);
errors += 1;
continue;
}
};
if member_strs.is_empty() {
// Nothing is pending for this flow: remove it from the backlog (best-effort).
let _: Result<i64, _> = client
.cmd("ZREM")
.arg(&backlog_key)
.arg(flow_id.to_string().as_str())
.execute()
.await;
continue;
}
// Reason forwarded to every member cancel; falls back to "flow_cancelled"
// when the flow core hash has no `cancel_reason` field.
let reason: String = match client
.cmd("HGET")
.arg(fctx.core().as_str())
.arg("cancel_reason")
.execute::<Option<String>>()
.await
{
Ok(Some(s)) => s,
Ok(None) => "flow_cancelled".to_owned(),
Err(e) => {
tracing::warn!(
flow_id = %flow_id,
error = %e,
"cancel_reconciler: HGET cancel_reason failed; retry next cycle"
);
errors += 1;
continue;
}
};
for eid_str in &member_strs {
let execution_id = match ExecutionId::parse(eid_str) {
Ok(id) => id,
Err(e) => {
tracing::warn!(
flow_id = %flow_id,
raw = %eid_str,
error = %e,
"cancel_reconciler: malformed eid in pending_cancels; SREM"
);
// Best-effort removal of the unparseable member; the SREM result is ignored.
let _: Result<i64, _> = client
.cmd("SREM")
.arg(pending_key.as_str())
.arg(eid_str.as_str())
.execute()
.await;
errors += 1;
continue;
}
};
// The filter is evaluated against the execution's own partition index,
// not the flow partition being scanned.
let member_part = execution_partition(
&execution_id,
&self.partition_config,
).index;
if should_skip_candidate(
client,
&self.filter,
member_part,
eid_str,
)
.await
{
// Skipped members are neither cancelled nor counted.
continue;
}
// `cancel_member` returns true when the member may be acked.
if cancel_member(
client,
&self.partition_config,
&execution_id,
&reason,
)
.await
{
// Ack with keys [pending set, backlog] and args [execution id, flow id].
let flow_id_str = flow_id.to_string();
let ack_keys = [pending_key.as_str(), backlog_key.as_str()];
let ack_args = [eid_str.as_str(), flow_id_str.as_str()];
match client
.fcall::<ferriskey::Value>("ff_ack_cancel_member", &ack_keys, &ack_args)
.await
{
Ok(_) => processed += 1,
Err(e) => {
tracing::debug!(
flow_id = %flow_id,
execution_id = %eid_str,
error = %e,
"cancel_reconciler: ack failed; retry next cycle"
);
errors += 1;
}
}
} else {
// Not ackable this cycle; nothing here removes the member from the pending set.
errors += 1;
}
}
}
ScanResult { processed, errors }
}
}
/// Cancels one execution on behalf of a cancelled flow by calling the
/// `ff_cancel_execution` function.
///
/// The execution's lane, current attempt index, current waitpoint id and
/// current worker instance id are read from its core hash with a single
/// `HMGET`; they are needed to build the key list the function operates on.
/// Missing or unparseable fields fall back to lane `"default"`, attempt index
/// `0`, a freshly generated waitpoint id and an empty worker instance id.
///
/// # Returns
///
/// * `true` when the caller should ack the member: the function reported
///   success, it reported `execution_not_active` / `execution_not_found`, or
///   a command failed with an error kind that `is_retryable_kind` does not
///   consider retryable (acking avoids retrying a poison member forever).
/// * `false` when the member should be retried on a later cycle: a retryable
///   error, any other rejection code, or an unexpected reply shape.
async fn cancel_member(
    client: &ferriskey::Client,
    partition_config: &PartitionConfig,
    execution_id: &ExecutionId,
    reason: &str,
) -> bool {
    let partition = execution_partition(execution_id, partition_config);
    let ctx = ExecKeyContext::new(&partition, execution_id);
    let idx = IndexKeys::new(&partition);

    // One round trip for every core field we need, so lane and attempt state
    // come from the same snapshot of the hash.
    let core_fields: Vec<Option<String>> = match client
        .cmd("HMGET")
        .arg(ctx.core())
        .arg("lane_id")
        .arg("current_attempt_index")
        .arg("current_waitpoint_id")
        .arg("current_worker_instance_id")
        .execute()
        .await
    {
        Ok(v) => v,
        Err(e) => {
            if !is_retryable_kind(e.kind()) {
                tracing::warn!(
                    execution_id = %execution_id,
                    error = %e,
                    "cancel_reconciler: permanent HMGET error; ack to avoid poison"
                );
                return true;
            }
            tracing::debug!(
                execution_id = %execution_id,
                error = %e,
                "cancel_reconciler: transient HMGET; retry next cycle"
            );
            return false;
        }
    };

    // Fields arrive in request order; a short reply degrades to the defaults.
    let mut fields = core_fields.into_iter();
    let lane_str = fields.next().flatten();
    let att_idx_val = fields
        .next()
        .flatten()
        .and_then(|s| s.parse::<u32>().ok())
        .unwrap_or(0);
    let wp_id_str = fields.next().flatten().unwrap_or_default();
    let wiid_str = fields.next().flatten().unwrap_or_default();

    let lane = LaneId::new(lane_str.as_deref().unwrap_or("default"));
    let att_idx = AttemptIndex::new(att_idx_val);
    // No (or an unparseable) waitpoint: use a fresh id so the waitpoint keys
    // below are still well-formed.
    let wp_id = if wp_id_str.is_empty() {
        WaitpointId::new()
    } else {
        WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
    };
    let wiid = WorkerInstanceId::new(&wiid_str);

    // Key order is part of the function's contract; do not reorder.
    let keys: Vec<String> = vec![
        ctx.core(),
        ctx.attempt_hash(att_idx),
        ctx.stream_meta(att_idx),
        ctx.lease_current(),
        ctx.lease_history(),
        idx.lease_expiry(),
        idx.worker_leases(&wiid),
        ctx.suspension_current(),
        ctx.waitpoint(&wp_id),
        ctx.waitpoint_condition(&wp_id),
        idx.suspension_timeout(),
        idx.lane_terminal(&lane),
        idx.attempt_timeout(),
        idx.execution_deadline(),
        idx.lane_eligible(&lane),
        idx.lane_delayed(&lane),
        idx.lane_blocked_dependencies(&lane),
        idx.lane_blocked_budget(&lane),
        idx.lane_blocked_quota(&lane),
        idx.lane_blocked_route(&lane),
        idx.lane_blocked_operator(&lane),
    ];
    let argv: Vec<String> = vec![
        execution_id.to_string(),
        reason.to_owned(),
        "operator_override".to_owned(),
        String::new(),
        String::new(),
    ];
    let kr: Vec<&str> = keys.iter().map(String::as_str).collect();
    let ar: Vec<&str> = argv.iter().map(String::as_str).collect();

    match client.fcall::<ferriskey::Value>("ff_cancel_execution", &kr, &ar).await {
        Ok(ferriskey::Value::Array(arr)) => match arr.first() {
            Some(Ok(ferriskey::Value::Int(1))) => true,
            Some(Ok(ferriskey::Value::Int(0))) => {
                let code = arr
                    .get(1)
                    .and_then(|r| match r {
                        Ok(ferriskey::Value::BulkString(b)) => {
                            Some(String::from_utf8_lossy(b).into_owned())
                        }
                        Ok(ferriskey::Value::SimpleString(s)) => Some(s.clone()),
                        _ => None,
                    })
                    .unwrap_or_default();
                // These two codes mean there is nothing left to cancel, so the
                // member can be acked; anything else is retried.
                let ackable =
                    matches!(code.as_str(), "execution_not_active" | "execution_not_found");
                if !ackable {
                    tracing::debug!(
                        execution_id = %execution_id,
                        code = %code,
                        "cancel_reconciler: ff_cancel_execution rejected; retry next cycle"
                    );
                }
                ackable
            }
            _ => {
                tracing::warn!(
                    execution_id = %execution_id,
                    "cancel_reconciler: unexpected ff_cancel_execution status; retry next cycle"
                );
                false
            }
        },
        Ok(_) => {
            tracing::warn!(
                execution_id = %execution_id,
                "cancel_reconciler: non-array ff_cancel_execution reply; retry next cycle"
            );
            false
        }
        Err(e) => {
            if !is_retryable_kind(e.kind()) {
                tracing::warn!(
                    execution_id = %execution_id,
                    error = %e,
                    "cancel_reconciler: permanent error on FCALL; ack to avoid poison"
                );
                return true;
            }
            tracing::debug!(
                execution_id = %execution_id,
                error = %e,
                "cancel_reconciler: transient FCALL error; retry next cycle"
            );
            false
        }
    }
}
fn is_retryable_kind(kind: ferriskey::ErrorKind) -> bool {
use ferriskey::ErrorKind::*;
matches!(
kind,
IoError | FatalSendError | TryAgain | BusyLoadingError | ClusterDown
)
}