// ff_backend_postgres/reconcilers/dependency.rs

//! Dependency-resolution reconciler (RFC-v0.7 Wave 6a).
//!
//! Backstop for the per-hop-tx cascade implemented in
//! [`crate::dispatch::dispatch_completion`] (Wave 5a). The cascade
//! is deliberately NOT wrapped in a single transaction spanning
//! transitive descendants — a mid-cascade failure (crash, lock
//! timeout, SERIALIZABLE retry exhaustion) leaves downstream
//! `ff_completion_event` rows with `dispatched_at_ms IS NULL`. This
//! reconciler sweeps those rows on its interval and re-invokes
//! `dispatch_completion`, which is idempotent via the
//! `UPDATE ... RETURNING` claim on `dispatched_at_ms`.
//!
//! # Transitive-descendant sweep (K-2 round-2)
//!
//! The reconciler query covers the **entire transitive descendant
//! set** of any interrupted cascade, not just 1-hop children of the
//! original trigger. Proof by construction:
//!
//! 1. Every `PolicyDecision::Satisfied` / `::Impossible` that
//!    transitions a downstream to `completed` / `skipped` INSERTs a
//!    new row into `ff_completion_event` inside the same per-hop tx
//!    ([`crate::dispatch::advance_edge_group`] line ~430).
//! 2. If the per-hop tx for hop N commits but hop N+1 crashes, the
//!    hop-N event lands durably with `dispatched_at_ms = NULL`.
//! 3. The reconciler finds that row via the partial index
//!    `ff_completion_event_pending_dispatch_idx` and fires
//!    `dispatch_completion(event_id)`, which kicks the cascade at
//!    hop N+1.
//! 4. Hop N+1's successful execution emits hop N+2's event, and so
//!    on — the reconciler's next tick (or the in-process cascade
//!    inside `dispatch_completion` itself) carries the chain
//!    forward.
//!
//! So a single crashed cascade of depth D eventually drains in at
//! most D reconciler ticks regardless of where the interruption
//! happened in the chain.
//!
//! # Interval + threshold
//!
//! `stale_threshold_ms` (default 1 000 ms) keeps the reconciler out
//! of the hot path: the normal in-process dispatcher flips
//! `dispatched_at_ms` within sub-millisecond of the commit that
//! inserted the row, so rows younger than the threshold are left
//! for the primary dispatcher.
use ff_core::backend::ScannerFilter;
use ff_core::engine_error::EngineError;
use sqlx::{PgPool, Row};

use crate::dispatch::{dispatch_completion, DispatchOutcome};
use crate::error::map_sqlx_error;

