pub mod attempt_timeout;
pub mod budget_reconciler;
pub mod execution_deadline;
pub mod budget_reset;
pub mod delayed_promoter;
pub mod dependency_reconciler;
pub mod flow_projector;
pub mod index_reconciler;
pub mod lease_expiry;
pub mod pending_wp_expiry;
pub mod quota_reconciler;
pub mod retention_trimmer;
pub mod suspension_timeout;
pub mod unblock;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::time::Duration;
use tokio::sync::watch;
use tokio::task::JoinHandle;
/// Outcome of scanning a single partition in one cycle.
pub struct ScanResult {
/// Number of items handled successfully during this partition scan.
pub processed: u32,
/// Number of items that errored during this partition scan.
pub errors: u32,
}
/// Consecutive failures for one key before it is placed into backoff.
const FAILURE_THRESHOLD: u32 = 3;
/// Number of scan cycles a key is skipped once the failure threshold is hit.
const BACKOFF_CYCLES: u64 = 10;
/// Map size above which `FailureTracker::advance_cycle` garbage-collects
/// entries that are not in an active backoff window.
const GC_THRESHOLD: usize = 500;
/// Per-key failure bookkeeping held inside `FailureTracker`.
struct FailureEntry {
// Failures recorded in a row without an intervening success.
consecutive_failures: u32,
// Cycle number until which the key is skipped (exclusive); 0 = no backoff armed.
skip_until_cycle: u64,
}
/// Tracks consecutive per-key failures across scan cycles and applies a
/// fixed-length backoff (skip window) once a key keeps failing.
///
/// Thread-safe: the map is guarded by a `Mutex` and the cycle counter is an
/// atomic, so one tracker can be shared across tasks.
#[derive(Default)]
pub struct FailureTracker {
// key (caller-chosen item identifier) -> failure bookkeeping
inner: Mutex<HashMap<String, FailureEntry>>,
// Monotonic scan-cycle counter; advanced once per cycle via `advance_cycle`.
cycle: AtomicU64,
}
impl FailureTracker {
    /// Creates an empty tracker starting at cycle 0.
    pub fn new() -> Self {
        Self::default()
    }

    /// Advances the cycle counter by one; call once per scan cycle.
    ///
    /// Every 50 cycles, if the map has grown past [`GC_THRESHOLD`], entries
    /// that are not in an active backoff window are garbage-collected so the
    /// map cannot grow without bound. This also drops sub-threshold partial
    /// counts — those keys simply start counting from zero again.
    pub fn advance_cycle(&self) {
        let cycle = self.cycle.fetch_add(1, Ordering::Relaxed) + 1;
        if cycle.is_multiple_of(50) {
            let mut map = self.inner.lock().unwrap();
            if map.len() > GC_THRESHOLD {
                // Keep only entries whose backoff window is still active.
                map.retain(|_, e| {
                    e.consecutive_failures >= FAILURE_THRESHOLD
                        && e.skip_until_cycle > cycle
                });
            }
        }
    }

    /// Returns `true` if `key` is currently inside a backoff window and
    /// should be skipped this cycle. An expired window is cleared on the way
    /// through, so the key gets a fresh start on its next failure.
    pub fn should_skip(&self, key: &str) -> bool {
        let mut map = self.inner.lock().unwrap();
        if let Some(entry) = map.get_mut(key)
            && entry.consecutive_failures >= FAILURE_THRESHOLD
        {
            let cycle = self.cycle.load(Ordering::Relaxed);
            if entry.skip_until_cycle > cycle {
                return true;
            }
            // Backoff expired: reset state so the key is retried.
            entry.consecutive_failures = 0;
            entry.skip_until_cycle = 0;
        }
        false
    }

