// ff_backend_postgres/src/completion.rs
//! `CompletionBackend` implementation for Postgres (Wave 4h / Q1).
//!
//! Transport design (per RFC-v0.7 Q1 adjudication):
//!
//! * Durable source of truth is the `ff_completion_event` outbox
//!   table — a `bigserial` event_id column sequenced across all 256
//!   hash partitions gives a single total order.
//! * `pg_notify('ff_completion', event_id::text)` fires from the
//!   `ff_completion_event_notify_trg` trigger (see
//!   `migrations/0001_initial.sql` §Section 8). NOTIFY is the wake
//!   signal only — the payload carries only the event_id cursor, so
//!   we sidestep Postgres's 8 KB NOTIFY payload cap and its natural
//!   coalescing (Postgres collapses duplicate payloads on the same
//!   channel within a transaction).
//! * Each subscriber owns a long-lived tokio task holding a
//!   [`PgListener`] (one dedicated pg connection) and issues
//!   `SELECT ... WHERE event_id > $last_seen` on every wake to
//!   catch bursts that NOTIFY coalesced into a single wake.
//! * On LISTEN-connection loss (pg restart, PgBouncer eviction,
//!   network drop), `PgListener::recv()` surfaces an error; we
//!   re-establish LISTEN and replay from `last_seen_event_id`.
//! * Subscriber drop ends the task via `tx.closed()`.
//!
//! A NOTIFY missed between commit and LISTEN start is covered by
//! `dependency_reconciler`'s scan — out of scope here.
//!
//! Coordination with Wave 4e (stream LISTEN/NOTIFY via
//! [`super::listener::StreamNotifier`]): completion subscribers own
//! their own `PgListener` instance; they do NOT share the
//! `StreamNotifier` task. Completion fanout is low cardinality
//! (engines subscribe, not every execution) so a per-subscriber
//! connection is cheap — reuse is a future optimisation once Wave 4e
//! stabilises.

35use std::pin::Pin;
36use std::task::{Context, Poll};
37use std::time::Duration;
38
39use async_trait::async_trait;
40use ff_core::backend::{CompletionPayload, ScannerFilter};
41use ff_core::completion_backend::{CompletionBackend, CompletionStream};
42use ff_core::engine_error::EngineError;
43use ff_core::types::{ExecutionId, FlowId, Namespace, TimestampMs};
44use futures_core::Stream;
45use sqlx::postgres::{PgListener, PgPool};
46use tokio::sync::mpsc;
47use tokio::task::JoinHandle;
48use uuid::Uuid;
49
50use crate::PostgresBackend;
51
/// NOTIFY channel name fired by `ff_notify_completion_event()` after
/// every `ff_completion_event` INSERT. Must match the channel used in
/// the trigger function installed by the migrations.
pub const COMPLETION_CHANNEL: &str = "ff_completion";

/// Bounded fan-out capacity of the per-subscriber mpsc channel.
/// Matches the Valkey backend's
/// [`ff_backend_valkey::completion::STREAM_CAPACITY`] so consumer
/// code sees equivalent backpressure behaviour across backends.
const STREAM_CAPACITY: usize = 1024;

/// Max rows pulled per wake; caps catch-up query cost. `replay`
/// keeps paging until it sees a short (empty) page, so this bounds
/// per-query work, not total replay size.
const REPLAY_BATCH: i64 = 256;

