use ff_core::backend::ScannerFilter;
use ff_core::engine_error::EngineError;
use sqlx::{PgPool, Row};
use crate::dispatch::{dispatch_completion, DispatchOutcome};
use crate::error::map_sqlx_error;
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ReconcileReport {
pub scanned: u64,
pub reconciled: u64,
pub noop: u64,
pub filtered: u64,
pub errors: u64,
}
const BATCH_LIMIT: i64 = 1_000;
pub const DEFAULT_STALE_THRESHOLD_MS: i64 = 1_000;
#[tracing::instrument(name = "pg.dep_reconciler.tick", skip(pool, filter))]
pub async fn reconcile_tick(
pool: &PgPool,
filter: &ScannerFilter,
stale_threshold_ms: i64,
) -> Result<ReconcileReport, EngineError> {
let now_ms = i64::try_from(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis())
.unwrap_or(0),
)
.unwrap_or(i64::MAX);
let cutoff_ms = now_ms.saturating_sub(stale_threshold_ms);
let ns_param: Option<String> = filter.namespace.as_ref().map(|ns| ns.as_str().to_owned());
let tag_param: Option<String> = filter
.instance_tag
.as_ref()
.map(|(_, v)| v.clone());
let rows = sqlx::query(
r#"
SELECT event_id, namespace, instance_tag
FROM ff_completion_event
WHERE dispatched_at_ms IS NULL
AND committed_at_ms < $1
AND ($2::text IS NULL OR namespace = $2)
AND ($3::text IS NULL OR instance_tag = $3)
ORDER BY event_id ASC
LIMIT $4
"#,
)
.bind(cutoff_ms)
.bind(&ns_param)
.bind(&tag_param)
.bind(BATCH_LIMIT)
.fetch_all(pool)
.await
.map_err(map_sqlx_error)?;
let mut report = ReconcileReport {
scanned: rows.len() as u64,
..ReconcileReport::default()
};
for row in rows {
let event_id: i64 = row.get("event_id");
match dispatch_completion(pool, event_id).await {
Ok(DispatchOutcome::Advanced(n)) => {
report.reconciled += 1;
tracing::debug!(
event_id,
advanced = n,
"dep_reconciler: re-dispatched stale completion event"
);
}
Ok(DispatchOutcome::NoOp) => {
report.noop += 1;
tracing::trace!(
event_id,
"dep_reconciler: concurrent dispatcher won claim race"
);
}
Err(e) => {
report.errors += 1;
tracing::warn!(
event_id,
error = %e,
"dep_reconciler: dispatch_completion failed — will retry next tick"
);
}
}
}
let _ = &mut report.filtered;
Ok(report)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn report_default_is_zero() {
let r = ReconcileReport::default();
assert_eq!(r.scanned, 0);
assert_eq!(r.reconciled, 0);
assert_eq!(r.noop, 0);
assert_eq!(r.filtered, 0);
assert_eq!(r.errors, 0);
}
#[test]
fn default_threshold_is_one_second() {
assert_eq!(DEFAULT_STALE_THRESHOLD_MS, 1_000);
}
}