
ff_backend_postgres/lib.rs

//! `EngineBackend` implementation backed by Postgres.
//!
//! **RFC-v0.7 Wave 0 — scaffold.** This crate lands the
//! [`PostgresBackend`] struct + the `impl EngineBackend` block with
//! every method stubbed to return `EngineError::Unavailable`.
//! Subsequent waves fill in bodies:
//!
//! * Wave 1: cross-cutting error/helpers.
//! * Wave 2: describe / list / resolve (read surface).
//! * Wave 3: schema migrations (replaces the Wave 0 placeholder).
//! * Wave 4: LISTEN/NOTIFY wiring + stream reads/tails.
//! * Waves 5-7: hot-path write methods.
//! * Wave 8: ff-server wire-up + dual-backend running.
//!
//! The trait stays object-safe; consumers can hold
//! `Arc<dyn EngineBackend>`. No ferriskey dep — this crate's
//! transport is `sqlx`.

#![allow(clippy::result_large_err)]

use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;

use ff_core::backend::{
    AppendFrameOutcome, BackendConfig, CancelFlowPolicy, CancelFlowWait, CapabilitySet,
    ClaimPolicy, FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal,
    PendingWaitpoint, ReclaimToken, ResumeSignal, SummaryDocument, TailVisibility,
    UsageDimensions,
};
#[cfg(feature = "core")]
use ff_core::contracts::{
    AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
    ApplyDependencyToChildResult, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
    CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
    DeliverSignalArgs, DeliverSignalResult, EdgeDependencyPolicy, EdgeDirection, EdgeSnapshot,
    ListExecutionsPage, ListFlowsPage, ListLanesPage, ListSuspendedPage,
    SetEdgeGroupPolicyResult, StageDependencyEdgeArgs, StageDependencyEdgeResult,
};
#[cfg(feature = "core")]
use ff_core::state::PublicState;
use ff_core::contracts::{
    CancelFlowResult, ExecutionSnapshot, FlowSnapshot, ReportUsageResult,
    RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
    SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
};
#[cfg(feature = "streaming")]
use ff_core::contracts::{StreamCursor, StreamFrames};
use ff_core::engine_backend::EngineBackend;
use ff_core::engine_error::EngineError;
#[cfg(feature = "core")]
use ff_core::partition::PartitionKey;
use ff_core::partition::PartitionConfig;
#[cfg(feature = "streaming")]
use ff_core::types::AttemptIndex;
#[cfg(feature = "core")]
use ff_core::types::EdgeId;
use ff_core::types::{BudgetId, ExecutionId, FlowId, LaneId, TimestampMs};
// Wave 5a — re-export `PgPool` so crates that depend on
// `ff-backend-postgres` (and not `sqlx` directly) can name the pool
// type in their own APIs (e.g. `ff-engine::dispatch_via_postgres`).
pub use sqlx::PgPool;

#[cfg(feature = "core")]
mod admin;
pub mod attempt;
pub mod budget;
pub mod completion;
#[cfg(feature = "core")]
pub mod dispatch;
pub mod error;
pub mod exec_core;
pub mod flow;
#[cfg(feature = "core")]
pub mod flow_staging;
pub mod handle_codec;
pub mod listener;
pub mod migrate;
pub mod pool;
#[cfg(feature = "core")]
pub mod reconcilers;
#[cfg(feature = "core")]
pub mod scanner_supervisor;
#[cfg(feature = "core")]
pub mod scheduler;
pub mod signal;
#[cfg(feature = "streaming")]
pub mod stream;
pub mod suspend;
pub mod suspend_ops;
pub mod version;

pub use completion::{PostgresCompletionStream, COMPLETION_CHANNEL};
pub use error::{map_sqlx_error, PostgresTransportError};
pub use listener::StreamNotifier;
pub use migrate::{apply_migrations, MigrationError};
#[cfg(feature = "core")]
pub use scanner_supervisor::{PostgresScannerConfig, PostgresScannerHandle};
pub use version::check_schema_version;

// Re-export the new `PostgresConnection` shape so consumers can name
// it from this crate directly without dipping into `ff_core::backend`.
// `BackendConfig` is already imported above and appears in the
// `connect()` signature; rustdoc links it back to `ff_core::backend`,
// which consumers keep naming directly, so no `pub use` is added here.
pub use ff_core::backend::PostgresConnection;

use ff_core::capability::{Capability as Cap, CapabilityStatus as Status};

/// RFC-018 Stage A: static capability table for the Postgres
/// backend at v0.8.1. Rows still `Unsupported` are Wave-9 follow-up
/// scope — see `docs/POSTGRES_PARITY_MATRIX.md` for the authoritative
/// per-row status. `Supported` rows correspond to trait methods
/// `PostgresBackend` overrides with a real body (ingress, scheduler,
/// seed, read, cross-cutting); `Unsupported` rows correspond to
/// trait methods that return `EngineError::Unavailable` on Postgres
/// today.
///
/// Streaming rows are marked `Unsupported` rather than omitted
/// because the Postgres backend exposes the trait's streaming
/// methods under the default `streaming` feature but returns
/// `Unavailable` — that is an operationally distinct state from
/// "compiled out," and consumers benefit from the typed signal.
static POSTGRES_CAPS: &[(Cap, Status)] = &[
    (Cap::ClaimForWorker, Status::Supported),
    (Cap::ClaimFromReclaim, Status::Unsupported),
    (Cap::SuspendResumeByCount, Status::Unsupported),
    (Cap::CancelExecution, Status::Unsupported),
    (Cap::CancelFlow, Status::Supported),
    (Cap::CancelFlowWaitTimeout, Status::Supported),
    (Cap::CancelFlowWaitIndefinite, Status::Supported),
    (Cap::StreamRead, Status::Unsupported),
    (Cap::StreamBestEffortLive, Status::Unsupported),
    (Cap::StreamDurableSummary, Status::Unsupported),
    (Cap::DeliverSignal, Status::Unsupported),
    (Cap::ListPendingWaitpoints, Status::Unsupported),
    (Cap::RotateWaitpointHmac, Status::Unsupported),
    (Cap::SeedWaitpointHmac, Status::Supported),
    (Cap::ReportUsage, Status::Unsupported),
    (Cap::ReportUsageAdminPath, Status::Unsupported),
    (Cap::ResetBudget, Status::Unsupported),
    (Cap::CreateFlow, Status::Supported),
    (Cap::CreateExecution, Status::Supported),
    (Cap::StageDependencyEdge, Status::Supported),
    (Cap::ApplyDependencyToChild, Status::Supported),
    // Postgres `prepare()` returns `NoOp` (schema migrations are
    // applied out-of-band); reported as `Unsupported` so consumers
    // don't waste a boot-time call expecting work.
    (Cap::PreparableBoot, Status::Unsupported),
    (Cap::SubscribeLeaseHistory, Status::Unsupported),
    (Cap::SubscribeCompletion, Status::Unsupported),
    (Cap::SubscribeSignalDelivery, Status::Unsupported),
    (Cap::Ping, Status::Supported),
];

/// Postgres-backed `EngineBackend`.
///
/// Wave 0 shape: holds a `sqlx::PgPool`, the deployment's
/// [`PartitionConfig`] (Q5 — partition column survives on Postgres
/// with hash partitioning across the same 256 slots Valkey uses),
/// and an optional `ff_observability::Metrics` handle mirroring
/// [`ff_backend_valkey::ValkeyBackend`]. Wave 4 added the shared
/// [`StreamNotifier`] handle for LISTEN/NOTIFY.
pub struct PostgresBackend {
    #[allow(dead_code)] // filled in across waves 2-7
    pool: PgPool,
    #[allow(dead_code)]
    partition_config: PartitionConfig,
    #[allow(dead_code)]
    metrics: Option<Arc<ff_observability::Metrics>>,
    /// Wave 4: shared LISTEN notifier. Spawned by both `connect()`
    /// and `from_pool`; kept as an `Option` for constructions that
    /// skip LISTEN wiring (tests that only exercise the write path).
    #[allow(dead_code)]
    stream_notifier: Option<Arc<StreamNotifier>>,
    /// RFC-017 Wave 8 Stage E3: scanner supervisor handle. Installed
    /// via [`Self::with_scanners`] after construction; drained on
    /// [`EngineBackend::shutdown_prepare`]. `None` on `from_pool` /
    /// test builds that don't want background reconcilers.
    #[cfg(feature = "core")]
    scanner_handle: Option<Arc<scanner_supervisor::PostgresScannerHandle>>,
}

impl PostgresBackend {
    /// Dial Postgres with [`BackendConfig`] and return the backend as
    /// `Arc<dyn EngineBackend>`. Modeled on
    /// [`ff_backend_valkey::ValkeyBackend::connect`] so ff-server /
    /// SDK call sites can swap backends without changing the
    /// constructor shape.
    ///
    /// Builds the pool and constructs the backend. Does NOT run
    /// migrations (Q12 — operator out-of-band) and does NOT itself run
    /// the schema-version check (Wave 3 added the version const and
    /// [`check_schema_version`]; the boot path invokes it separately).
    /// Wave 4 added the LISTEN task: `connect()` spawns the shared
    /// [`StreamNotifier`] against the freshly built pool.
    ///
    /// Returns `EngineError::Unavailable` when the config's
    /// connection arm is not Postgres.
    pub async fn connect(config: BackendConfig) -> Result<Arc<dyn EngineBackend>, EngineError> {
        let pool = pool::build_pool(&config).await?;
        warn_if_max_locks_low(&pool).await;
        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
        let backend = Self {
            pool,
            partition_config: PartitionConfig::default(),
            metrics: None,
            stream_notifier,
            #[cfg(feature = "core")]
            scanner_handle: None,
        };
        Ok(Arc::new(backend))
    }

    /// Test / advanced constructor: build a `PostgresBackend` from an
    /// already-constructed `PgPool` + explicit partition config.
    /// Spawns the shared [`StreamNotifier`] but performs no other
    /// network I/O. Useful for integration tests against a shared
    /// pool and for a future migration CLI that wants to reuse a pool
    /// across migrate-run + smoke-check.
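    ///
    /// A minimal sketch (hypothetical connection URL; assumes the
    /// schema was already migrated out-of-band):
    ///
    /// ```ignore
    /// use ff_backend_postgres::PostgresBackend;
    /// use ff_core::partition::PartitionConfig;
    ///
    /// // Build a small pool with plain sqlx, then wrap it.
    /// let pool = sqlx::postgres::PgPoolOptions::new()
    ///     .max_connections(4)
    ///     .connect("postgres://localhost/ff_dev")
    ///     .await?;
    /// let backend = PostgresBackend::from_pool(pool, PartitionConfig::default());
    /// ```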
    pub fn from_pool(pool: PgPool, partition_config: PartitionConfig) -> Arc<Self> {
        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
        Arc::new(Self {
            pool,
            partition_config,
            metrics: None,
            stream_notifier,
            #[cfg(feature = "core")]
            scanner_handle: None,
        })
    }

    /// RFC-017 Wave 8 Stage E1: dial Postgres with an explicit
    /// [`PartitionConfig`] + shared [`ff_observability::Metrics`].
    /// Mirrors [`ff_backend_valkey::ValkeyBackend::connect_with_metrics`]
    /// so `ff-server::Server::start_with_metrics` can wire the Postgres
    /// branch without reaching into the pool builder directly.
    ///
    /// Returns a concrete `Arc<Self>` rather than `Arc<dyn EngineBackend>`
    /// so the caller can cast to the trait object after any additional
    /// field installs (parallel to the Valkey path which calls
    /// `with_scheduler` / `with_stream_semaphore_permits` before the
    /// cast). Stage E1 does NOT run `apply_migrations` — schema
    /// provisioning is an operator concern (matches the Wave 0 contract
    /// on [`Self::connect`]).
    pub async fn connect_with_metrics(
        config: BackendConfig,
        partition_config: PartitionConfig,
        metrics: Arc<ff_observability::Metrics>,
    ) -> Result<Arc<Self>, EngineError> {
        let pool = pool::build_pool(&config).await?;
        warn_if_max_locks_low(&pool).await;
        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
        Ok(Arc::new(Self {
            pool,
            partition_config,
            metrics: Some(metrics),
            stream_notifier,
            #[cfg(feature = "core")]
            scanner_handle: None,
        }))
    }

    /// RFC-017 Wave 8 Stage E3: spawn the six Postgres reconcilers as
    /// background tick loops. Returns `true` if the scanner handle
    /// was installed; `false` if the `Arc<Self>` has outstanding
    /// clones (mirrors the Valkey `with_*` pattern). Callers must
    /// invoke this before publishing the `Arc<dyn EngineBackend>` so
    /// the underlying `Arc::get_mut` succeeds, as sketched below.
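    ///
    /// Ordering sketch (illustrative; assumes
    /// [`PostgresScannerConfig`] implements `Default`):
    ///
    /// ```ignore
    /// let mut backend =
    ///     PostgresBackend::connect_with_metrics(cfg, parts, metrics).await?;
    /// // Install scanners while this is the only strong reference.
    /// assert!(backend.with_scanners(PostgresScannerConfig::default()));
    /// // Publish only afterwards; `Arc::get_mut` fails once cloned.
    /// let engine: Arc<dyn EngineBackend> = backend;
    /// ```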
    #[cfg(feature = "core")]
    pub fn with_scanners(
        self: &mut Arc<Self>,
        cfg: scanner_supervisor::PostgresScannerConfig,
    ) -> bool {
        let Some(inner) = Arc::get_mut(self) else {
            return false;
        };
        let handle = scanner_supervisor::spawn_scanners(inner.pool.clone(), cfg);
        inner.scanner_handle = Some(Arc::new(handle));
        true
    }

    /// Accessor for the underlying `PgPool`. Stage E1 uses this so
    /// `ff-server::Server::start_with_metrics` can run
    /// [`apply_migrations`] on the same pool before handing the backend
    /// out as `Arc<dyn EngineBackend>`.
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }

    /// Create one execution row (+ seed the lane registry if new).
    ///
    /// **RFC-017 Stage A:** this inherent method is retained as a
    /// thin wrapper around the module-level impl so existing in-tree
    /// callers (ff-server request handlers, integration tests) keep
    /// compiling. The trait-lifted entry point is
    /// [`EngineBackend::create_execution`] below, which calls the
    /// same impl. The return shapes differ — the inherent returns
    /// `ExecutionId`, the trait returns [`CreateExecutionResult`] per
    /// RFC-017 §5 — so we cannot simply replace the inherent method.
    /// A follow-up PR may deprecate this inherent alongside the
    /// broader ingress shape alignment. Both call shapes are sketched
    /// below.
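    ///
    /// Call-shape sketch (illustrative; `args` construction elided):
    ///
    /// ```ignore
    /// // Inherent: concrete backend, bare id.
    /// let id: ExecutionId = backend.create_execution(args.clone()).await?;
    /// // Trait object: RFC-017 result enum.
    /// let engine: Arc<dyn EngineBackend> = backend.clone();
    /// let res: CreateExecutionResult = engine.create_execution(args).await?;
    /// ```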
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_execution", skip_all)]
    pub async fn create_execution(
        &self,
        args: CreateExecutionArgs,
    ) -> Result<ExecutionId, EngineError> {
        exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await
    }

    // ── RFC-017 Stage A: inherent ingress methods retained for
    // back-compat with in-tree test harnesses + ff-server direct
    // calls. The trait-lifted peers (`EngineBackend::create_flow`
    // etc.) delegate to the SAME module-level impls under the hood.
    // Follow-up PR may sunset these inherents once all in-tree
    // consumers route through `Arc<dyn EngineBackend>`.

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_flow.inherent", skip_all)]
    pub async fn create_flow(
        &self,
        args: &CreateFlowArgs,
    ) -> Result<CreateFlowResult, EngineError> {
        flow_staging::create_flow(&self.pool, &self.partition_config, args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.add_execution_to_flow.inherent", skip_all)]
    pub async fn add_execution_to_flow(
        &self,
        args: &AddExecutionToFlowArgs,
    ) -> Result<AddExecutionToFlowResult, EngineError> {
        flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.stage_dependency_edge.inherent", skip_all)]
    pub async fn stage_dependency_edge(
        &self,
        args: &StageDependencyEdgeArgs,
    ) -> Result<StageDependencyEdgeResult, EngineError> {
        flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.apply_dependency_to_child.inherent", skip_all)]
    pub async fn apply_dependency_to_child(
        &self,
        args: &ApplyDependencyToChildArgs,
    ) -> Result<ApplyDependencyToChildResult, EngineError> {
        flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, args).await
    }
}

/// Short helper: the remaining stub methods return this. Kept as a
/// function (rather than a macro) so rust-analyzer / IDE jumps show a
/// single definition site for the Wave 0 stub pattern.
#[inline]
fn unavailable<T>(op: &'static str) -> Result<T, EngineError> {
    Err(EngineError::Unavailable { op })
}

#[async_trait]
impl EngineBackend for PostgresBackend {
    // ── Claim + lifecycle ──

    #[tracing::instrument(name = "pg.claim", skip_all)]
    async fn claim(
        &self,
        lane: &LaneId,
        capabilities: &CapabilitySet,
        policy: ClaimPolicy,
    ) -> Result<Option<Handle>, EngineError> {
        attempt::claim(&self.pool, lane, capabilities, &policy).await
    }

    #[tracing::instrument(name = "pg.renew", skip_all)]
    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError> {
        attempt::renew(&self.pool, handle).await
    }

    #[tracing::instrument(name = "pg.progress", skip_all)]
    async fn progress(
        &self,
        handle: &Handle,
        percent: Option<u8>,
        message: Option<String>,
    ) -> Result<(), EngineError> {
        attempt::progress(&self.pool, handle, percent, message).await
    }

    #[tracing::instrument(name = "pg.append_frame", skip_all)]
    async fn append_frame(
        &self,
        handle: &Handle,
        frame: Frame,
    ) -> Result<AppendFrameOutcome, EngineError> {
        #[cfg(feature = "streaming")]
        {
            stream::append_frame(&self.pool, &self.partition_config, handle, frame).await
        }
        #[cfg(not(feature = "streaming"))]
        {
            let _ = (handle, frame);
            unavailable("pg.append_frame")
        }
    }

    #[tracing::instrument(name = "pg.complete", skip_all)]
    async fn complete(
        &self,
        handle: &Handle,
        payload: Option<Vec<u8>>,
    ) -> Result<(), EngineError> {
        attempt::complete(&self.pool, handle, payload).await
    }

    #[tracing::instrument(name = "pg.fail", skip_all)]
    async fn fail(
        &self,
        handle: &Handle,
        reason: FailureReason,
        classification: FailureClass,
    ) -> Result<FailOutcome, EngineError> {
        attempt::fail(&self.pool, handle, reason, classification).await
    }

    #[tracing::instrument(name = "pg.cancel", skip_all)]
    async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError> {
        let payload = handle_codec::decode_handle(handle)?;
        exec_core::cancel_impl(
            &self.pool,
            &self.partition_config,
            &payload.execution_id,
            reason,
        )
        .await
    }

    #[tracing::instrument(name = "pg.suspend", skip_all)]
    async fn suspend(
        &self,
        handle: &Handle,
        args: SuspendArgs,
    ) -> Result<SuspendOutcome, EngineError> {
        suspend_ops::suspend_impl(&self.pool, &self.partition_config, handle, args).await
    }

    #[tracing::instrument(name = "pg.create_waitpoint", skip_all)]
    async fn create_waitpoint(
        &self,
        _handle: &Handle,
        _waitpoint_key: &str,
        _expires_in: Duration,
    ) -> Result<PendingWaitpoint, EngineError> {
        unavailable("pg.create_waitpoint")
    }

    #[tracing::instrument(name = "pg.observe_signals", skip_all)]
    async fn observe_signals(
        &self,
        handle: &Handle,
    ) -> Result<Vec<ResumeSignal>, EngineError> {
        suspend_ops::observe_signals_impl(&self.pool, handle).await
    }

    #[tracing::instrument(name = "pg.claim_from_reclaim", skip_all)]
    async fn claim_from_reclaim(
        &self,
        token: ReclaimToken,
    ) -> Result<Option<Handle>, EngineError> {
        attempt::claim_from_reclaim(&self.pool, token).await
    }

    #[tracing::instrument(name = "pg.delay", skip_all)]
    async fn delay(
        &self,
        handle: &Handle,
        delay_until: TimestampMs,
    ) -> Result<(), EngineError> {
        attempt::delay(&self.pool, handle, delay_until).await
    }

    #[tracing::instrument(name = "pg.wait_children", skip_all)]
    async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError> {
        attempt::wait_children(&self.pool, handle).await
    }

    // ── Read / admin ──

    #[tracing::instrument(name = "pg.describe_execution", skip_all)]
    async fn describe_execution(
        &self,
        id: &ExecutionId,
    ) -> Result<Option<ExecutionSnapshot>, EngineError> {
        exec_core::describe_execution_impl(&self.pool, &self.partition_config, id).await
    }

    #[tracing::instrument(name = "pg.describe_flow", skip_all)]
    async fn describe_flow(
        &self,
        id: &FlowId,
    ) -> Result<Option<FlowSnapshot>, EngineError> {
        flow::describe_flow(&self.pool, &self.partition_config, id).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_edges", skip_all)]
    async fn list_edges(
        &self,
        flow_id: &FlowId,
        direction: EdgeDirection,
    ) -> Result<Vec<EdgeSnapshot>, EngineError> {
        flow::list_edges(&self.pool, &self.partition_config, flow_id, direction).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.describe_edge", skip_all)]
    async fn describe_edge(
        &self,
        flow_id: &FlowId,
        edge_id: &EdgeId,
    ) -> Result<Option<EdgeSnapshot>, EngineError> {
        flow::describe_edge(&self.pool, &self.partition_config, flow_id, edge_id).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.resolve_execution_flow_id", skip_all)]
    async fn resolve_execution_flow_id(
        &self,
        eid: &ExecutionId,
    ) -> Result<Option<FlowId>, EngineError> {
        exec_core::resolve_execution_flow_id_impl(&self.pool, &self.partition_config, eid).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_flows", skip_all)]
    async fn list_flows(
        &self,
        partition: PartitionKey,
        cursor: Option<FlowId>,
        limit: usize,
    ) -> Result<ListFlowsPage, EngineError> {
        flow::list_flows(&self.pool, partition, cursor, limit).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_lanes", skip_all)]
    async fn list_lanes(
        &self,
        cursor: Option<LaneId>,
        limit: usize,
    ) -> Result<ListLanesPage, EngineError> {
        admin::list_lanes_impl(&self.pool, cursor, limit).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_suspended", skip_all)]
    async fn list_suspended(
        &self,
        partition: PartitionKey,
        cursor: Option<ExecutionId>,
        limit: usize,
    ) -> Result<ListSuspendedPage, EngineError> {
        admin::list_suspended_impl(&self.pool, partition, cursor, limit).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_executions", skip_all)]
    async fn list_executions(
        &self,
        partition: PartitionKey,
        cursor: Option<ExecutionId>,
        limit: usize,
    ) -> Result<ListExecutionsPage, EngineError> {
        exec_core::list_executions_impl(
            &self.pool,
            &self.partition_config,
            partition,
            cursor,
            limit,
        )
        .await
    }

    // ── Trigger ops (issue #150) ──

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.deliver_signal", skip_all)]
    async fn deliver_signal(
        &self,
        args: DeliverSignalArgs,
    ) -> Result<DeliverSignalResult, EngineError> {
        suspend_ops::deliver_signal_impl(&self.pool, &self.partition_config, args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.claim_resumed_execution", skip_all)]
    async fn claim_resumed_execution(
        &self,
        args: ClaimResumedExecutionArgs,
    ) -> Result<ClaimResumedExecutionResult, EngineError> {
        suspend_ops::claim_resumed_execution_impl(&self.pool, &self.partition_config, args).await
    }

    // ── RFC-017 Stage A — ingress (promoted from inherent) ────

    /// RFC-017 Wave 8 Stage E1: lift the inherent
    /// [`PostgresBackend::create_execution`] onto the trait so
    /// ff-server's migrated HTTP handler can dispatch to Postgres.
    /// The insert is idempotent; the Postgres impl does not
    /// distinguish `Created` from `Duplicate` at the helper level
    /// (both paths commit and return the execution id), so we always
    /// surface `Created { public_state: Waiting }` here. A follow-up
    /// may lift the distinction if a consumer relies on it.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_execution.trait", skip_all)]
    async fn create_execution(
        &self,
        args: CreateExecutionArgs,
    ) -> Result<CreateExecutionResult, EngineError> {
        let eid = args.execution_id.clone();
        exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await?;
        Ok(CreateExecutionResult::Created {
            execution_id: eid,
            public_state: PublicState::Waiting,
        })
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_flow", skip_all)]
    async fn create_flow(
        &self,
        args: CreateFlowArgs,
    ) -> Result<CreateFlowResult, EngineError> {
        flow_staging::create_flow(&self.pool, &self.partition_config, &args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.add_execution_to_flow", skip_all)]
    async fn add_execution_to_flow(
        &self,
        args: AddExecutionToFlowArgs,
    ) -> Result<AddExecutionToFlowResult, EngineError> {
        flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, &args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.stage_dependency_edge", skip_all)]
    async fn stage_dependency_edge(
        &self,
        args: StageDependencyEdgeArgs,
    ) -> Result<StageDependencyEdgeResult, EngineError> {
        flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, &args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.apply_dependency_to_child", skip_all)]
    async fn apply_dependency_to_child(
        &self,
        args: ApplyDependencyToChildArgs,
    ) -> Result<ApplyDependencyToChildResult, EngineError> {
        flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, &args).await
    }

    fn backend_label(&self) -> &'static str {
        "postgres"
    }

    /// RFC-018 Stage A: populate the capability matrix from the
    /// static [`POSTGRES_CAPS`] table. The Postgres backend landed
    /// through RFC-017 Stage E4 at v0.8.0; rows still marked
    /// `Unsupported` here correspond to Wave-9 follow-up work
    /// (`cancel_flow_header`, `ack_cancel_member`, read-model,
    /// operator control, budget/quota, HMAC rotation,
    /// `list_pending_waitpoints`). See
    /// `docs/POSTGRES_PARITY_MATRIX.md` for the per-row breakdown.
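    ///
    /// Consumer-side sketch (a `get`-style lookup on
    /// [`ff_core::capability::CapabilityMatrix`] is assumed here,
    /// purely for illustration):
    ///
    /// ```ignore
    /// use ff_core::capability::{Capability, CapabilityStatus};
    ///
    /// let matrix = backend.capabilities_matrix();
    /// if matrix.get(Capability::StreamRead) == CapabilityStatus::Unsupported {
    ///     // Skip the streaming path instead of round-tripping an
    ///     // `EngineError::Unavailable` from the backend.
    /// }
    /// ```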
    fn capabilities_matrix(&self) -> ff_core::capability::CapabilityMatrix {
        let mut matrix = ff_core::capability::CapabilityMatrix::new(
            ff_core::capability::BackendIdentity::new(
                "postgres",
                ff_core::capability::Version::new(0, 8, 1),
                "E-shipped",
            ),
        );
        for (cap, status) in POSTGRES_CAPS.iter() {
            matrix.set(*cap, status.clone());
        }
        matrix
    }

    /// Issue #281: no-op. Schema migrations are applied out-of-band
    /// per `rfcs/drafts/v0.7-migration-master.md §Q12` (operator runs
    /// `sqlx migrate run` or the future `ff-migrate` CLI). Boot runs a
    /// schema-version check ([`crate::version::check_schema_version`])
    /// and refuses to start on mismatch, so by the time `prepare()` is
    /// callable there is nothing further to do.
    async fn prepare(
        &self,
    ) -> Result<ff_core::backend::PrepareOutcome, EngineError> {
        Ok(ff_core::backend::PrepareOutcome::NoOp)
    }

    /// RFC-017 Wave 8 Stage E3 (§4 row 9, §7): forward the claim to the
    /// Postgres-native admission pipeline. Returns `NoWork` when no
    /// eligible execution is admissible this scan cycle. Budget
    /// breaches surface as `NoWork` (leaving the row eligible for a
    /// retry by another worker); validation-class rejections
    /// (malformed partition, unknown kid) surface as typed
    /// [`EngineError`] variants mapped to the Server's 400/503 arms.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.claim_for_worker", skip_all)]
    async fn claim_for_worker(
        &self,
        args: ff_core::contracts::ClaimForWorkerArgs,
    ) -> Result<ff_core::contracts::ClaimForWorkerOutcome, EngineError> {
        let sched = scheduler::PostgresScheduler::new(self.pool.clone());
        let grant_opt = sched
            .claim_for_worker(
                &args.lane_id,
                &args.worker_id,
                &args.worker_instance_id,
                &args.worker_capabilities,
                args.grant_ttl_ms,
            )
            .await?;
        Ok(match grant_opt {
            Some(g) => ff_core::contracts::ClaimForWorkerOutcome::granted(g),
            None => ff_core::contracts::ClaimForWorkerOutcome::no_work(),
        })
    }

    async fn ping(&self) -> Result<(), EngineError> {
        // Postgres analogue to Valkey PING — single-round-trip pool
        // liveness. Errors propagate as transport-class EngineError via
        // the existing sqlx→EngineError map.
        let _ = sqlx::query_scalar::<_, i32>("SELECT 1")
            .fetch_one(&self.pool)
            .await
            .map_err(error::map_sqlx_error)?;
        Ok(())
    }

    /// RFC-017 Wave 8 Stage E3: drain the scanner supervisor's
    /// reconciler tasks up to `grace`. Matches the Valkey backend's
    /// shutdown_prepare contract — bounded best-effort drain, never
    /// returns an error. The sqlx pool itself is not closed here; it
    /// is released when the backend is dropped.
    async fn shutdown_prepare(&self, grace: Duration) -> Result<(), EngineError> {
        #[cfg(feature = "core")]
        if let Some(handle) = self.scanner_handle.as_ref() {
            let timed_out = handle.shutdown(grace).await;
            if timed_out > 0 {
                tracing::warn!(
                    timed_out,
                    ?grace,
                    "postgres scanner supervisor exceeded grace on shutdown"
                );
            }
        }
        Ok(())
    }

    #[tracing::instrument(name = "pg.cancel_flow", skip_all)]
    async fn cancel_flow(
        &self,
        id: &FlowId,
        policy: CancelFlowPolicy,
        wait: CancelFlowWait,
    ) -> Result<CancelFlowResult, EngineError> {
        let result = flow::cancel_flow(&self.pool, &self.partition_config, id, policy).await?;
        if let Some(deadline) = ff_core::engine_backend::cancel_flow_wait_deadline(wait) {
            ff_core::engine_backend::wait_for_flow_cancellation(self, id, deadline).await?;
        }
        Ok(result)
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.set_edge_group_policy", skip_all)]
    async fn set_edge_group_policy(
        &self,
        flow_id: &FlowId,
        downstream_execution_id: &ExecutionId,
        policy: EdgeDependencyPolicy,
    ) -> Result<SetEdgeGroupPolicyResult, EngineError> {
        flow::set_edge_group_policy(
            &self.pool,
            &self.partition_config,
            flow_id,
            downstream_execution_id,
            policy,
        )
        .await
    }

    // ── Budget ──

    #[tracing::instrument(name = "pg.report_usage", skip_all)]
    async fn report_usage(
        &self,
        _handle: &Handle,
        budget: &BudgetId,
        dimensions: UsageDimensions,
    ) -> Result<ReportUsageResult, EngineError> {
        budget::report_usage_impl(&self.pool, &self.partition_config, budget, dimensions).await
    }

    // ── HMAC secret rotation (v0.7 migration-master Q4) ──
    //
    // Wave 4 replaced the Wave 0/1 `Unavailable` stub with a single
    // INSERT into `ff_waitpoint_hmac(kid, secret, rotated_at)`. The
    // stub shape was kept through Wave 0/1 so a running Postgres
    // backend surfaced the unimplemented op loudly rather than
    // silently no-op'ing.
    #[tracing::instrument(name = "pg.rotate_waitpoint_hmac_secret_all", skip_all)]
    async fn rotate_waitpoint_hmac_secret_all(
        &self,
        args: RotateWaitpointHmacSecretAllArgs,
    ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
        // Wave 4 Agent D: Q4 single-global-row write against
        // `ff_waitpoint_hmac`. `now_ms` is captured here (not
        // inside the impl) so tests can inject a deterministic
        // clock via the pool layer in the future.
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as i64)
            .unwrap_or(0);
        signal::rotate_waitpoint_hmac_secret_all_impl(&self.pool, args, now_ms).await
    }

    #[tracing::instrument(name = "pg.seed_waitpoint_hmac_secret", skip_all)]
    async fn seed_waitpoint_hmac_secret(
        &self,
        args: SeedWaitpointHmacSecretArgs,
    ) -> Result<SeedOutcome, EngineError> {
        // Issue #280: install-only boot-time seed against the global
        // `ff_waitpoint_hmac` table. Idempotent — cairn calls this on
        // every boot and the backend decides whether to INSERT.
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as i64)
            .unwrap_or(0);
        signal::seed_waitpoint_hmac_secret_impl(&self.pool, args, now_ms).await
    }

    // ── Stream reads (streaming feature) ──

    #[cfg(feature = "streaming")]
    #[tracing::instrument(name = "pg.read_stream", skip_all)]
    async fn read_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        from: StreamCursor,
        to: StreamCursor,
        count_limit: u64,
    ) -> Result<StreamFrames, EngineError> {
        stream::read_stream(&self.pool, execution_id, attempt_index, from, to, count_limit).await
    }

    #[cfg(feature = "streaming")]
    #[tracing::instrument(name = "pg.tail_stream", skip_all)]
    async fn tail_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        after: StreamCursor,
        block_ms: u64,
        count_limit: u64,
        visibility: TailVisibility,
    ) -> Result<StreamFrames, EngineError> {
        let notifier = self
            .stream_notifier
            .as_ref()
            .ok_or(EngineError::Unavailable {
                op: "pg.tail_stream (notifier not initialised)",
            })?;
        stream::tail_stream(
            &self.pool,
            notifier,
            execution_id,
            attempt_index,
            after,
            block_ms,
            count_limit,
            visibility,
        )
        .await
    }

    #[cfg(feature = "streaming")]
    #[tracing::instrument(name = "pg.read_summary", skip_all)]
    async fn read_summary(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
    ) -> Result<Option<SummaryDocument>, EngineError> {
        stream::read_summary(&self.pool, execution_id, attempt_index).await
    }

    // ── RFC-019 Stage A — `subscribe_completion` ──────────────────
    //
    // Postgres real impl. Wraps the existing `ff_completion_event`
    // outbox + `LISTEN ff_completion` machinery
    // (see `completion::subscribe`) and adapts each completion
    // payload into a `stream_subscribe::StreamEvent`.
    //
    // Cursor encoding: `POSTGRES_CURSOR_PREFIX (0x02)` + `event_id`
    // (i64 BE). Stage A resume-from-cursor is not plumbed through the
    // adapter (the existing subscriber tails from `max(event_id)`);
    // Stage B threads the cursor into the replay path. The surface is
    // correct today for consumers that subscribe from tail and
    // persist cursors for future resume.
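    //
    // Worked byte layout under that encoding (illustrative):
    // event_id = 7 encodes to the 9-byte cursor
    // [0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x07].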
    #[tracing::instrument(name = "pg.subscribe_completion", skip_all)]
    async fn subscribe_completion(
        &self,
        _cursor: ff_core::stream_subscribe::StreamCursor,
    ) -> Result<ff_core::stream_subscribe::StreamSubscription, EngineError> {
        use ff_core::stream_subscribe::{
            encode_postgres_event_cursor, StreamEvent, StreamFamily,
        };
        use futures_core::Stream;
        use std::pin::Pin;
        use std::task::{Context, Poll};

        // Delegate to the existing CompletionBackend implementation so
        // the LISTEN/replay machinery is shared. Stage A ignores the
        // caller's cursor (always subscribes from the current tail)
        // and surfaces that via the RFC-019 method doc; Stage B lands
        // resume-from-cursor.
        let inner = ff_core::completion_backend::CompletionBackend::subscribe_completions(self)
            .await?;

        // Adapter: `CompletionStream` yields `CompletionPayload` (not
        // Result); RFC-019 requires `Result<StreamEvent, EngineError>`.
        // Wrap into an infallible stream mapping each payload.
        struct Adapter {
            inner: ff_core::completion_backend::CompletionStream,
        }

        impl Stream for Adapter {
            type Item = Result<StreamEvent, EngineError>;
            fn poll_next(
                mut self: Pin<&mut Self>,
                cx: &mut Context<'_>,
            ) -> Poll<Option<Self::Item>> {
                match Pin::new(&mut self.inner).poll_next(cx) {
                    Poll::Pending => Poll::Pending,
                    Poll::Ready(None) => Poll::Ready(None),
                    Poll::Ready(Some(payload)) => {
                        // Stage A: the cursor is a placeholder
                        // (event_id 0) because the current
                        // `CompletionPayload` shape does not surface
                        // `event_id` — Stage B widens
                        // `CompletionPayload` + plumbs
                        // replay-from-cursor through the adapter.
                        // The family prefix stays stable so Stage B
                        // persistence is forward-compatible for
                        // consumers that tail from the empty cursor.
                        let cursor = encode_postgres_event_cursor(0);
                        let event = StreamEvent::new(
                            StreamFamily::Completion,
                            cursor,
                            payload.produced_at_ms,
                            bytes::Bytes::from(payload.outcome.clone().into_bytes()),
                        )
                        .with_execution_id(payload.execution_id.clone());
                        Poll::Ready(Some(Ok(event)))
                    }
                }
            }
        }

        Ok(Box::pin(Adapter { inner }))
    }
}

/// Minimum recommended `max_locks_per_transaction`. Partition-heavy
/// schemas (256 hash partitions per logical table) can exceed the
/// Postgres default of `64` per tx under modest concurrent bench
/// load — the Wave 7c bench hit `out of shared memory` at 16 workers
/// × 10k tasks with the default and unblocked at `512`. We warn at
/// boot rather than hard-fail because operators may legitimately
/// run with a tuned value that still exceeds 64 but sits below our
/// threshold.
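///
/// Operators can raise the limit with `ALTER SYSTEM SET
/// max_locks_per_transaction = 512` (or the matching `postgresql.conf`
/// entry); the setting takes effect only after a server restart, not
/// on reload.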
const MIN_MAX_LOCKS_PER_TRANSACTION: i64 = 256;

/// Probe `max_locks_per_transaction` at connect time + log a warning
/// when the current value is below the production-safe threshold.
/// Never fails the connect — probe errors are logged at debug and
/// swallowed (`SHOW` may be restricted on exotic deploys).
async fn warn_if_max_locks_low(pool: &PgPool) {
    let row: Result<(String,), sqlx::Error> =
        sqlx::query_as("SHOW max_locks_per_transaction")
            .fetch_one(pool)
            .await;
    match row {
        Ok((raw,)) => emit_max_locks_decision(&raw),
        Err(e) => {
            tracing::debug!("failed to probe max_locks_per_transaction: {e}");
        }
    }
}


/// Pure decision surface for the max-locks probe — extracted for
/// unit-testability (the live probe needs a running Postgres).
/// Returns the integer value when a warning SHOULD fire, `None`
/// otherwise (either the raw value parses and sits at/above the
/// threshold, or it is unparseable — the latter is logged at debug
/// only).
fn max_locks_warn_value(raw: &str) -> Option<i64> {
    match raw.parse::<i64>() {
        Ok(v) if v < MIN_MAX_LOCKS_PER_TRANSACTION => Some(v),
        Ok(_) => None,
        Err(e) => {
            tracing::debug!(raw, "failed to parse max_locks_per_transaction: {e}");
            None
        }
    }
}

fn emit_max_locks_decision(raw: &str) {
    if let Some(v) = max_locks_warn_value(raw) {
        tracing::warn!(
            current = v,
            recommended = MIN_MAX_LOCKS_PER_TRANSACTION,
            "postgres max_locks_per_transaction={v} is below the recommended \
             minimum ({MIN_MAX_LOCKS_PER_TRANSACTION}); partition-heavy workloads \
             may hit 'out of shared memory' under concurrent load. \
             See docs/operator-guide-postgres.md."
        );
    }
}


#[cfg(test)]
mod max_locks_tests {
    use super::{max_locks_warn_value, MIN_MAX_LOCKS_PER_TRANSACTION};

    #[test]
    fn warns_when_below_threshold() {
        assert_eq!(max_locks_warn_value("64"), Some(64));
        assert_eq!(
            max_locks_warn_value(&(MIN_MAX_LOCKS_PER_TRANSACTION - 1).to_string()),
            Some(MIN_MAX_LOCKS_PER_TRANSACTION - 1)
        );
    }

    #[test]
    fn silent_at_or_above_threshold() {
        assert_eq!(
            max_locks_warn_value(&MIN_MAX_LOCKS_PER_TRANSACTION.to_string()),
            None
        );
        assert_eq!(max_locks_warn_value("1024"), None);
    }

    #[test]
    fn silent_for_unparseable_raw() {
        assert_eq!(max_locks_warn_value("not-a-number"), None);
    }
}