/// Reconnect backoff when the LISTEN connection drops, applied before
/// every re-establish attempt in `subscriber_loop`.
const RECONNECT_BACKOFF: Duration = Duration::from_millis(200);
66
/// Stream adapter returned to callers. Holds the `mpsc::Receiver`
/// plus the task handle that feeds it; dropping the stream drops the
/// receiver, `tx.closed()` fires in the task, and the task exits —
/// which aborts the owned `JoinHandle` cleanly on drop.
pub struct PostgresCompletionStream {
    /// Receiving half fed by `subscriber_loop`; `poll_next` delegates
    /// straight to it.
    rx: mpsc::Receiver<CompletionPayload>,
    /// Task handle kept alive for the stream's lifetime. Aborted on
    /// drop (no need to `.await` — the task also exits when the
    /// receiver is dropped via `tx.closed()`).
    handle: JoinHandle<()>,
}
78
impl Drop for PostgresCompletionStream {
    fn drop(&mut self) {
        // Belt-and-braces teardown: the task would also exit on its own
        // once `tx.closed()` observes the dropped receiver, but aborting
        // here makes shutdown immediate rather than waiting for the
        // task's next poll.
        self.handle.abort();
    }
}
84
85impl Stream for PostgresCompletionStream {
86    type Item = CompletionPayload;
87
88    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
89        self.rx.poll_recv(cx)
90    }
91}
92
/// Decoded outbox row. Mirrors the `ff_completion_event` columns we
/// need to build a [`CompletionPayload`] + apply [`ScannerFilter`].
struct CompletionEventRow {
    /// `bigserial` cursor; totally ordered across all hash partitions.
    event_id: i64,
    /// Bare execution UUID as stored; the `{fp:N}:` hash-tag is
    /// re-attached in `into_payload`.
    execution_id: Uuid,
    /// Parent flow, when the execution belongs to one.
    flow_id: Option<Uuid>,
    /// Completion outcome string, forwarded verbatim to the payload.
    outcome: String,
    /// Namespace column, matched against `ScannerFilter::namespace`.
    namespace: Option<String>,
    /// Denormalised `"key=value"` pair, matched against
    /// `ScannerFilter::instance_tag`.
    instance_tag: Option<String>,
    /// Completion time in epoch milliseconds.
    occurred_at_ms: i64,
    /// Hash-partition index (0..=255) used to rebuild the `{fp:N}` tag.
    partition_key: i16,
}
105
106impl CompletionEventRow {
107    /// Build a [`CompletionPayload`] from the outbox row.
108    ///
109    /// The stored `execution_id` is a bare UUID; we re-attach the
110    /// `{fp:N}:<uuid>` hash-tag using the row's `partition_key` so
111    /// downstream consumers see the same execution-id shape they
112    /// would get from the Valkey backend.
113    fn into_payload(self) -> CompletionPayload {
114        let eid_string = format!("{{fp:{}}}:{}", self.partition_key, self.execution_id);
115        // Parse goes through the canonical shape check; on a
116        // well-formed Wave-3 row this is infallible.
117        let execution_id = ExecutionId::parse(&eid_string)
118            .expect("ff_completion_event row produced malformed ExecutionId");
119        let mut payload = CompletionPayload::new(
120            execution_id,
121            self.outcome,
122            None, // payload_bytes — not persisted in Wave 3 outbox
123            TimestampMs(self.occurred_at_ms),
124        );
125        if let Some(fid) = self.flow_id {
126            payload = payload.with_flow_id(FlowId::from_uuid(fid));
127        }
128        payload
129    }
130
131    /// Apply a [`ScannerFilter`]. The filter's `namespace` dimension
132    /// is checked against the row's `namespace` column; the
133    /// `instance_tag` dimension is checked against the row's
134    /// `instance_tag` column, which the Wave-3 schema stores as the
135    /// denormalised `"key=value"` pair written by the completion
136    /// emitter. When the filter specifies a tag the row must carry a
137    /// matching exact pair; missing `instance_tag` never matches.
138    fn passes(&self, filter: &ScannerFilter) -> bool {
139        if let Some(ref want_ns) = filter.namespace {
140            let have: Option<Namespace> = self.namespace.as_deref().map(Namespace::from);
141            if have.as_ref() != Some(want_ns) {
142                return false;
143            }
144        }
145        if let Some((ref k, ref v)) = filter.instance_tag {
146            let want = format!("{k}={v}");
147            match self.instance_tag {
148                Some(ref have) if have == &want => {}
149                _ => return false,
150            }
151        }
152        true
153    }
154}
155
156/// Public entrypoint — subscribe to completions with an optional
157/// [`ScannerFilter`].
158///
159/// Starting cursor is `max(event_id)` at subscribe time: we tail new
160/// events only. Replay from an older cursor is a future feature and
161/// is deliberately out of scope for v0.7.
162pub(crate) async fn subscribe(
163    pool: &PgPool,
164    filter: Option<ScannerFilter>,
165) -> Result<CompletionStream, EngineError> {
166    let filter = filter.unwrap_or(ScannerFilter::NOOP);
167
168    // Resolve the starting cursor BEFORE we spawn the listen task so
169    // synchronous-setup errors (pool exhausted, schema missing) bubble
170    // back to the caller as `EngineError` per the trait contract.
171    let start: i64 = sqlx::query_scalar("SELECT COALESCE(MAX(event_id), 0) FROM ff_completion_event")
172        .fetch_one(pool)
173        .await
174        .map_err(|e| EngineError::Unavailable {
175            op: match &e {
176                sqlx::Error::Database(_) => "pg.subscribe_completions (max(event_id))",
177                _ => "pg.subscribe_completions (connect)",
178            },
179        })?;
180
181    let (tx, rx) = mpsc::channel::<CompletionPayload>(STREAM_CAPACITY);
182    let pool_clone = pool.clone();
183    let handle = tokio::spawn(subscriber_loop(pool_clone, tx, filter, start));
184
185    let stream = PostgresCompletionStream { rx, handle };
186    Ok(Box::pin(stream))
187}
188
/// Long-lived LISTEN + replay loop. Ends when `tx.closed()` fires
/// (consumer dropped the stream) or an unrecoverable setup error
/// occurs — logged at `tracing::error!` and leaves the receiver to
/// observe end-of-stream.
///
/// * `pool` — pool used both for the dedicated LISTEN connection and
///   for the catch-up replay queries.
/// * `tx` — sender half of the subscriber's bounded channel; closure
///   of its receiver is the shutdown signal everywhere below.
/// * `filter` — applied to every row before forwarding.
/// * `last_seen` — event_id cursor; advanced by `replay`.
async fn subscriber_loop(
    pool: PgPool,
    tx: mpsc::Sender<CompletionPayload>,
    filter: ScannerFilter,
    mut last_seen: i64,
) {
    loop {
        // (Re)establish the dedicated LISTEN connection. Failure here
        // is retried forever with a fixed backoff — the consumer can
        // always bail by dropping the stream.
        let mut listener = match PgListener::connect_with(&pool).await {
            Ok(l) => l,
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    "pg.completion.subscribe: PgListener::connect_with failed; retrying"
                );
                if wait_or_exit(&tx, RECONNECT_BACKOFF).await {
                    return;
                }
                continue;
            }
        };
        if let Err(e) = listener.listen(COMPLETION_CHANNEL).await {
            tracing::warn!(
                error = %e,
                "pg.completion.subscribe: LISTEN ff_completion failed; retrying"
            );
            if wait_or_exit(&tx, RECONNECT_BACKOFF).await {
                return;
            }
            continue;
        }

        // Catch-up replay immediately after (re)subscribe — covers
        // any events committed between last_seen and the LISTEN
        // registration that NOTIFY missed for us.
        if !replay(&pool, &tx, &filter, &mut last_seen).await {
            return; // tx closed
        }

        // Steady-state: recv one NOTIFY, drain all events above the
        // cursor, repeat. `recv()` also surfaces connection drops as
        // Err; on error we break to the outer reconnect loop.
        loop {
            tokio::select! {
                // Consumer dropped the stream — exit the task outright.
                _ = tx.closed() => return,
                res = listener.recv() => {
                    match res {
                        // Payload (the event_id) is deliberately ignored:
                        // replay reads everything above the cursor anyway,
                        // which also absorbs coalesced NOTIFYs.
                        Ok(_notification) => {
                            if !replay(&pool, &tx, &filter, &mut last_seen).await {
                                return;
                            }
                        }
                        Err(e) => {
                            tracing::warn!(
                                error = %e,
                                "pg.completion.subscribe: listener.recv() error; reconnecting"
                            );
                            break; // outer loop rebuilds the listener
                        }
                    }
                }
            }
        }

        // Backoff before rebuilding the LISTEN connection, still
        // responsive to consumer drop during the wait.
        if wait_or_exit(&tx, RECONNECT_BACKOFF).await {
            return;
        }
    }
}
262
263/// Sleep for `d` OR return `true` if the subscriber dropped during
264/// the wait (caller should exit).
265async fn wait_or_exit(tx: &mpsc::Sender<CompletionPayload>, d: Duration) -> bool {
266    tokio::select! {
267        _ = tx.closed() => true,
268        _ = tokio::time::sleep(d) => false,
269    }
270}
271
272/// Drain all events above `last_seen`, filter, forward. Returns
273/// `false` iff the consumer has dropped the stream (caller must
274/// exit).
275async fn replay(
276    pool: &PgPool,
277    tx: &mpsc::Sender<CompletionPayload>,
278    filter: &ScannerFilter,
279    last_seen: &mut i64,
280) -> bool {
281    loop {
282        let rows: Vec<CompletionEventRow> = match sqlx::query_as::<_, (i64, Uuid, Option<Uuid>, String, Option<String>, Option<String>, i64, i16)>(
283            "SELECT event_id, execution_id, flow_id, outcome, namespace, instance_tag, occurred_at_ms, partition_key \
284             FROM ff_completion_event \
285             WHERE event_id > $1 \
286             ORDER BY event_id ASC \
287             LIMIT $2"
288        )
289        .bind(*last_seen)
290        .bind(REPLAY_BATCH)
291        .fetch_all(pool)
292        .await
293        {
294            Ok(rows) => rows
295                .into_iter()
296                .map(|(event_id, execution_id, flow_id, outcome, namespace, instance_tag, occurred_at_ms, partition_key)| CompletionEventRow {
297                    event_id,
298                    execution_id,
299                    flow_id,
300                    outcome,
301                    namespace,
302                    instance_tag,
303                    occurred_at_ms,
304                    partition_key,
305                })
306                .collect(),
307            Err(e) => {
308                tracing::warn!(error = %e, "pg.completion.replay: query failed");
309                return !tx.is_closed();
310            }
311        };
312
313        if rows.is_empty() {
314            return !tx.is_closed();
315        }
316
317        for row in rows {
318            *last_seen = row.event_id;
319            let passes = row.passes(filter);
320            if !passes {
321                continue;
322            }
323            let payload = row.into_payload();
324            if tx.send(payload).await.is_err() {
325                return false; // consumer dropped
326            }
327        }
328        // Loop: there may be more rows than REPLAY_BATCH. Keep
329        // draining until we hit an empty page.
330    }
331}
332
/// Compile-time proof that `PostgresBackend` stays dyn-compatible
/// under the `CompletionBackend` trait. Mirrors the sibling guard in
/// `ff_core::completion_backend::_assert_dyn_compatible`.
///
/// Never called at runtime — the unsizing coercion in the body is the
/// whole point: it fails to compile if the trait stops being
/// object-safe or the impl is removed.
#[allow(dead_code)]
fn _assert_pg_dyn_completion(b: std::sync::Arc<PostgresBackend>) -> std::sync::Arc<dyn CompletionBackend> {
    b
}
340
341#[async_trait]
342impl CompletionBackend for PostgresBackend {
343    async fn subscribe_completions(&self) -> Result<CompletionStream, EngineError> {
344        subscribe(&self.pool, None).await
345    }
346
347    async fn subscribe_completions_filtered(
348        &self,
349        filter: &ScannerFilter,
350    ) -> Result<CompletionStream, EngineError> {
351        if filter.is_noop() {
352            return self.subscribe_completions().await;
353        }
354        subscribe(&self.pool, Some(filter.clone())).await
355    }
356}