use std::time::Duration;
use ff_core::backend::ScannerFilter;
use ff_core::keys::IndexKeys;
use ff_core::partition::{Partition, PartitionFamily};
use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
/// Upper bound on how many waitpoint ids a single partition scan fetches;
/// passed as the `LIMIT` count of the `ZRANGEBYSCORE` query.
const BATCH_SIZE: u32 = 100;
/// Scanner that reads the pending-waitpoint-expiry index of a partition,
/// selects entries whose score is at or below the time returned by
/// `server_time_ms`, and closes each one through `ff_close_waitpoint`.
pub struct PendingWaitpointExpiryScanner {
/// Value handed back by `Scanner::interval`.
/// NOTE(review): presumably the delay between scan cycles — confirm against the scanner driver.
interval: Duration,
/// Per-waitpoint bookkeeping: consulted before each close attempt to decide
/// whether to skip the id, and updated with the outcome of the attempt.
failures: FailureTracker,
/// Candidate filter. When it is not a no-op, the owning execution of each
/// waitpoint is looked up and checked against it before closing.
filter: ScannerFilter,
}
impl PendingWaitpointExpiryScanner {
    /// Creates a scanner with the given `interval` and the default
    /// `ScannerFilter`.
    pub fn new(interval: Duration) -> Self {
        let filter = ScannerFilter::default();
        Self::with_filter(interval, filter)
    }

    /// Creates a scanner with the given `interval` and an explicit `filter`,
    /// starting from a fresh `FailureTracker`.
    pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
        let failures = FailureTracker::new();
        Self {
            interval,
            failures,
            filter,
        }
    }
}
impl Scanner for PendingWaitpointExpiryScanner {
    /// Identifier of this scanner; also used as the prefix of its log messages.
    fn name(&self) -> &'static str {
        "pending_wp_expiry"
    }

    /// Returns the interval supplied at construction.
    fn interval(&self) -> Duration {
        self.interval
    }

    /// Returns the candidate filter supplied at construction.
    fn filter(&self) -> &ScannerFilter {
        &self.filter
    }

    /// Scans one execution partition for expired pending waitpoints and closes
    /// them.
    ///
    /// Steps:
    /// 1. Read the server time; on failure return `errors: 1`.
    /// 2. Fetch up to [`BATCH_SIZE`] ids from the pending-waitpoint-expiry
    ///    index whose score lies in `(-inf, now_ms]`; on failure return
    ///    `errors: 1`.
    /// 3. For each id that the failure tracker does not ask to skip (and that
    ///    passes the filter, when the filter is not a no-op), call
    ///    `close_expired_waitpoint` and record the outcome.
    ///
    /// The returned `ScanResult` counts successful closes in `processed` and
    /// failed closes in `errors`. Candidates skipped by the tracker or the
    /// filter are counted in neither.
    async fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> ScanResult {
        let p = Partition {
            family: PartitionFamily::Execution,
            index: partition,
        };
        let idx = IndexKeys::new(&p);
        let expiry_key = idx.pending_waitpoint_expiry();
        let tag = p.hash_tag();

        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
            Ok(t) => t,
            Err(e) => {
                tracing::warn!(partition, error = %e, "pending_wp_expiry: failed to get server time");
                return ScanResult { processed: 0, errors: 1 };
            }
        };

        // Everything scored at or below `now_ms`, capped at BATCH_SIZE entries.
        let expired: Vec<String> = match client
            .cmd("ZRANGEBYSCORE")
            .arg(&expiry_key)
            .arg("-inf")
            .arg(now_ms.to_string().as_str())
            .arg("LIMIT")
            .arg("0")
            .arg(BATCH_SIZE.to_string().as_str())
            .execute()
            .await
        {
            Ok(ids) => ids,
            Err(e) => {
                tracing::warn!(partition, error = %e, "pending_wp_expiry: ZRANGEBYSCORE failed");
                return ScanResult { processed: 0, errors: 1 };
            }
        };

        // Only reached when both the time lookup and the range query succeeded.
        // NOTE(review): presumably partition 0 marks the start of a full pass,
        // so the tracker advances once per pass — confirm against the driver.
        if partition == 0 {
            self.failures.advance_cycle();
        }

        if expired.is_empty() {
            return ScanResult { processed: 0, errors: 0 };
        }

        let mut processed: u32 = 0;
        let mut errors: u32 = 0;

        for wp_id_str in &expired {
            if self.failures.should_skip(wp_id_str) {
                continue;
            }

            // The filter works on execution ids, so the owning execution has to
            // be resolved from the waitpoint hash first.
            if !self.filter.is_noop() {
                let waitpoint_hash = format!("ff:wp:{}:{}", tag, wp_id_str);
                let eid: Option<String> = match client
                    .cmd("HGET")
                    .arg(&waitpoint_hash)
                    .arg("execution_id")
                    .execute()
                    .await
                {
                    Ok(v) => v,
                    Err(e) => {
                        // Surface the backend error instead of dropping it.
                        // The candidate is still only skipped: it is neither
                        // counted in `errors` nor recorded in the failure
                        // tracker, and nothing on this path removes it from the
                        // index, so a later scan will see it again.
                        tracing::warn!(
                            partition,
                            waitpoint_id = wp_id_str.as_str(),
                            error = %e,
                            "pending_wp_expiry: HGET execution_id failed"
                        );
                        continue;
                    }
                };
                // No owning execution recorded: the filter cannot be evaluated,
                // so the candidate is left alone.
                let Some(eid) = eid else { continue };
                if should_skip_candidate(client, &self.filter, partition, &eid).await {
                    continue;
                }
            }

            match close_expired_waitpoint(client, &tag, &idx, wp_id_str).await {
                Ok(()) => {
                    self.failures.record_success(wp_id_str);
                    processed += 1;
                }
                Err(e) => {
                    tracing::warn!(
                        partition,
                        waitpoint_id = wp_id_str.as_str(),
                        error = %e,
                        "pending_wp_expiry: ff_close_waitpoint failed"
                    );
                    self.failures.record_failure(wp_id_str, "pending_wp_expiry");
                    errors += 1;
                }
            }
        }

        ScanResult { processed, errors }
    }
}
/// Closes a single expired waitpoint by calling the `ff_close_waitpoint`
/// server-side function with the reason `never_committed`.
///
/// The owning execution id is read from the `execution_id` field of the
/// waitpoint hash; when that field is absent an empty id is used to build the
/// execution core key. Keys are passed in the order: execution core,
/// waitpoint hash, pending-waitpoint-expiry index.
///
/// # Errors
///
/// Propagates any error from the `HGET` lookup or from the function call.
async fn close_expired_waitpoint(
    client: &ferriskey::Client,
    tag: &str,
    idx: &IndexKeys,
    wp_id_str: &str,
) -> Result<(), ferriskey::Error> {
    const CLOSE_REASON: &str = "never_committed";

    let wp_key = format!("ff:wp:{}:{}", tag, wp_id_str);

    let lookup: Option<String> = client
        .cmd("HGET")
        .arg(&wp_key)
        .arg("execution_id")
        .execute()
        .await?;
    let owner = lookup.unwrap_or_default();

    let core_key = format!("ff:exec:{}:{}:core", tag, owner);
    let expiry_index = idx.pending_waitpoint_expiry();

    let keys: [&str; 3] = [&core_key, &wp_key, &expiry_index];
    let argv: [&str; 2] = [wp_id_str, CLOSE_REASON];

    // The reply payload is not inspected; only success or failure matters here.
    let _reply: ferriskey::Value = client
        .fcall("ff_close_waitpoint", &keys, &argv)
        .await?;

    Ok(())
}