use std::time::Duration;
use ff_core::backend::ScannerFilter;
use ff_core::keys::IndexKeys;
use ff_core::partition::{Partition, PartitionFamily};
use ff_core::types::LaneId;
use super::{should_skip_candidate, ScanResult, Scanner};
// Maximum number of expired candidates pulled from a lane's terminal index
// per scan pass (via ZRANGEBYSCORE ... LIMIT 0 BATCH_SIZE).
const BATCH_SIZE: u32 = 20;
// Fallback retention window when an execution has no policy override:
// 24 hours, in milliseconds.
const DEFAULT_RETENTION_MS: u64 = 24 * 60 * 60 * 1000;
/// Periodic scanner that purges terminal executions whose retention window
/// has elapsed.
pub struct RetentionTrimmer {
    // How often the scanner is scheduled to run.
    interval: Duration,
    // Lanes whose terminal indexes are scanned for expired executions.
    lanes: Vec<LaneId>,
    // Retention (ms) used when an execution has no per-execution policy
    // override; initialized to DEFAULT_RETENTION_MS.
    default_retention_ms: u64,
    // Include/exclude filter consulted for each candidate before purging.
    filter: ScannerFilter,
}
impl RetentionTrimmer {
    /// Creates a trimmer with the default (empty) scanner filter.
    pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
        Self::with_filter(interval, lanes, ScannerFilter::default())
    }

    /// Creates a trimmer with an explicit scanner filter.
    ///
    /// The retention window starts at `DEFAULT_RETENTION_MS`; individual
    /// executions may still override it via their stored policy.
    pub fn with_filter(interval: Duration, lanes: Vec<LaneId>, filter: ScannerFilter) -> Self {
        Self {
            default_retention_ms: DEFAULT_RETENTION_MS,
            interval,
            lanes,
            filter,
        }
    }
}
impl Scanner for RetentionTrimmer {
    fn name(&self) -> &'static str {
        "retention_trimmer"
    }

    fn interval(&self) -> Duration {
        self.interval
    }

    fn filter(&self) -> &ScannerFilter {
        &self.filter
    }

    /// Scans one execution partition and purges terminal executions whose
    /// retention window has elapsed.
    ///
    /// For each configured lane, fetches up to `BATCH_SIZE` candidates from
    /// the lane's terminal index scored at or below
    /// `now - default_retention_ms`, then attempts to purge each one (a
    /// per-execution policy override may still defer the purge). Errors are
    /// logged and counted, and the scan continues with the next
    /// candidate/lane rather than aborting.
    async fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> ScanResult {
        let p = Partition {
            family: PartitionFamily::Execution,
            index: partition,
        };
        let idx = IndexKeys::new(&p);

        // Use server time so every scanner replica agrees on "now".
        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
            Ok(t) => t,
            Err(e) => {
                tracing::warn!(partition, error = %e, "retention_trimmer: failed to get server time");
                return ScanResult { processed: 0, errors: 1 };
            }
        };

        // The default cutoff does not depend on the lane; compute it once
        // instead of per iteration.
        let cutoff = now_ms.saturating_sub(self.default_retention_ms);

        let mut total_processed: u32 = 0;
        let mut total_errors: u32 = 0;
        for lane in &self.lanes {
            let terminal_key = idx.lane_terminal(lane);
            // Candidates whose terminal score (completion time) is at or
            // before the default cutoff.
            let expired: Vec<String> = match client
                .cmd("ZRANGEBYSCORE")
                .arg(&terminal_key)
                .arg("-inf")
                .arg(cutoff.to_string().as_str())
                .arg("LIMIT")
                .arg("0")
                .arg(BATCH_SIZE.to_string().as_str())
                .execute()
                .await
            {
                Ok(ids) => ids,
                Err(e) => {
                    tracing::warn!(
                        partition, lane = lane.as_str(), error = %e,
                        "retention_trimmer: ZRANGEBYSCORE failed"
                    );
                    total_errors += 1;
                    continue;
                }
            };
            if expired.is_empty() {
                continue;
            }
            for eid_str in &expired {
                // Honor the scanner-level include/exclude filter.
                if should_skip_candidate(client, &self.filter, partition, eid_str).await {
                    continue;
                }
                match purge_execution(
                    client, &p, &idx, lane, eid_str, &terminal_key, now_ms,
                    self.default_retention_ms,
                ).await {
                    Ok(true) => total_processed += 1,
                    // Ok(false): a policy override extends this execution's
                    // retention; it will be revisited on a later scan.
                    Ok(false) => {}
                    Err(e) => {
                        tracing::warn!(
                            partition,
                            execution_id = eid_str.as_str(),
                            error = %e,
                            "retention_trimmer: purge failed"
                        );
                        total_errors += 1;
                    }
                }
            }
        }
        ScanResult { processed: total_processed, errors: total_errors }
    }
}
/// Deletes every key belonging to a single terminal execution once its
/// retention window has elapsed.
///
/// Returns `Ok(true)` when the execution was purged (or a dangling
/// terminal-index entry was removed), `Ok(false)` when retention has not yet
/// expired, and `Err` only when an essential command fails (the secondary
/// index enumerations are best-effort).
#[allow(clippy::too_many_arguments)]
async fn purge_execution(
    client: &ferriskey::Client,
    partition: &Partition,
    idx: &IndexKeys,
    _lane: &LaneId,
    eid_str: &str,
    terminal_key: &str,
    now_ms: u64,
    default_retention_ms: u64,
) -> Result<bool, ferriskey::Error> {
    let tag = partition.hash_tag();
    let exec_core_key = format!("ff:exec:{}:{}:core", tag, eid_str);

    // Read completion time and attempt count in a single round trip.
    let fields: Vec<Option<String>> = client
        .cmd("HMGET")
        .arg(&exec_core_key)
        .arg("completed_at")
        .arg("total_attempt_count")
        .execute()
        .await?;
    let completed_at: u64 = fields.first()
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);
    let total_attempts: u32 = fields.get(1)
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);

    // No completed_at: the core hash is missing (or unparsable), so the
    // terminal-index entry is dangling — drop just the index entry.
    if completed_at == 0 {
        let _: u32 = client.cmd("ZREM").arg(terminal_key).arg(eid_str).execute().await?;
        return Ok(true);
    }

    // A per-execution policy may override the default retention window.
    let policy_key = format!("ff:exec:{}:{}:policy", tag, eid_str);
    let retention_ms = read_retention_ms(client, &policy_key, default_retention_ms).await;
    // saturating_add: an extreme policy value (e.g. u64::MAX as "keep
    // forever") must not wrap the deadline into the past and purge early.
    if now_ms < completed_at.saturating_add(retention_ms) {
        return Ok(false);
    }

    // Collect every key owned by this execution, then delete in bulk.
    let mut del_keys: Vec<String> = Vec::with_capacity(16 + total_attempts as usize * 5);
    del_keys.push(format!("ff:exec:{}:{}:payload", tag, eid_str));
    del_keys.push(format!("ff:exec:{}:{}:result", tag, eid_str));
    del_keys.push(format!("ff:exec:{}:{}:tags", tag, eid_str));
    del_keys.push(format!("ff:exec:{}:{}:lease:current", tag, eid_str));
    del_keys.push(format!("ff:exec:{}:{}:lease:history", tag, eid_str));
    del_keys.push(format!("ff:exec:{}:{}:claim_grant", tag, eid_str));
    del_keys.push(format!("ff:exec:{}:{}:attempts", tag, eid_str));
    // Per-attempt state and streams: five keys per attempt.
    for i in 0..total_attempts {
        del_keys.push(format!("ff:attempt:{}:{}:{}", tag, eid_str, i));
        del_keys.push(format!("ff:attempt:{}:{}:{}:usage", tag, eid_str, i));
        del_keys.push(format!("ff:attempt:{}:{}:{}:policy", tag, eid_str, i));
        del_keys.push(format!("ff:stream:{}:{}:{}", tag, eid_str, i));
        del_keys.push(format!("ff:stream:{}:{}:{}:meta", tag, eid_str, i));
    }
    del_keys.push(format!("ff:exec:{}:{}:suspension:current", tag, eid_str));

    // Dependency edges: enumerate best-effort, then include per-edge keys.
    let deps_all_edges_key = format!("ff:exec:{}:{}:deps:all_edges", tag, eid_str);
    let dep_edge_ids: Vec<String> = client
        .cmd("SMEMBERS")
        .arg(&deps_all_edges_key)
        .execute()
        .await
        .unwrap_or_default();
    del_keys.push(format!("ff:exec:{}:{}:deps:meta", tag, eid_str));
    del_keys.push(format!("ff:exec:{}:{}:deps:unresolved", tag, eid_str));
    del_keys.push(deps_all_edges_key);
    for edge_id in &dep_edge_ids {
        del_keys.push(format!("ff:exec:{}:{}:dep:{}", tag, eid_str, edge_id));
    }

    // Waitpoints referenced by this execution: three keys each.
    let waitpoints_key = format!("ff:exec:{}:{}:waitpoints", tag, eid_str);
    let wp_ids: Vec<String> = client
        .cmd("SMEMBERS")
        .arg(&waitpoints_key)
        .execute()
        .await
        .unwrap_or_default();
    del_keys.push(waitpoints_key);
    for wp_id_str in &wp_ids {
        del_keys.push(format!("ff:wp:{}:{}", tag, wp_id_str));
        del_keys.push(format!("ff:wp:{}:{}:signals", tag, wp_id_str));
        del_keys.push(format!("ff:wp:{}:{}:condition", tag, wp_id_str));
    }

    // Signals referenced by this execution: two keys each.
    let signal_key = format!("ff:exec:{}:{}:signals", tag, eid_str);
    let sig_ids: Vec<String> = client
        .cmd("ZRANGE")
        .arg(&signal_key)
        .arg("0")
        .arg("-1")
        .execute()
        .await
        .unwrap_or_default();
    del_keys.push(signal_key);
    for sig_id_str in &sig_ids {
        del_keys.push(format!("ff:signal:{}:{}", tag, sig_id_str));
        del_keys.push(format!("ff:signal:{}:{}:payload", tag, sig_id_str));
    }

    // Delete in bounded chunks to keep individual DEL commands small.
    for chunk in del_keys.chunks(500) {
        let key_refs: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
        let _: u32 = client
            .cmd("DEL")
            .arg(&key_refs)
            .execute()
            .await?;
    }

    // Core hash and policy go last: if the purge is interrupted above,
    // completed_at is still readable and the next scan retries the purge.
    let _: u32 = client
        .cmd("DEL")
        .arg(&[exec_core_key.as_str(), policy_key.as_str()][..])
        .execute()
        .await?;
    let _: u32 = client.cmd("ZREM").arg(terminal_key).arg(eid_str).execute().await?;
    let all_exec_key = idx.all_executions();
    let _: u32 = client.cmd("SREM").arg(&all_exec_key).arg(eid_str).execute().await?;

    tracing::debug!(
        execution_id = eid_str,
        attempts = total_attempts,
        waitpoints = wp_ids.len(),
        signals = sig_ids.len(),
        "retention_trimmer: purged execution"
    );
    Ok(true)
}
async fn read_retention_ms(
client: &ferriskey::Client,
policy_key: &str,
default_retention_ms: u64,
) -> u64 {
let policy_json: Option<String> = match client
.cmd("GET")
.arg(policy_key)
.execute()
.await
{
Ok(v) => v,
Err(_) => return default_retention_ms,
};
let json_str = match policy_json {
Some(s) if !s.is_empty() => s,
_ => return default_retention_ms,
};
let parsed: serde_json::Value = match serde_json::from_str(&json_str) {
Ok(v) => v,
Err(_) => return default_retention_ms,
};
parsed
.get("stream_policy")
.and_then(|sp| sp.get("retention_ttl_ms"))
.and_then(|v| v.as_u64())
.unwrap_or(default_retention_ms)
}