    /// Records one failure for `key`. On the attempt where the consecutive
    /// count first reaches [`FAILURE_THRESHOLD`], arms a backoff window of
    /// [`BACKOFF_CYCLES`] cycles and logs once (the `==` comparison keeps the
    /// log from repeating if callers record failures past the threshold).
    pub fn record_failure(&self, key: &str, scanner_name: &str) {
        let mut map = self.inner.lock().unwrap();
        // `or_insert_with` defers constructing the entry to the miss path.
        let entry = map.entry(key.to_owned()).or_insert_with(|| FailureEntry {
            consecutive_failures: 0,
            skip_until_cycle: 0,
        });
        // Saturate rather than overflow (overflow would panic in debug builds).
        entry.consecutive_failures = entry.consecutive_failures.saturating_add(1);
        if entry.consecutive_failures == FAILURE_THRESHOLD {
            let cycle = self.cycle.load(Ordering::Relaxed);
            entry.skip_until_cycle = cycle + BACKOFF_CYCLES;
            tracing::error!(
                scanner = scanner_name,
                item = key,
                failures = entry.consecutive_failures,
                backoff_cycles = BACKOFF_CYCLES,
                "persistent FCALL failure — skipping for {BACKOFF_CYCLES} scan cycles"
            );
        }
    }

    /// Clears all failure state for `key` after a successful operation.
    pub fn record_success(&self, key: &str) {
        let mut map = self.inner.lock().unwrap();
        map.remove(key);
    }
}
/// A periodic background scanner that sweeps every partition once per cycle.
///
/// Implementations are driven by `ScannerRunner::spawn`.
pub trait Scanner: Send + Sync + 'static {
/// Stable name used in log output.
fn name(&self) -> &'static str;
/// Desired pause between the start of consecutive scan cycles.
fn interval(&self) -> Duration;
/// Scans one partition, returning how many items were processed / errored.
fn scan_partition(
&self,
client: &ferriskey::Client,
partition: u16,
) -> impl std::future::Future<Output = ScanResult> + Send;
}
/// Namespace for spawning [`Scanner`] background tasks.
pub struct ScannerRunner;

impl ScannerRunner {
    /// Spawns a background task that repeatedly runs `scanner` over all
    /// `num_partitions` partitions, then sleeps for the remainder of
    /// `scanner.interval()` (clamped to >= 100 ms) before the next cycle.
    ///
    /// The task stops when `shutdown` carries `true` — checked between
    /// partitions and while sleeping — or when the shutdown sender is
    /// dropped. Treating a closed channel as shutdown fixes a busy loop:
    /// previously `shutdown.changed()` resolved immediately with `Err` on
    /// every iteration once the sender was gone, cancelling the sleep arm of
    /// `select!` and spinning the task at full speed.
    pub fn spawn<S: Scanner>(
        scanner: Arc<S>,
        client: ferriskey::Client,
        num_partitions: u16,
        mut shutdown: watch::Receiver<bool>,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            let name = scanner.name();
            // Clamp pathological configs so a zero interval cannot hot-spin.
            let interval = scanner.interval().max(Duration::from_millis(100));
            tracing::info!(scanner = name, ?interval, partitions = num_partitions, "scanner started");
            loop {
                let cycle_start = tokio::time::Instant::now();
                let mut total_processed: u32 = 0;
                let mut total_errors: u32 = 0;
                for p in 0..num_partitions {
                    // Re-check shutdown between partitions so a long cycle
                    // does not delay termination.
                    if *shutdown.borrow() {
                        tracing::info!(scanner = name, "shutdown requested, stopping");
                        return;
                    }
                    let result = scanner.scan_partition(&client, p).await;
                    // Saturate rather than overflow (overflow panics in debug builds).
                    total_processed = total_processed.saturating_add(result.processed);
                    total_errors = total_errors.saturating_add(result.errors);
                }
                let elapsed = cycle_start.elapsed();
                if total_processed > 0 || total_errors > 0 {
                    tracing::info!(
                        scanner = name,
                        processed = total_processed,
                        errors = total_errors,
                        elapsed_ms = elapsed.as_millis() as u64,
                        "scan cycle complete"
                    );
                } else {
                    // Quiet cycles log at TRACE to keep the log uncluttered.
                    tracing::trace!(
                        scanner = name,
                        elapsed_ms = elapsed.as_millis() as u64,
                        "scan cycle complete (nothing to do)"
                    );
                }
                // Sleep only for the remainder of the interval.
                let sleep_dur = interval.saturating_sub(elapsed);
                tokio::select! {
                    _ = tokio::time::sleep(sleep_dur) => {}
                    changed = shutdown.changed() => {
                        // Err means the sender was dropped: no further signal
                        // can ever arrive, so stop instead of spinning.
                        if changed.is_err() || *shutdown.borrow() {
                            tracing::info!(scanner = name, "shutdown requested, stopping");
                            return;
                        }
                    }
                }
            }
        })
    }
}