ff_backend_postgres/reconcilers/dependency.rs
1//! Dependency-resolution reconciler (RFC-v0.7 Wave 6a).
2//!
3//! Backstop for the per-hop-tx cascade implemented in
4//! [`crate::dispatch::dispatch_completion`] (Wave 5a). The cascade
5//! is deliberately NOT wrapped in a single transaction spanning
6//! transitive descendants — a mid-cascade failure (crash, lock
7//! timeout, SERIALIZABLE retry exhaustion) leaves downstream
8//! `ff_completion_event` rows with `dispatched_at_ms IS NULL`. This
9//! reconciler sweeps those rows on its interval and re-invokes
10//! `dispatch_completion`, which is idempotent via the
11//! `UPDATE ... RETURNING` claim on `dispatched_at_ms`.
12//!
13//! # Transitive-descendant sweep (K-2 round-2)
14//!
15//! The reconciler query covers the **entire transitive descendant
16//! set** of any interrupted cascade, not just 1-hop children of the
17//! original trigger. Proof by construction:
18//!
19//! 1. Every `PolicyDecision::Satisfied` / `::Impossible` that
20//! transitions a downstream to `completed` / `skipped` INSERTs a
21//! new row into `ff_completion_event` inside the same per-hop tx
22//! ([`crate::dispatch::advance_edge_group`] line ~430).
23//! 2. If the per-hop tx for hop N commits but hop N+1 crashes, the
24//! hop-N event lands durably with `dispatched_at_ms = NULL`.
25//! 3. The reconciler finds that row via the partial index
26//! `ff_completion_event_pending_dispatch_idx` and fires
27//! `dispatch_completion(event_id)`, which kicks the cascade at
28//! hop N+1.
29//! 4. Hop N+1's successful execution emits hop N+2's event, and so
30//! on — the reconciler's next tick (or the in-process cascade
31//! inside `dispatch_completion` itself) carries the chain
32//! forward.
33//!
34//! So a single crashed cascade of depth D eventually drains in at
35//! most D reconciler ticks regardless of where the interruption
36//! happened in the chain.
37//!
38//! # Interval + threshold
39//!
//! `stale_threshold_ms` (default 1 000 ms) keeps the reconciler out
//! of the hot path: the normal in-process dispatcher flips
//! `dispatched_at_ms` less than a millisecond after the commit that
//! inserted the row, so rows younger than the threshold are left
//! for the primary dispatcher.
45
46use ff_core::backend::ScannerFilter;
47use ff_core::engine_error::EngineError;
48use sqlx::{PgPool, Row};
49
50use crate::dispatch::{dispatch_completion, DispatchOutcome};
51use crate::error::map_sqlx_error;
52
/// Tally of what one reconciler tick did.
///
/// Handed back to the driving scanner task so it can emit metrics and
/// logs at the cycle boundary without running the query a second time.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ReconcileReport {
    /// Number of rows the scan query returned this tick. The
    /// `ScannerFilter` predicates are part of that query, so rows
    /// excluded by the filter are never counted here.
    pub scanned: u64,
    /// Events whose re-dispatch came back as
    /// `DispatchOutcome::Advanced(_)`.
    pub reconciled: u64,
    /// Events whose re-dispatch came back as `DispatchOutcome::NoOp`:
    /// a concurrent dispatcher claimed the row between our SELECT and
    /// our re-dispatch. This is legal and only logged at trace level.
    pub noop: u64,
    /// Events skipped by the `ScannerFilter` after the scan. The
    /// filter is currently applied inside the SQL query, so
    /// `reconcile_tick` never increments this counter.
    pub filtered: u64,
    /// Re-dispatch calls that returned an `EngineError`. The event
    /// row keeps `dispatched_at_ms IS NULL`, so the next tick picks
    /// it up again.
    pub errors: u64,
}
72
/// Upper bound on the number of pending events a single tick fetches.
/// It is bound as the scan query's `LIMIT`, which caps how much pool
/// and transaction work one tick can generate.
const BATCH_LIMIT: i64 = 1_000;
75
/// Default value, in milliseconds, for the minimum age a
/// `ff_completion_event` row must reach before the reconciler will
/// pick it up.
///
/// This is the grace period the primary dispatcher gets to flip
/// `dispatched_at_ms` away from NULL, which keeps the reconciler off
/// the hot path.
pub const DEFAULT_STALE_THRESHOLD_MS: i64 = 1_000;
81
82/// Run one reconciler tick. See module docs for semantics.
83///
84/// The query uses the `ff_completion_event_pending_dispatch_idx`
85/// partial index from migration 0003 for partition-local index
86/// scans. `ScannerFilter` is applied inline via optional SQL
87/// predicates (namespace + instance_tag both live on the event row
88/// — issue #122 column additions) so we do not pay an extra RTT per
89/// candidate like the Valkey reconciler does.
90#[tracing::instrument(name = "pg.dep_reconciler.tick", skip(pool, filter))]
91pub async fn reconcile_tick(
92 pool: &PgPool,
93 filter: &ScannerFilter,
94 stale_threshold_ms: i64,
95) -> Result<ReconcileReport, EngineError> {
96 let now_ms = i64::try_from(
97 std::time::SystemTime::now()
98 .duration_since(std::time::UNIX_EPOCH)
99 .map(|d| d.as_millis())
100 .unwrap_or(0),
101 )
102 .unwrap_or(i64::MAX);
103 let cutoff_ms = now_ms.saturating_sub(stale_threshold_ms);
104
105 // We bind both filter dimensions as NULLABLE parameters and let
106 // `($N IS NULL OR column = $N)` short-circuit in the planner.
107 // That keeps one prepared statement for every filter shape.
108 let ns_param: Option<String> = filter.namespace.as_ref().map(|ns| ns.as_str().to_owned());
109 let tag_param: Option<String> = filter
110 .instance_tag
111 .as_ref()
112 .map(|(_, v)| v.clone());
113
114 let rows = sqlx::query(
115 r#"
116 SELECT event_id, namespace, instance_tag
117 FROM ff_completion_event
118 WHERE dispatched_at_ms IS NULL
119 AND committed_at_ms < $1
120 AND ($2::text IS NULL OR namespace = $2)
121 AND ($3::text IS NULL OR instance_tag = $3)
122 ORDER BY event_id ASC
123 LIMIT $4
124 "#,
125 )
126 .bind(cutoff_ms)
127 .bind(&ns_param)
128 .bind(&tag_param)
129 .bind(BATCH_LIMIT)
130 .fetch_all(pool)
131 .await
132 .map_err(map_sqlx_error)?;
133
134 let mut report = ReconcileReport {
135 scanned: rows.len() as u64,
136 ..ReconcileReport::default()
137 };
138
139 for row in rows {
140 let event_id: i64 = row.get("event_id");
141 match dispatch_completion(pool, event_id).await {
142 Ok(DispatchOutcome::Advanced(n)) => {
143 report.reconciled += 1;
144 tracing::debug!(
145 event_id,
146 advanced = n,
147 "dep_reconciler: re-dispatched stale completion event"
148 );
149 }
150 Ok(DispatchOutcome::NoOp) => {
151 report.noop += 1;
152 tracing::trace!(
153 event_id,
154 "dep_reconciler: concurrent dispatcher won claim race"
155 );
156 }
157 Err(e) => {
158 report.errors += 1;
159 tracing::warn!(
160 event_id,
161 error = %e,
162 "dep_reconciler: dispatch_completion failed — will retry next tick"
163 );
164 }
165 }
166 }
167
168 // `filtered` is always zero here: filter pushdown is SQL-side.
169 // The field is retained in the report so a future filter
170 // dimension that can't be pushed down stays additive-compatible.
171 let _ = &mut report.filtered;
172
173 Ok(report)
174}
175
176// ── unit tests ───────────────────────────────────────────────────────────
177
#[cfg(test)]
mod tests {
    use super::{ReconcileReport, DEFAULT_STALE_THRESHOLD_MS};

    /// A defaulted report must carry no counts in any field.
    #[test]
    fn report_default_is_zero() {
        let all_zero = ReconcileReport {
            scanned: 0,
            reconciled: 0,
            noop: 0,
            filtered: 0,
            errors: 0,
        };
        assert_eq!(ReconcileReport::default(), all_zero);
    }

    /// Pins the default staleness threshold at 1 000 ms.
    #[test]
    fn default_threshold_is_one_second() {
        let one_second_ms: i64 = 1_000;
        assert_eq!(DEFAULT_STALE_THRESHOLD_MS, one_second_ms);
    }
}
196}