pub mod attempt_timeout;
pub mod budget_reconciler;
pub mod execution_deadline;
pub mod budget_reset;
pub mod delayed_promoter;
pub mod cancel_reconciler;
pub mod dependency_reconciler;
pub mod flow_projector;
pub mod index_reconciler;
pub mod lease_expiry;
pub mod pending_wp_expiry;
pub mod quota_reconciler;
pub mod retention_trimmer;
pub mod suspension_timeout;
pub mod unblock;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::time::Duration;
use ff_core::backend::ScannerFilter;
use ff_core::partition::{Partition, PartitionFamily};
use ff_core::types::Namespace;
use tokio::sync::watch;
use tokio::task::JoinHandle;
/// Outcome of a single [`Scanner::scan_partition`] call.
///
/// Derives `Debug`/`Clone`/`Copy`/`Default` so results can be logged,
/// aggregated, and zero-initialized without boilerplate (public types
/// should at minimum be `Debug`).
#[derive(Debug, Clone, Copy, Default)]
pub struct ScanResult {
    /// Number of items successfully processed in this partition scan.
    pub processed: u32,
    /// Number of items that failed to process in this partition scan.
    pub errors: u32,
}
/// Consecutive failures for one item before it enters backoff.
const FAILURE_THRESHOLD: u32 = 3;
/// Number of scan cycles a persistently failing item is skipped for.
const BACKOFF_CYCLES: u64 = 10;
/// Failure-map size above which periodic garbage collection kicks in.
const GC_THRESHOLD: usize = 500;
/// Per-item failure bookkeeping kept inside [`FailureTracker`].
struct FailureEntry {
    /// How many times in a row this item has failed (cleared on success).
    consecutive_failures: u32,
    /// Scan-cycle number until which the item is skipped (0 = no backoff).
    skip_until_cycle: u64,
}
/// Tracks consecutive per-item failures across scan cycles and backs off
/// items that keep failing, so one poisoned item cannot dominate a scan.
///
/// Thread-safe: the map is behind a `Mutex` and the cycle counter is atomic.
#[derive(Default)]
pub struct FailureTracker {
    /// Failure state keyed by a caller-supplied item key.
    inner: Mutex<HashMap<String, FailureEntry>>,
    /// Monotonically increasing scan-cycle counter.
    cycle: AtomicU64,
}
impl FailureTracker {
    /// Creates an empty tracker with the cycle counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Bumps the scan-cycle counter. Every 50th cycle, if the map has grown
    /// past `GC_THRESHOLD`, garbage-collects it down to only the entries
    /// that are still inside an active backoff window.
    pub fn advance_cycle(&self) {
        let current = self.cycle.fetch_add(1, Ordering::Relaxed) + 1;
        if current % 50 != 0 {
            return;
        }
        let mut entries = self.inner.lock().unwrap();
        if entries.len() > GC_THRESHOLD {
            entries.retain(|_, entry| {
                entry.consecutive_failures >= FAILURE_THRESHOLD
                    && entry.skip_until_cycle > current
            });
        }
    }

    /// Returns `true` while `key` is inside its backoff window. Once the
    /// window has elapsed, the entry's counters are cleared so the item
    /// gets a fresh set of attempts.
    pub fn should_skip(&self, key: &str) -> bool {
        let mut entries = self.inner.lock().unwrap();
        match entries.get_mut(key) {
            Some(entry) if entry.consecutive_failures >= FAILURE_THRESHOLD => {
                let current = self.cycle.load(Ordering::Relaxed);
                if entry.skip_until_cycle > current {
                    true
                } else {
                    // Backoff expired: reset to a clean slate and allow retries.
                    entry.consecutive_failures = 0;
                    entry.skip_until_cycle = 0;
                    false
                }
            }
            _ => false,
        }
    }

    /// Records one failure for `key`. On exactly the `FAILURE_THRESHOLD`-th
    /// consecutive failure, the item enters a `BACKOFF_CYCLES`-cycle backoff
    /// and one error is logged (no repeat logging while backing off).
    pub fn record_failure(&self, key: &str, scanner_name: &str) {
        let mut entries = self.inner.lock().unwrap();
        let entry = entries
            .entry(key.to_owned())
            .or_insert_with(|| FailureEntry {
                consecutive_failures: 0,
                skip_until_cycle: 0,
            });
        entry.consecutive_failures += 1;
        if entry.consecutive_failures != FAILURE_THRESHOLD {
            return;
        }
        entry.skip_until_cycle = self.cycle.load(Ordering::Relaxed) + BACKOFF_CYCLES;
        tracing::error!(
            scanner = scanner_name,
            item = key,
            failures = entry.consecutive_failures,
            backoff_cycles = BACKOFF_CYCLES,
            "persistent FCALL failure — skipping for {BACKOFF_CYCLES} scan cycles"
        );
    }