/// Per-tick work report. Returned so the driving scanner task can
/// emit metrics + logs at cycle boundary without re-querying.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ReconcileReport {
    /// Rows matched by the scan query before filter application.
    pub scanned: u64,
    /// Events that produced `DispatchOutcome::Advanced(_)`.
    pub reconciled: u64,
    /// Events that produced `DispatchOutcome::NoOp` (concurrent
    /// dispatcher won the claim race between our SELECT and our
    /// re-dispatch — legal, logged at trace).
    pub noop: u64,
    /// Events skipped by the `ScannerFilter`. Currently always zero
    /// because filter pushdown happens SQL-side; the field is kept so
    /// a future filter dimension that can't be pushed down stays
    /// additive-compatible.
    pub filtered: u64,
    /// Re-dispatch invocations that returned `EngineError`. The
    /// event row stays `dispatched_at_ms IS NULL` and the next tick
    /// will retry.
    pub errors: u64,
}
72
/// Max rows per tick. Bounds the per-tick pool + transaction budget.
const BATCH_LIMIT: i64 = 1_000;
75
/// Default minimum age of a `ff_completion_event` row before the
/// reconciler will claim it. Keeps this off the hot path — the
/// primary dispatcher has this long to flip `dispatched_at_ms` from
/// NULL before the reconciler takes over.
pub const DEFAULT_STALE_THRESHOLD_MS: i64 = 1_000;
81
82/// Run one reconciler tick. See module docs for semantics.
83///
84/// The query uses the `ff_completion_event_pending_dispatch_idx`
85/// partial index from migration 0003 for partition-local index
86/// scans. `ScannerFilter` is applied inline via optional SQL
87/// predicates (namespace + instance_tag both live on the event row
88/// — issue #122 column additions) so we do not pay an extra RTT per
89/// candidate like the Valkey reconciler does.
90#[tracing::instrument(name = "pg.dep_reconciler.tick", skip(pool, filter))]
91pub async fn reconcile_tick(
92    pool: &PgPool,
93    filter: &ScannerFilter,
94    stale_threshold_ms: i64,
95) -> Result<ReconcileReport, EngineError> {
96    let now_ms = i64::try_from(
97        std::time::SystemTime::now()
98            .duration_since(std::time::UNIX_EPOCH)
99            .map(|d| d.as_millis())
100            .unwrap_or(0),
101    )
102    .unwrap_or(i64::MAX);
103    let cutoff_ms = now_ms.saturating_sub(stale_threshold_ms);
104
105    // We bind both filter dimensions as NULLABLE parameters and let
106    // `($N IS NULL OR column = $N)` short-circuit in the planner.
107    // That keeps one prepared statement for every filter shape.
108    let ns_param: Option<String> = filter.namespace.as_ref().map(|ns| ns.as_str().to_owned());
109    let tag_param: Option<String> = filter
110        .instance_tag
111        .as_ref()
112        .map(|(_, v)| v.clone());
113
114    let rows = sqlx::query(
115        r#"
116        SELECT event_id, namespace, instance_tag
117          FROM ff_completion_event
118         WHERE dispatched_at_ms IS NULL
119           AND committed_at_ms < $1
120           AND ($2::text IS NULL OR namespace = $2)
121           AND ($3::text IS NULL OR instance_tag = $3)
122         ORDER BY event_id ASC
123         LIMIT $4
124        "#,
125    )
126    .bind(cutoff_ms)
127    .bind(&ns_param)
128    .bind(&tag_param)
129    .bind(BATCH_LIMIT)
130    .fetch_all(pool)
131    .await
132    .map_err(map_sqlx_error)?;
133
134    let mut report = ReconcileReport {
135        scanned: rows.len() as u64,
136        ..ReconcileReport::default()
137    };
138
139    for row in rows {
140        let event_id: i64 = row.get("event_id");
141        match dispatch_completion(pool, event_id).await {
142            Ok(DispatchOutcome::Advanced(n)) => {
143                report.reconciled += 1;
144                tracing::debug!(
145                    event_id,
146                    advanced = n,
147                    "dep_reconciler: re-dispatched stale completion event"
148                );
149            }
150            Ok(DispatchOutcome::NoOp) => {
151                report.noop += 1;
152                tracing::trace!(
153                    event_id,
154                    "dep_reconciler: concurrent dispatcher won claim race"
155                );
156            }
157            Err(e) => {
158                report.errors += 1;
159                tracing::warn!(
160                    event_id,
161                    error = %e,
162                    "dep_reconciler: dispatch_completion failed — will retry next tick"
163                );
164            }
165        }
166    }
167
168    // `filtered` is always zero here: filter pushdown is SQL-side.
169    // The field is retained in the report so a future filter
170    // dimension that can't be pushed down stays additive-compatible.
171    let _ = &mut report.filtered;
172
173    Ok(report)
174}
175
// ── unit tests ───────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    /// `Default` must zero every counter so per-tick accumulation in
    /// `reconcile_tick` starts from a clean slate.
    #[test]
    fn report_default_is_zero() {
        let r = ReconcileReport::default();
        assert_eq!(r.scanned, 0);
        assert_eq!(r.reconciled, 0);
        assert_eq!(r.noop, 0);
        assert_eq!(r.filtered, 0);
        assert_eq!(r.errors, 0);
    }

    /// Guards the documented 1-second default against accidental drift.
    #[test]
    fn default_threshold_is_one_second() {
        assert_eq!(DEFAULT_STALE_THRESHOLD_MS, 1_000);
    }
}