ff_backend_postgres/completion.rs
//! `CompletionBackend` implementation for Postgres (Wave 4h / Q1).
//!
//! Transport design (per RFC-v0.7 Q1 adjudication):
//!
//! * Durable source of truth is the `ff_completion_event` outbox
//!   table — a `bigserial` `event_id` column sequenced across all 256
//!   hash partitions gives a single total order.
//! * `pg_notify('ff_completion', event_id::text)` fires from the
//!   `ff_completion_event_notify_trg` trigger (see
//!   `migrations/0001_initial.sql`, Section 8). NOTIFY is the wake
//!   signal only — the payload carries just the event_id cursor, so
//!   we sidestep Postgres's 8 KB NOTIFY payload cap, and the natural
//!   coalescing (Postgres collapses duplicate payloads on the same
//!   channel within a transaction) costs us nothing because every
//!   wake replays from the cursor.
//! * Each subscriber owns a long-lived tokio task that holds a
//!   [`PgListener`] (one dedicated pg connection) and issues
//!   `SELECT ... WHERE event_id > $last_seen` on every wake to
//!   catch bursts whose NOTIFYs were coalesced.
//! * On LISTEN-connection loss (pg restart, PgBouncer eviction,
//!   network drop), `PgListener::recv()` surfaces an error; we
//!   re-establish LISTEN + replay from `last_seen_event_id`.
//! * Subscriber drop ends the task via `tx.closed()`.
//!
//! A NOTIFY missed between commit and LISTEN start is covered by
//! `dependency_reconciler`'s scan — out of scope here.
//!
//! Coordination with Wave 4e (stream LISTEN/NOTIFY via
//! [`super::listener::StreamNotifier`]): completion subscribers own
//! their own `PgListener` instance; they do NOT share the
//! `StreamNotifier` task. Completion fanout is low cardinality
//! (engines subscribe, not every execution) so a per-subscriber
//! connection is cheap — reuse is a future optimisation once Wave 4e
//! stabilises.
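//!
//! A minimal consumption sketch (illustrative, not compiled here;
//! assumes `futures_util` is available for `StreamExt` and that a
//! `PostgresBackend` has already been constructed elsewhere):
//!
//! ```ignore
//! use futures_util::StreamExt;
//!
//! let mut completions = backend.subscribe_completions().await?;
//! while let Some(payload) = completions.next().await {
//!     // React to the CompletionPayload (dispatch dependents, emit
//!     // metrics, ...). Dropping `completions` ends the LISTEN task.
//! }
//! ```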

use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use async_trait::async_trait;
use ff_core::backend::{CompletionPayload, ScannerFilter};
use ff_core::completion_backend::{CompletionBackend, CompletionStream};
use ff_core::engine_error::EngineError;
use ff_core::types::{ExecutionId, FlowId, Namespace, TimestampMs};
use futures_core::Stream;
use sqlx::postgres::{PgListener, PgPool};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use uuid::Uuid;

use crate::PostgresBackend;

/// Channel name fired by `ff_notify_completion_event()` after every
/// `ff_completion_event` INSERT.
pub const COMPLETION_CHANNEL: &str = "ff_completion";

/// Bounded fan-out capacity. Matches the Valkey backend's
/// [`ff_backend_valkey::completion::STREAM_CAPACITY`] so consumer
/// code sees equivalent backpressure behaviour across backends.
const STREAM_CAPACITY: usize = 1024;

/// Max rows pulled per wake; caps catch-up query cost.
const REPLAY_BATCH: i64 = 256;

/// Reconnect backoff when the LISTEN connection drops.
const RECONNECT_BACKOFF: Duration = Duration::from_millis(200);

/// Stream adapter returned to callers. Holds the `mpsc::Receiver`
/// plus the handle of the task that feeds it. Dropping the stream
/// drops the receiver, `tx.closed()` fires in the task, and the task
/// exits; the `Drop` impl below additionally aborts the `JoinHandle`
/// so no `.await` is needed at shutdown.
pub struct PostgresCompletionStream {
    rx: mpsc::Receiver<CompletionPayload>,
    /// Task handle kept alive for the stream's lifetime. Aborted on
    /// drop (no need to `.await` — the task also exits when the
    /// receiver is dropped via `tx.closed()`).
    handle: JoinHandle<()>,
}

impl Drop for PostgresCompletionStream {
    fn drop(&mut self) {
        self.handle.abort();
    }
}

impl Stream for PostgresCompletionStream {
    type Item = CompletionPayload;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.rx.poll_recv(cx)
    }
}

/// Decoded outbox row. Mirrors the `ff_completion_event` columns we
/// need to build a [`CompletionPayload`] + apply [`ScannerFilter`].
struct CompletionEventRow {
    event_id: i64,
    execution_id: Uuid,
    flow_id: Option<Uuid>,
    outcome: String,
    namespace: Option<String>,
    instance_tag: Option<String>,
    occurred_at_ms: i64,
    partition_key: i16,
}

impl CompletionEventRow {
    /// Build a [`CompletionPayload`] from the outbox row.
    ///
    /// The stored `execution_id` is a bare UUID; we re-attach the
    /// `{fp:N}:<uuid>` hash-tag using the row's `partition_key` so
    /// downstream consumers see the same execution-id shape they
    /// would get from the Valkey backend.
    fn into_payload(self) -> CompletionPayload {
        let eid_string = format!("{{fp:{}}}:{}", self.partition_key, self.execution_id);
        // Parse goes through the canonical shape check; on a
        // well-formed Wave-3 row this is infallible.
        let execution_id = ExecutionId::parse(&eid_string)
            .expect("ff_completion_event row produced malformed ExecutionId");
        let mut payload = CompletionPayload::new(
            execution_id,
            self.outcome,
            None, // payload_bytes — not persisted in Wave 3 outbox
            TimestampMs(self.occurred_at_ms),
        );
        if let Some(fid) = self.flow_id {
            payload = payload.with_flow_id(FlowId::from_uuid(fid));
        }
        payload
    }

    /// Apply a [`ScannerFilter`]. The filter's `namespace` dimension
    /// is checked against the row's `namespace` column; the
    /// `instance_tag` dimension is checked against the row's
    /// `instance_tag` column, which the Wave-3 schema stores as the
    /// denormalised `"key=value"` pair written by the completion
    /// emitter. When the filter specifies a tag, the row must carry an
    /// exactly matching pair; a row with no `instance_tag` never
    /// matches.
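    ///
    /// For example (illustrative key and value), a filter carrying
    /// `instance_tag = ("region", "eu-west-1")` passes only rows whose
    /// `instance_tag` column is exactly `"region=eu-west-1"`.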
    fn passes(&self, filter: &ScannerFilter) -> bool {
        if let Some(ref want_ns) = filter.namespace {
            let have: Option<Namespace> = self.namespace.as_deref().map(Namespace::from);
            if have.as_ref() != Some(want_ns) {
                return false;
            }
        }
        if let Some((ref k, ref v)) = filter.instance_tag {
            let want = format!("{k}={v}");
            match self.instance_tag {
                Some(ref have) if have == &want => {}
                _ => return false,
            }
        }
        true
    }
}

/// Public entrypoint — subscribe to completions with an optional
/// [`ScannerFilter`].
///
/// Starting cursor is `max(event_id)` at subscribe time: we tail new
/// events only. Replay from an older cursor is a future feature and
/// is deliberately out of scope for v0.7.
pub(crate) async fn subscribe(
    pool: &PgPool,
    filter: Option<ScannerFilter>,
) -> Result<CompletionStream, EngineError> {
    let filter = filter.unwrap_or(ScannerFilter::NOOP);

    // Resolve the starting cursor BEFORE we spawn the listen task so
    // synchronous-setup errors (pool exhausted, schema missing) bubble
    // back to the caller as `EngineError` per the trait contract.
    let start: i64 =
        sqlx::query_scalar("SELECT COALESCE(MAX(event_id), 0) FROM ff_completion_event")
            .fetch_one(pool)
            .await
            .map_err(|e| EngineError::Unavailable {
                op: match &e {
                    sqlx::Error::Database(_) => "pg.subscribe_completions (max(event_id))",
                    _ => "pg.subscribe_completions (connect)",
                },
            })?;

    let (tx, rx) = mpsc::channel::<CompletionPayload>(STREAM_CAPACITY);
    let pool_clone = pool.clone();
    let handle = tokio::spawn(subscriber_loop(pool_clone, tx, filter, start));

    let stream = PostgresCompletionStream { rx, handle };
    Ok(Box::pin(stream))
}

/// Long-lived LISTEN + replay loop. Ends when `tx.closed()` fires
/// (consumer dropped the stream). Connection and query errors are
/// logged at `tracing::warn!` and retried after `RECONNECT_BACKOFF`;
/// the receiver observes end-of-stream once this task returns.
async fn subscriber_loop(
    pool: PgPool,
    tx: mpsc::Sender<CompletionPayload>,
    filter: ScannerFilter,
    mut last_seen: i64,
) {
    loop {
        // (Re)establish the dedicated LISTEN connection.
        let mut listener = match PgListener::connect_with(&pool).await {
            Ok(l) => l,
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    "pg.completion.subscribe: PgListener::connect_with failed; retrying"
                );
                if wait_or_exit(&tx, RECONNECT_BACKOFF).await {
                    return;
                }
                continue;
            }
        };
        if let Err(e) = listener.listen(COMPLETION_CHANNEL).await {
            tracing::warn!(
                error = %e,
                "pg.completion.subscribe: LISTEN ff_completion failed; retrying"
            );
            if wait_or_exit(&tx, RECONNECT_BACKOFF).await {
                return;
            }
            continue;
        }

        // Catch-up replay immediately after (re)subscribe — covers
        // any events committed between last_seen and the LISTEN
        // registration whose NOTIFYs we never received.
        if !replay(&pool, &tx, &filter, &mut last_seen).await {
            return; // tx closed
        }

        // Steady-state: recv one NOTIFY, drain all events above the
        // cursor, repeat. `recv()` also surfaces connection drops as
        // Err; on error we break to the outer reconnect loop.
        loop {
            tokio::select! {
                _ = tx.closed() => return,
                res = listener.recv() => {
                    match res {
                        Ok(_notification) => {
                            if !replay(&pool, &tx, &filter, &mut last_seen).await {
                                return;
                            }
                        }
                        Err(e) => {
                            tracing::warn!(
                                error = %e,
                                "pg.completion.subscribe: listener.recv() error; reconnecting"
                            );
                            break; // outer loop rebuilds the listener
                        }
                    }
                }
            }
        }

        if wait_or_exit(&tx, RECONNECT_BACKOFF).await {
            return;
        }
    }
}

/// Sleep for `d`. Returns `true` if the subscriber dropped the stream
/// during the wait (caller should exit), `false` once the backoff
/// elapses.
async fn wait_or_exit(tx: &mpsc::Sender<CompletionPayload>, d: Duration) -> bool {
    tokio::select! {
        _ = tx.closed() => true,
        _ = tokio::time::sleep(d) => false,
    }
}

/// Drain all events above `last_seen`, filter, forward. Returns
/// `false` iff the consumer has dropped the stream (caller must
/// exit).
async fn replay(
    pool: &PgPool,
    tx: &mpsc::Sender<CompletionPayload>,
    filter: &ScannerFilter,
    last_seen: &mut i64,
) -> bool {
    loop {
        let rows: Vec<CompletionEventRow> = match sqlx::query_as::<_, (i64, Uuid, Option<Uuid>, String, Option<String>, Option<String>, i64, i16)>(
            "SELECT event_id, execution_id, flow_id, outcome, namespace, instance_tag, occurred_at_ms, partition_key \
             FROM ff_completion_event \
             WHERE event_id > $1 \
             ORDER BY event_id ASC \
             LIMIT $2"
        )
        .bind(*last_seen)
        .bind(REPLAY_BATCH)
        .fetch_all(pool)
        .await
        {
            Ok(rows) => rows
                .into_iter()
                .map(|(event_id, execution_id, flow_id, outcome, namespace, instance_tag, occurred_at_ms, partition_key)| CompletionEventRow {
                    event_id,
                    execution_id,
                    flow_id,
                    outcome,
                    namespace,
                    instance_tag,
                    occurred_at_ms,
                    partition_key,
                })
                .collect(),
            Err(e) => {
                tracing::warn!(error = %e, "pg.completion.replay: query failed");
                return !tx.is_closed();
            }
        };

        if rows.is_empty() {
            return !tx.is_closed();
        }

        for row in rows {
            *last_seen = row.event_id;
            let passes = row.passes(filter);
            if !passes {
                continue;
            }
            let payload = row.into_payload();
            if tx.send(payload).await.is_err() {
                return false; // consumer dropped
            }
        }
        // Loop: there may be more rows than REPLAY_BATCH. Keep
        // draining until we hit an empty page.
    }
}

/// Compile-time proof that `PostgresBackend` stays dyn-compatible
/// under the `CompletionBackend` trait. Mirrors the sibling guard in
/// `ff_core::completion_backend::_assert_dyn_compatible`.
#[allow(dead_code)]
fn _assert_pg_dyn_completion(b: std::sync::Arc<PostgresBackend>) -> std::sync::Arc<dyn CompletionBackend> {
    b
}

#[async_trait]
impl CompletionBackend for PostgresBackend {
    async fn subscribe_completions(&self) -> Result<CompletionStream, EngineError> {
        subscribe(&self.pool, None).await
    }

    async fn subscribe_completions_filtered(
        &self,
        filter: &ScannerFilter,
    ) -> Result<CompletionStream, EngineError> {
        if filter.is_noop() {
            return self.subscribe_completions().await;
        }
        subscribe(&self.pool, Some(filter.clone())).await
    }
}