    /// Clears all failure state for `key` after a successful attempt.
    pub fn record_success(&self, key: &str) {
        self.inner.lock().unwrap().remove(key);
    }
}
/// A periodic background job that sweeps every partition on a fixed interval.
///
/// Implementors are driven by [`ScannerRunner::spawn`], which calls
/// [`Scanner::scan_partition`] for each partition once per cycle.
pub trait Scanner: Send + Sync + 'static {
    /// Stable, human-readable scanner name used in logs and metrics.
    fn name(&self) -> &'static str;
    /// Desired delay between the starts of consecutive scan cycles.
    fn interval(&self) -> Duration;
    /// Candidate filter for this scanner; defaults to a no-op filter.
    fn filter(&self) -> &ScannerFilter {
        &ScannerFilter::NOOP
    }
    /// Scans one partition, returning how many items were processed and
    /// how many errored.
    fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> impl std::future::Future<Output = ScanResult> + Send;
    /// Samples this scanner's backlog depth for one partition, or `None`
    /// if the scanner does not track a backlog (the default).
    fn sample_backlog_depth(
        &self,
        _client: &ferriskey::Client,
        _partition: u16,
    ) -> impl std::future::Future<Output = Option<u64>> + Send {
        async { None }
    }
}
/// Decides whether an execution candidate should be skipped under `filter`.
///
/// A no-op filter never skips. Otherwise the execution's stored namespace
/// and/or instance tag is fetched from the backend and compared; a lookup
/// error or missing value is treated conservatively as "skip" (`true`).
pub async fn should_skip_candidate(
    client: &ferriskey::Client,
    filter: &ScannerFilter,
    partition: u16,
    eid: &str,
) -> bool {
    if filter.is_noop() {
        return false;
    }
    let tag = Partition {
        family: PartitionFamily::Execution,
        index: partition,
    }
    .hash_tag();
    // Namespace constraint: the stored namespace must equal the wanted one.
    if let Some(ref want_ns) = filter.namespace {
        let key = format!("ff:exec:{}:{}:core", tag, eid);
        let fetched = client
            .cmd("HGET")
            .arg(&key)
            .arg("namespace")
            .execute::<Option<String>>()
            .await;
        let ns_matches = match fetched {
            Ok(Some(ns)) => &Namespace::new(ns) == want_ns,
            _ => false,
        };
        if !ns_matches {
            return true;
        }
    }
    // Instance-tag constraint: the stored tag value must equal the wanted one.
    if let Some((ref tag_key, ref want_value)) = filter.instance_tag {
        let key = format!("ff:exec:{}:{}:tags", tag, eid);
        let fetched = client
            .cmd("HGET")
            .arg(&key)
            .arg(tag_key.as_str())
            .execute::<Option<String>>()
            .await;
        let tag_matches = match fetched {
            Ok(Some(ref v)) => v == want_value,
            _ => false,
        };
        if !tag_matches {
            return true;
        }
    }
    false
}
/// Drives a [`Scanner`] in a background task: one full pass over all
/// partitions per cycle, with interval pacing and graceful shutdown.
pub struct ScannerRunner;

impl ScannerRunner {
    /// Spawns the scan loop for `scanner` on the tokio runtime.
    ///
    /// Each cycle scans partitions `0..num_partitions` in order, records
    /// cycle metrics, then sleeps out the remainder of the scanner's
    /// interval. The task exits as soon as `shutdown` flips to `true`.
    ///
    /// Fix: the backlog gauge is only written when *every* partition
    /// returned a sample. The previous flag-pair logic (`sampled` /
    /// `sample_valid`) only detected a `None` arriving *after* a `Some`;
    /// a `None` on an early partition followed by `Some` on a later one
    /// slipped through and wrote the gauge from a partial sample.
    pub fn spawn<S: Scanner>(
        scanner: Arc<S>,
        client: ferriskey::Client,
        num_partitions: u16,
        mut shutdown: watch::Receiver<bool>,
        metrics: Arc<ff_observability::Metrics>,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            let name = scanner.name();
            // Clamp to 100ms so a misconfigured interval cannot busy-loop.
            let interval = scanner.interval().max(Duration::from_millis(100));
            tracing::info!(scanner = name, ?interval, partitions = num_partitions, "scanner started");
            loop {
                let cycle_start = tokio::time::Instant::now();
                let mut total_processed: u32 = 0;
                let mut total_errors: u32 = 0;
                let mut total_backlog_depth: u64 = 0;
                // Count partitions that actually produced a sample; the gauge
                // is written only when the count covers every partition.
                let mut sampled_partitions: u16 = 0;
                for p in 0..num_partitions {
                    if *shutdown.borrow() {
                        tracing::info!(scanner = name, "shutdown requested, stopping");
                        return;
                    }
                    let result = scanner.scan_partition(&client, p).await;
                    total_processed += result.processed;
                    total_errors += result.errors;
                    if let Some(d) = scanner.sample_backlog_depth(&client, p).await {
                        sampled_partitions += 1;
                        total_backlog_depth = total_backlog_depth.saturating_add(d);
                    }
                }
                let elapsed = cycle_start.elapsed();
                metrics.record_scanner_cycle(name, elapsed);
                if sampled_partitions == num_partitions && num_partitions > 0 {
                    metrics.set_cancel_backlog_depth(total_backlog_depth);
                } else if sampled_partitions > 0 {
                    tracing::debug!(
                        scanner = name,
                        "skipping cancel_backlog_depth gauge write this cycle (partial partition sample)"
                    );
                }
                if total_processed > 0 || total_errors > 0 {
                    tracing::info!(
                        scanner = name,
                        processed = total_processed,
                        errors = total_errors,
                        elapsed_ms = elapsed.as_millis() as u64,
                        "scan cycle complete"
                    );
                } else {
                    tracing::trace!(
                        scanner = name,
                        elapsed_ms = elapsed.as_millis() as u64,
                        "scan cycle complete (nothing to do)"
                    );
                }
                // Pace the loop: sleep for what remains of the interval, but
                // wake immediately if shutdown is signalled in the meantime.
                let sleep_dur = interval.saturating_sub(elapsed);
                tokio::select! {
                    _ = tokio::time::sleep(sleep_dur) => {}
                    _ = shutdown.changed() => {
                        if *shutdown.borrow() {
                            tracing::info!(scanner = name, "shutdown requested, stopping");
                            return;
                        }
                    }
                }
            }
        })
    }
}