ff_backend_postgres/lib.rs
//! `EngineBackend` implementation backed by Postgres.
//!
//! **RFC-v0.7 Wave 0 — scaffold.** This crate lands the
//! [`PostgresBackend`] struct + the `impl EngineBackend` block with
//! every method stubbed to return `EngineError::Unavailable`.
//! Subsequent waves fill in bodies:
//!
//! * Wave 1: cross-cutting error/helpers.
//! * Wave 2: describe / list / resolve (read surface).
//! * Wave 3: schema migrations (replaces the Wave 0 placeholder).
//! * Wave 4: LISTEN/NOTIFY wiring + stream reads/tails.
//! * Waves 5-7: hot-path write methods.
//! * Wave 8: ff-server wire-up + dual-backend running.
//!
//! The trait stays object-safe; consumers can hold
//! `Arc<dyn EngineBackend>`. No ferriskey dep — this crate's
//! transport is `sqlx`.

#![allow(clippy::result_large_err)]

use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;

use ff_core::backend::{
    AppendFrameOutcome, BackendConfig, CancelFlowPolicy, CancelFlowWait, CapabilitySet,
    ClaimPolicy, FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal,
    PendingWaitpoint, ReclaimToken, ResumeSignal, SummaryDocument, TailVisibility,
    UsageDimensions,
};
#[cfg(feature = "core")]
use ff_core::contracts::{
    AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
    ApplyDependencyToChildResult, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
    CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
    DeliverSignalArgs, DeliverSignalResult, EdgeDependencyPolicy, EdgeDirection, EdgeSnapshot,
    ListExecutionsPage, ListFlowsPage, ListLanesPage, ListSuspendedPage,
    SetEdgeGroupPolicyResult, StageDependencyEdgeArgs, StageDependencyEdgeResult,
};
#[cfg(feature = "core")]
use ff_core::state::PublicState;
use ff_core::contracts::{
    CancelFlowResult, ExecutionSnapshot, FlowSnapshot, ReportUsageResult,
    RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
    SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
};
#[cfg(feature = "streaming")]
use ff_core::contracts::{StreamCursor, StreamFrames};
use ff_core::engine_backend::EngineBackend;
use ff_core::engine_error::EngineError;
#[cfg(feature = "core")]
use ff_core::partition::PartitionKey;
use ff_core::partition::PartitionConfig;
#[cfg(feature = "streaming")]
use ff_core::types::AttemptIndex;
#[cfg(feature = "core")]
use ff_core::types::EdgeId;
use ff_core::types::{BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
// Wave 5a — re-export `PgPool` so crates that depend on
// `ff-backend-postgres` (and not `sqlx` directly) can name the pool
// type in their own APIs (e.g. `ff-engine::dispatch_via_postgres`).
pub use sqlx::PgPool;

#[cfg(feature = "core")]
mod admin;
pub mod attempt;
pub mod budget;
pub mod completion;
#[cfg(feature = "core")]
pub mod dispatch;
pub mod error;
pub mod exec_core;
pub mod flow;
#[cfg(feature = "core")]
pub mod flow_staging;
pub mod handle_codec;
mod lease_event;
mod lease_event_subscribe;
pub mod listener;
pub mod migrate;
pub mod pool;
#[cfg(feature = "core")]
pub mod reconcilers;
#[cfg(feature = "core")]
pub mod scanner_supervisor;
#[cfg(feature = "core")]
pub mod scheduler;
pub mod signal;
mod signal_delivery_subscribe;
mod signal_event;
#[cfg(feature = "streaming")]
pub mod stream;
pub mod suspend;
pub mod suspend_ops;
pub mod version;

pub use completion::{PostgresCompletionStream, COMPLETION_CHANNEL};
pub use error::{map_sqlx_error, PostgresTransportError};
pub use listener::StreamNotifier;
pub use migrate::{apply_migrations, MigrationError};
#[cfg(feature = "core")]
pub use scanner_supervisor::{PostgresScannerConfig, PostgresScannerHandle};
pub use version::check_schema_version;

// Re-export the new `PostgresConnection` shape so consumers can name
// it from this crate directly without dipping into `ff_core::backend`.
// `BackendConfig` already appears in the `connect()` signature and is
// imported above, so rustdoc links it from this crate's docs — no
// explicit `pub use` needed.
pub use ff_core::backend::PostgresConnection;

/// RFC-018 Stage A: build a [`ff_core::capability::Supports`]
/// snapshot for the Postgres backend at v0.10 (the version
/// [`EngineBackend::capabilities`] reports below). `true` fields
/// correspond to trait methods `PostgresBackend` overrides with a
/// real body (ingress, scheduler, seed + rotate HMAC, flow cancel
/// bulk path, stream reads, RFC-019 subscriptions, cross-cutting).
/// `false` fields correspond to trait methods that still return
/// `EngineError::Unavailable` on Postgres today — Wave 9 follow-up
/// scope. See `docs/POSTGRES_PARITY_MATRIX.md` for the authoritative
/// per-row status.
///
/// `prepare` is `true` on Postgres even though `prepare()` returns
/// `PrepareOutcome::NoOp` (schema migrations are applied out-of-band).
/// `Supports.prepare` means "can the consumer call `backend.prepare()`
/// without getting `EngineError::Unavailable`?" — for Postgres the
/// answer is yes; NoOp is a successful, well-defined outcome. Gating
/// the call off in consumer UIs based on a `false` bool would hide
/// a callable + correct method.
///
/// `Supports` is `#[non_exhaustive]` so struct-literal construction
/// from this crate is forbidden; we start from
/// [`ff_core::capability::Supports::none`] and mutate named fields.
fn postgres_supports_base() -> ff_core::capability::Supports {
    let mut s = ff_core::capability::Supports::none();

    // ── Flow bulk cancel (impl) ──
    s.cancel_flow_wait_timeout = true;
    s.cancel_flow_wait_indefinite = true;

    // ── Admin seed + rotate HMAC (impl) ──
    s.rotate_waitpoint_hmac_secret_all = true;
    s.seed_waitpoint_hmac_secret = true;

    // ── Scheduler ──
    s.claim_for_worker = true;

    // ── RFC-019 subscriptions ──
    s.subscribe_lease_history = true;
    s.subscribe_completion = true;
    s.subscribe_signal_delivery = true;
    s.subscribe_instance_tags = false;

    // ── Streaming (RFC-015) ──
    s.stream_durable_summary = true;
    s.stream_best_effort_live = true;

    // ── Boot (Postgres returns NoOp but call is callable + correct) ──
    s.prepare = true;

    // Everything else — operator control, execution reads, budget
    // admin, quota admin, list_pending_waitpoints, cancel_flow_header,
    // ack_cancel_member — defaults to `false`. Wave 9 follow-up scope.

    s
}
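
// Consumer-side gating sketch (hedged: the exact accessor shape on
// `ff_core::capability::Capabilities` is assumed here, not defined in
// this file):
//
//     let caps = backend.capabilities();
//     if caps.supports.claim_for_worker {
//         // safe to route scheduler claims at this backend
//     }
//     // Note: `prepare` reads `true` even though the call is a NoOp
//     // on Postgres; see the doc comment above for why.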

/// Postgres-backed `EngineBackend`.
///
/// Wave 0 shape: holds a `sqlx::PgPool`, the deployment's
/// [`PartitionConfig`] (Q5 — the partition column survives on Postgres
/// with hash partitioning across the same 256 slots Valkey uses),
/// and an optional `ff_observability::Metrics` handle mirroring
/// [`ff_backend_valkey::ValkeyBackend`]. Wave 4 added the
/// [`StreamNotifier`] handle when it wired up LISTEN/NOTIFY.
pub struct PostgresBackend {
    #[allow(dead_code)] // filled in across waves 2-7
    pool: PgPool,
    #[allow(dead_code)]
    partition_config: PartitionConfig,
    #[allow(dead_code)]
    metrics: Option<Arc<ff_observability::Metrics>>,
    /// Wave 4: shared LISTEN notifier. Spawned by every constructor in
    /// this crate today (`connect`, `connect_with_metrics`, and
    /// `from_pool` all install it); `None` is reserved for test
    /// constructions that skip LISTEN wiring (write-path-only tests).
    #[allow(dead_code)]
    stream_notifier: Option<Arc<StreamNotifier>>,
    /// RFC-017 Wave 8 Stage E3: scanner supervisor handle. Installed
    /// after [`Self::connect_with_metrics`] when the caller opts in
    /// via [`Self::with_scanners`]; drained on
    /// [`EngineBackend::shutdown_prepare`]. `None` on `from_pool` /
    /// test builds that don't want background reconcilers.
    #[cfg(feature = "core")]
    scanner_handle: Option<Arc<scanner_supervisor::PostgresScannerHandle>>,
}

impl PostgresBackend {
    /// Dial Postgres with [`BackendConfig`] and return the backend as
    /// `Arc<dyn EngineBackend>`. Modeled on
    /// [`ff_backend_valkey::ValkeyBackend::connect`] so ff-server /
    /// SDK call sites can swap backends without changing the
    /// constructor shape.
    ///
    /// Builds the pool, spawns the shared LISTEN notifier (Wave 4),
    /// and constructs the backend. Does NOT run migrations (Q12 —
    /// operator out-of-band) and does NOT run the schema-version
    /// check ([`check_schema_version`] is the boot path's
    /// responsibility, not this constructor's).
    ///
    /// Returns `EngineError::Unavailable` when the config's
    /// connection arm is not Postgres.
    pub async fn connect(config: BackendConfig) -> Result<Arc<dyn EngineBackend>, EngineError> {
        let pool = pool::build_pool(&config).await?;
        warn_if_max_locks_low(&pool).await;
        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
        let backend = Self {
            pool,
            partition_config: PartitionConfig::default(),
            metrics: None,
            stream_notifier,
            #[cfg(feature = "core")]
            scanner_handle: None,
        };
        Ok(Arc::new(backend))
    }
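
    // Minimal call sketch for `connect` (hedged: how a Postgres
    // `BackendConfig` is built lives in `ff_core` and is assumed, not
    // shown by this file):
    //
    //     let config: BackendConfig = /* postgres connection arm */;
    //     let backend = PostgresBackend::connect(config).await?;
    //     backend.ping().await?; // single round-trip liveness probe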

    /// Test / advanced constructor: build a `PostgresBackend` from an
    /// already-constructed `PgPool` + explicit partition config. No
    /// extra dialing beyond the LISTEN notifier task it spawns on the
    /// given pool. Useful for integration tests against a shared
    /// pool and for a future migration CLI that wants to reuse a pool
    /// across migrate-run + smoke-check.
    pub fn from_pool(pool: PgPool, partition_config: PartitionConfig) -> Arc<Self> {
        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
        Arc::new(Self {
            pool,
            partition_config,
            metrics: None,
            stream_notifier,
            #[cfg(feature = "core")]
            scanner_handle: None,
        })
    }
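
    // Test-harness sketch for `from_pool` (hedged: assumes a reachable
    // `DATABASE_URL`; `PgPool::connect` is the stock sqlx constructor):
    //
    //     let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?;
    //     let backend = PostgresBackend::from_pool(pool, PartitionConfig::default());
    //     assert_eq!(backend.backend_label(), "postgres");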

    /// RFC-017 Wave 8 Stage E1: dial Postgres with an explicit
    /// [`PartitionConfig`] + shared [`ff_observability::Metrics`].
    /// Mirrors [`ff_backend_valkey::ValkeyBackend::connect_with_metrics`]
    /// so `ff-server::Server::start_with_metrics` can wire the Postgres
    /// branch without reaching into the pool builder directly.
    ///
    /// Returns a concrete `Arc<Self>` rather than `Arc<dyn EngineBackend>`
    /// so the caller can cast to the trait object after any additional
    /// field installs (parallel to the Valkey path, which calls
    /// `with_scheduler` / `with_stream_semaphore_permits` before the
    /// cast). Stage E1 does NOT run `apply_migrations` — schema
    /// provisioning is an operator concern (matches the Wave 0 contract
    /// on [`Self::connect`]).
    pub async fn connect_with_metrics(
        config: BackendConfig,
        partition_config: PartitionConfig,
        metrics: Arc<ff_observability::Metrics>,
    ) -> Result<Arc<Self>, EngineError> {
        let pool = pool::build_pool(&config).await?;
        warn_if_max_locks_low(&pool).await;
        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
        Ok(Arc::new(Self {
            pool,
            partition_config,
            metrics: Some(metrics),
            stream_notifier,
            #[cfg(feature = "core")]
            scanner_handle: None,
        }))
    }

    /// RFC-017 Wave 8 Stage E3: spawn the six Postgres reconcilers as
    /// background tick loops. Returns `true` if the scanner handle
    /// was installed; `false` if the `Arc<Self>` has outstanding
    /// clones (mirrors the Valkey `with_*` pattern). Callers must
    /// invoke this before publishing the `Arc<dyn EngineBackend>` so
    /// the underlying `Arc::get_mut` succeeds.
    #[cfg(feature = "core")]
    pub fn with_scanners(
        self: &mut Arc<Self>,
        cfg: scanner_supervisor::PostgresScannerConfig,
    ) -> bool {
        let Some(inner) = Arc::get_mut(self) else {
            return false;
        };
        let handle = scanner_supervisor::spawn_scanners(inner.pool.clone(), cfg);
        inner.scanner_handle = Some(Arc::new(handle));
        true
    }

    /// Accessor for the underlying `PgPool`. Stage E1 uses this so
    /// `ff-server::Server::start_with_metrics` can run
    /// [`apply_migrations`] on the same pool before handing the backend
    /// out as `Arc<dyn EngineBackend>`.
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }
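
    // End-to-end wiring sketch for the Stage E1/E3 server path
    // (hedged: the exact `apply_migrations` / `check_schema_version`
    // signatures and the scanner config constructor live in their own
    // modules and are assumed here):
    //
    //     let mut backend =
    //         PostgresBackend::connect_with_metrics(config, partitions, metrics).await?;
    //     apply_migrations(backend.pool()).await?;     // schema provisioning
    //     check_schema_version(backend.pool()).await?; // refuse to boot on mismatch
    //     assert!(backend.with_scanners(scanner_cfg)); // before publishing the Arc
    //     let backend: Arc<dyn EngineBackend> = backend; // cast after field installs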

    /// Create one execution row (+ seed the lane registry if new).
    ///
    /// **RFC-017 Stage A:** this inherent method is retained as a
    /// thin wrapper around the module-level impl so existing in-tree
    /// callers (ff-server request handlers, integration tests) keep
    /// compiling. The trait-lifted entry point is
    /// [`EngineBackend::create_execution`] below, which calls the
    /// same impl. Return shape differs — the inherent returns
    /// `ExecutionId`, the trait returns
    /// [`CreateExecutionResult`] per RFC-017 §5 — so we cannot simply
    /// replace the inherent method. A follow-up PR may deprecate
    /// this inherent alongside the broader ingress shape alignment.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_execution", skip_all)]
    pub async fn create_execution(
        &self,
        args: CreateExecutionArgs,
    ) -> Result<ExecutionId, EngineError> {
        exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await
    }

    // ── RFC-017 Stage A: inherent ingress methods retained for
    // back-compat with in-tree test harnesses + ff-server direct
    // calls. The trait-lifted peers (`EngineBackend::create_flow`
    // etc.) delegate to the SAME module-level impls under the hood.
    // A follow-up PR may sunset these inherents once all in-tree
    // consumers route through `Arc<dyn EngineBackend>`.

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_flow.inherent", skip_all)]
    pub async fn create_flow(
        &self,
        args: &CreateFlowArgs,
    ) -> Result<CreateFlowResult, EngineError> {
        flow_staging::create_flow(&self.pool, &self.partition_config, args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.add_execution_to_flow.inherent", skip_all)]
    pub async fn add_execution_to_flow(
        &self,
        args: &AddExecutionToFlowArgs,
    ) -> Result<AddExecutionToFlowResult, EngineError> {
        flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.stage_dependency_edge.inherent", skip_all)]
    pub async fn stage_dependency_edge(
        &self,
        args: &StageDependencyEdgeArgs,
    ) -> Result<StageDependencyEdgeResult, EngineError> {
        flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.apply_dependency_to_child.inherent", skip_all)]
    pub async fn apply_dependency_to_child(
        &self,
        args: &ApplyDependencyToChildArgs,
    ) -> Result<ApplyDependencyToChildResult, EngineError> {
        flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, args).await
    }
}

/// Short helper: every remaining Wave 0 stub method returns this.
/// Kept as a function (rather than a macro) so rust-analyzer / IDE
/// jumps show a single definition site for the stub pattern.
#[inline]
fn unavailable<T>(op: &'static str) -> Result<T, EngineError> {
    Err(EngineError::Unavailable { op })
}

#[async_trait]
impl EngineBackend for PostgresBackend {
    // ── Claim + lifecycle ──

    #[tracing::instrument(name = "pg.claim", skip_all)]
    async fn claim(
        &self,
        lane: &LaneId,
        capabilities: &CapabilitySet,
        policy: ClaimPolicy,
    ) -> Result<Option<Handle>, EngineError> {
        attempt::claim(&self.pool, lane, capabilities, &policy).await
    }

    #[tracing::instrument(name = "pg.renew", skip_all)]
    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError> {
        attempt::renew(&self.pool, handle).await
    }

    #[tracing::instrument(name = "pg.progress", skip_all)]
    async fn progress(
        &self,
        handle: &Handle,
        percent: Option<u8>,
        message: Option<String>,
    ) -> Result<(), EngineError> {
        attempt::progress(&self.pool, handle, percent, message).await
    }

    #[tracing::instrument(name = "pg.append_frame", skip_all)]
    async fn append_frame(
        &self,
        handle: &Handle,
        frame: Frame,
    ) -> Result<AppendFrameOutcome, EngineError> {
        #[cfg(feature = "streaming")]
        {
            stream::append_frame(&self.pool, &self.partition_config, handle, frame).await
        }
        #[cfg(not(feature = "streaming"))]
        {
            let _ = (handle, frame);
            unavailable("pg.append_frame")
        }
    }

    #[tracing::instrument(name = "pg.complete", skip_all)]
    async fn complete(
        &self,
        handle: &Handle,
        payload: Option<Vec<u8>>,
    ) -> Result<(), EngineError> {
        attempt::complete(&self.pool, handle, payload).await
    }

    #[tracing::instrument(name = "pg.fail", skip_all)]
    async fn fail(
        &self,
        handle: &Handle,
        reason: FailureReason,
        classification: FailureClass,
    ) -> Result<FailOutcome, EngineError> {
        attempt::fail(&self.pool, handle, reason, classification).await
    }

    #[tracing::instrument(name = "pg.cancel", skip_all)]
    async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError> {
        let payload = handle_codec::decode_handle(handle)?;
        exec_core::cancel_impl(
            &self.pool,
            &self.partition_config,
            &payload.execution_id,
            reason,
        )
        .await
    }

    #[tracing::instrument(name = "pg.suspend", skip_all)]
    async fn suspend(
        &self,
        handle: &Handle,
        args: SuspendArgs,
    ) -> Result<SuspendOutcome, EngineError> {
        suspend_ops::suspend_impl(&self.pool, &self.partition_config, handle, args).await
    }

    #[tracing::instrument(name = "pg.suspend_by_triple", skip_all)]
    async fn suspend_by_triple(
        &self,
        exec_id: ExecutionId,
        triple: LeaseFence,
        args: SuspendArgs,
    ) -> Result<SuspendOutcome, EngineError> {
        suspend_ops::suspend_by_triple_impl(
            &self.pool,
            &self.partition_config,
            exec_id,
            triple,
            args,
        )
        .await
    }

    #[tracing::instrument(name = "pg.create_waitpoint", skip_all)]
    async fn create_waitpoint(
        &self,
        _handle: &Handle,
        _waitpoint_key: &str,
        _expires_in: Duration,
    ) -> Result<PendingWaitpoint, EngineError> {
        unavailable("pg.create_waitpoint")
    }

    #[tracing::instrument(name = "pg.observe_signals", skip_all)]
    async fn observe_signals(
        &self,
        handle: &Handle,
    ) -> Result<Vec<ResumeSignal>, EngineError> {
        suspend_ops::observe_signals_impl(&self.pool, handle).await
    }

    #[tracing::instrument(name = "pg.claim_from_reclaim", skip_all)]
    async fn claim_from_reclaim(
        &self,
        token: ReclaimToken,
    ) -> Result<Option<Handle>, EngineError> {
        attempt::claim_from_reclaim(&self.pool, token).await
    }

    #[tracing::instrument(name = "pg.delay", skip_all)]
    async fn delay(
        &self,
        handle: &Handle,
        delay_until: TimestampMs,
    ) -> Result<(), EngineError> {
        attempt::delay(&self.pool, handle, delay_until).await
    }

    #[tracing::instrument(name = "pg.wait_children", skip_all)]
    async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError> {
        attempt::wait_children(&self.pool, handle).await
    }

    // ── Read / admin ──

    #[tracing::instrument(name = "pg.describe_execution", skip_all)]
    async fn describe_execution(
        &self,
        id: &ExecutionId,
    ) -> Result<Option<ExecutionSnapshot>, EngineError> {
        exec_core::describe_execution_impl(&self.pool, &self.partition_config, id).await
    }

    #[tracing::instrument(name = "pg.describe_flow", skip_all)]
    async fn describe_flow(
        &self,
        id: &FlowId,
    ) -> Result<Option<FlowSnapshot>, EngineError> {
        flow::describe_flow(&self.pool, &self.partition_config, id).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_edges", skip_all)]
    async fn list_edges(
        &self,
        flow_id: &FlowId,
        direction: EdgeDirection,
    ) -> Result<Vec<EdgeSnapshot>, EngineError> {
        flow::list_edges(&self.pool, &self.partition_config, flow_id, direction).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.describe_edge", skip_all)]
    async fn describe_edge(
        &self,
        flow_id: &FlowId,
        edge_id: &EdgeId,
    ) -> Result<Option<EdgeSnapshot>, EngineError> {
        flow::describe_edge(&self.pool, &self.partition_config, flow_id, edge_id).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.resolve_execution_flow_id", skip_all)]
    async fn resolve_execution_flow_id(
        &self,
        eid: &ExecutionId,
    ) -> Result<Option<FlowId>, EngineError> {
        exec_core::resolve_execution_flow_id_impl(&self.pool, &self.partition_config, eid).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_flows", skip_all)]
    async fn list_flows(
        &self,
        partition: PartitionKey,
        cursor: Option<FlowId>,
        limit: usize,
    ) -> Result<ListFlowsPage, EngineError> {
        flow::list_flows(&self.pool, partition, cursor, limit).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_lanes", skip_all)]
    async fn list_lanes(
        &self,
        cursor: Option<LaneId>,
        limit: usize,
    ) -> Result<ListLanesPage, EngineError> {
        admin::list_lanes_impl(&self.pool, cursor, limit).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_suspended", skip_all)]
    async fn list_suspended(
        &self,
        partition: PartitionKey,
        cursor: Option<ExecutionId>,
        limit: usize,
    ) -> Result<ListSuspendedPage, EngineError> {
        admin::list_suspended_impl(&self.pool, partition, cursor, limit).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_executions", skip_all)]
    async fn list_executions(
        &self,
        partition: PartitionKey,
        cursor: Option<ExecutionId>,
        limit: usize,
    ) -> Result<ListExecutionsPage, EngineError> {
        exec_core::list_executions_impl(
            &self.pool,
            &self.partition_config,
            partition,
            cursor,
            limit,
        )
        .await
    }

    // ── Trigger ops (issue #150) ──

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.deliver_signal", skip_all)]
    async fn deliver_signal(
        &self,
        args: DeliverSignalArgs,
    ) -> Result<DeliverSignalResult, EngineError> {
        suspend_ops::deliver_signal_impl(&self.pool, &self.partition_config, args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.claim_resumed_execution", skip_all)]
    async fn claim_resumed_execution(
        &self,
        args: ClaimResumedExecutionArgs,
    ) -> Result<ClaimResumedExecutionResult, EngineError> {
        suspend_ops::claim_resumed_execution_impl(&self.pool, &self.partition_config, args).await
    }

    // ── RFC-017 Stage A — ingress (promoted from inherent) ────

    /// RFC-017 Wave 8 Stage E1: lift the inherent
    /// [`PostgresBackend::create_execution`] onto the trait so
    /// ff-server's migrated HTTP handler can dispatch to Postgres.
    /// The insert is idempotent; the Postgres impl does not
    /// distinguish `Created` from `Duplicate` at the helper level
    /// (both paths commit and return the execution id), so we always
    /// surface `Created { public_state: Waiting }` here. A follow-up
    /// may lift the distinction if a consumer relies on it.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_execution.trait", skip_all)]
    async fn create_execution(
        &self,
        args: CreateExecutionArgs,
    ) -> Result<CreateExecutionResult, EngineError> {
        let eid = args.execution_id.clone();
        exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await?;
        Ok(CreateExecutionResult::Created {
            execution_id: eid,
            public_state: PublicState::Waiting,
        })
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_flow", skip_all)]
    async fn create_flow(
        &self,
        args: CreateFlowArgs,
    ) -> Result<CreateFlowResult, EngineError> {
        flow_staging::create_flow(&self.pool, &self.partition_config, &args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.add_execution_to_flow", skip_all)]
    async fn add_execution_to_flow(
        &self,
        args: AddExecutionToFlowArgs,
    ) -> Result<AddExecutionToFlowResult, EngineError> {
        flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, &args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.stage_dependency_edge", skip_all)]
    async fn stage_dependency_edge(
        &self,
        args: StageDependencyEdgeArgs,
    ) -> Result<StageDependencyEdgeResult, EngineError> {
        flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, &args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.apply_dependency_to_child", skip_all)]
    async fn apply_dependency_to_child(
        &self,
        args: ApplyDependencyToChildArgs,
    ) -> Result<ApplyDependencyToChildResult, EngineError> {
        flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, &args).await
    }

    fn backend_label(&self) -> &'static str {
        "postgres"
    }

    /// RFC-018 Stage A: populate the `Capabilities` snapshot from the
    /// static [`postgres_supports_base`] shape. The Postgres backend
    /// landed through RFC-017 Stage E4 at v0.8.0; fields still `false`
    /// correspond to Wave-9 follow-up work (`cancel_flow_header`,
    /// `ack_cancel_member`, read-model, operator control, budget /
    /// quota, `list_pending_waitpoints`). See
    /// `docs/POSTGRES_PARITY_MATRIX.md` for the per-row breakdown.
    fn capabilities(&self) -> ff_core::capability::Capabilities {
        ff_core::capability::Capabilities::new(
            ff_core::capability::BackendIdentity::new(
                "postgres",
                ff_core::capability::Version::new(0, 10, 0),
                "E-shipped",
            ),
            postgres_supports_base(),
        )
    }

    /// Issue #281: no-op. Schema migrations are applied out-of-band
    /// per `rfcs/drafts/v0.7-migration-master.md §Q12` (the operator
    /// runs `sqlx migrate run` or the future `ff-migrate` CLI). The
    /// boot path runs a schema-version check
    /// ([`crate::version::check_schema_version`]) and refuses to
    /// start on mismatch, so by the time `prepare()` is callable
    /// there is nothing further to do.
    async fn prepare(
        &self,
    ) -> Result<ff_core::backend::PrepareOutcome, EngineError> {
        Ok(ff_core::backend::PrepareOutcome::NoOp)
    }

    /// RFC-017 Wave 8 Stage E3 (§4 row 9, §7): forward the claim to the
    /// Postgres-native admission pipeline. Returns `NoWork` when no
    /// eligible execution is admissible this scan cycle. Budget
    /// breaches surface as `NoWork` (leaving the row eligible for a
    /// retry by another worker); validation-class rejections
    /// (malformed partition, unknown kid) surface as typed
    /// [`EngineError`] variants mapped to the Server's 400/503 arms.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.claim_for_worker", skip_all)]
    async fn claim_for_worker(
        &self,
        args: ff_core::contracts::ClaimForWorkerArgs,
    ) -> Result<ff_core::contracts::ClaimForWorkerOutcome, EngineError> {
        let sched = scheduler::PostgresScheduler::new(self.pool.clone());
        let grant_opt = sched
            .claim_for_worker(
                &args.lane_id,
                &args.worker_id,
                &args.worker_instance_id,
                &args.worker_capabilities,
                args.grant_ttl_ms,
            )
            .await?;
        Ok(match grant_opt {
            Some(g) => ff_core::contracts::ClaimForWorkerOutcome::granted(g),
            None => ff_core::contracts::ClaimForWorkerOutcome::no_work(),
        })
    }

    async fn ping(&self) -> Result<(), EngineError> {
        // Postgres analogue to Valkey PING — single-round-trip pool
        // liveness. Errors propagate as transport-class EngineError via
        // the existing sqlx→EngineError map.
        let _ = sqlx::query_scalar::<_, i32>("SELECT 1")
            .fetch_one(&self.pool)
            .await
            .map_err(error::map_sqlx_error)?;
        Ok(())
    }

    /// RFC-017 Wave 8 Stage E3: drain the scanner supervisor's
    /// reconciler tasks for up to `grace`. Matches the Valkey
    /// backend's shutdown_prepare contract — bounded best-effort
    /// drain, never returns an error. The sqlx pool is not closed
    /// here; it is released when the backend is dropped.
    async fn shutdown_prepare(&self, grace: Duration) -> Result<(), EngineError> {
        #[cfg(feature = "core")]
        if let Some(handle) = self.scanner_handle.as_ref() {
            let timed_out = handle.shutdown(grace).await;
            if timed_out > 0 {
                tracing::warn!(
                    timed_out,
                    ?grace,
                    "postgres scanner supervisor exceeded grace on shutdown"
                );
            }
        }
        Ok(())
    }

    #[tracing::instrument(name = "pg.cancel_flow", skip_all)]
    async fn cancel_flow(
        &self,
        id: &FlowId,
        policy: CancelFlowPolicy,
        wait: CancelFlowWait,
    ) -> Result<CancelFlowResult, EngineError> {
        let result = flow::cancel_flow(&self.pool, &self.partition_config, id, policy).await?;
        if let Some(deadline) = ff_core::engine_backend::cancel_flow_wait_deadline(wait) {
            ff_core::engine_backend::wait_for_flow_cancellation(self, id, deadline).await?;
        }
        Ok(result)
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.set_edge_group_policy", skip_all)]
    async fn set_edge_group_policy(
        &self,
        flow_id: &FlowId,
        downstream_execution_id: &ExecutionId,
        policy: EdgeDependencyPolicy,
    ) -> Result<SetEdgeGroupPolicyResult, EngineError> {
        flow::set_edge_group_policy(
            &self.pool,
            &self.partition_config,
            flow_id,
            downstream_execution_id,
            policy,
        )
        .await
    }

    // ── Budget ──

    #[tracing::instrument(name = "pg.report_usage", skip_all)]
    async fn report_usage(
        &self,
        _handle: &Handle,
        budget: &BudgetId,
        dimensions: UsageDimensions,
    ) -> Result<ReportUsageResult, EngineError> {
        budget::report_usage_impl(&self.pool, &self.partition_config, budget, dimensions).await
    }

    // ── HMAC secret rotation (v0.7 migration-master Q4) ──
    //
    // Implemented in Wave 4 as a single INSERT into
    // `ff_waitpoint_hmac(kid, secret, rotated_at)`, replacing the
    // Wave 0/1 `Unavailable` stub that surfaced the unimplemented op
    // loudly rather than silently no-op'ing.
    #[tracing::instrument(name = "pg.rotate_waitpoint_hmac_secret_all", skip_all)]
    async fn rotate_waitpoint_hmac_secret_all(
        &self,
        args: RotateWaitpointHmacSecretAllArgs,
    ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
        // Wave 4 Agent D: Q4 single-global-row write against
        // `ff_waitpoint_hmac`. `now_ms` is captured here (not
        // inside the impl) so tests can inject a deterministic
        // clock via the pool layer in the future.
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as i64)
            .unwrap_or(0);
        signal::rotate_waitpoint_hmac_secret_all_impl(&self.pool, args, now_ms).await
    }

    #[tracing::instrument(name = "pg.seed_waitpoint_hmac_secret", skip_all)]
    async fn seed_waitpoint_hmac_secret(
        &self,
        args: SeedWaitpointHmacSecretArgs,
    ) -> Result<SeedOutcome, EngineError> {
        // Issue #280: install-only boot-time seed against the global
        // `ff_waitpoint_hmac` table. Idempotent — cairn calls this on
        // every boot and the backend decides whether to INSERT.
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as i64)
            .unwrap_or(0);
        signal::seed_waitpoint_hmac_secret_impl(&self.pool, args, now_ms).await
    }

    // ── Stream reads (streaming feature) ──

    #[cfg(feature = "streaming")]
    #[tracing::instrument(name = "pg.read_stream", skip_all)]
    async fn read_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        from: StreamCursor,
        to: StreamCursor,
        count_limit: u64,
    ) -> Result<StreamFrames, EngineError> {
        stream::read_stream(&self.pool, execution_id, attempt_index, from, to, count_limit).await
    }

    #[cfg(feature = "streaming")]
    #[tracing::instrument(name = "pg.tail_stream", skip_all)]
    async fn tail_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        after: StreamCursor,
        block_ms: u64,
        count_limit: u64,
        visibility: TailVisibility,
    ) -> Result<StreamFrames, EngineError> {
        let notifier = self
            .stream_notifier
            .as_ref()
            .ok_or(EngineError::Unavailable {
                op: "pg.tail_stream (notifier not initialised)",
            })?;
        stream::tail_stream(
            &self.pool,
            notifier,
            execution_id,
            attempt_index,
            after,
            block_ms,
            count_limit,
            visibility,
        )
        .await
    }

    #[cfg(feature = "streaming")]
    #[tracing::instrument(name = "pg.read_summary", skip_all)]
    async fn read_summary(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
    ) -> Result<Option<SummaryDocument>, EngineError> {
        stream::read_summary(&self.pool, execution_id, attempt_index).await
    }

    // ── RFC-019 Stage A — `subscribe_completion` ──────────────────
    //
    // Postgres real impl. Wraps the existing `ff_completion_event`
    // outbox + `LISTEN ff_completion` machinery
    // (see `completion::subscribe`) and adapts each completion
    // payload into a `stream_subscribe::StreamEvent`.
    //
    // Cursor encoding: `POSTGRES_CURSOR_PREFIX (0x02)` + `event_id`
    // (i64 BE). Stage A resume-from-cursor is not plumbed through the
    // adapter (the existing subscriber tails from `max(event_id)`);
    // Stage B threads the cursor into the replay path. The surface is
    // correct today for consumers that subscribe from tail and
    // persist cursors for future resume.
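    //
    // Byte-layout sketch of that cursor encoding (illustrative only;
    // `encode_postgres_event_cursor` in `ff_core::stream_subscribe`
    // owns the real encoding):
    //
    //     // 9 bytes: family prefix ++ big-endian event id.
    //     fn encode(event_id: i64) -> [u8; 9] {
    //         let mut buf = [0u8; 9];
    //         buf[0] = 0x02; // POSTGRES_CURSOR_PREFIX
    //         buf[1..].copy_from_slice(&event_id.to_be_bytes());
    //         buf
    //     }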
    #[tracing::instrument(name = "pg.subscribe_completion", skip_all)]
    async fn subscribe_completion(
        &self,
        _cursor: ff_core::stream_subscribe::StreamCursor,
        filter: &ff_core::backend::ScannerFilter,
    ) -> Result<ff_core::stream_events::CompletionSubscription, EngineError> {
        use ff_core::stream_events::{CompletionEvent, CompletionOutcome};
        use ff_core::stream_subscribe::encode_postgres_event_cursor;
        use futures_core::Stream;
        use std::pin::Pin;
        use std::task::{Context, Poll};

        // Delegate to the existing CompletionBackend implementation so
        // the LISTEN/replay machinery is shared. When a non-noop
        // `ScannerFilter` (#282) is supplied, route through the
        // `_filtered` variant so the outbox-inline SQL filter applies.
        // Resume-from-cursor is still unwired (the Stage A surface
        // tails from the live position).
        let inner = if filter.is_noop() {
            ff_core::completion_backend::CompletionBackend::subscribe_completions(self).await?
        } else {
            ff_core::completion_backend::CompletionBackend::subscribe_completions_filtered(
                self, filter,
            )
            .await?
        };

        struct Adapter {
            inner: ff_core::completion_backend::CompletionStream,
        }

        impl Stream for Adapter {
            type Item = Result<CompletionEvent, EngineError>;
            fn poll_next(
                mut self: Pin<&mut Self>,
                cx: &mut Context<'_>,
            ) -> Poll<Option<Self::Item>> {
                match Pin::new(&mut self.inner).poll_next(cx) {
                    Poll::Pending => Poll::Pending,
                    Poll::Ready(None) => Poll::Ready(None),
                    Poll::Ready(Some(payload)) => {
                        // Placeholder cursor (event_id = 0) because
                        // `CompletionPayload` does not surface
                        // `event_id` today. The family prefix stays
                        // stable so persisted cursors remain
                        // forward-compatible.
                        let cursor = encode_postgres_event_cursor(0);
                        let event = CompletionEvent::new(
                            cursor,
                            payload.execution_id.clone(),
                            CompletionOutcome::from_wire(&payload.outcome),
                            payload.produced_at_ms,
                        );
                        Poll::Ready(Some(Ok(event)))
                    }
                }
            }
        }

        Ok(Box::pin(Adapter { inner }))
    }

    // ── RFC-019 Stage B — `subscribe_lease_history` ──────────────
    //
    // Real Postgres impl. Tails the `ff_lease_event` outbox (written
    // by producer sites in `attempt.rs`, `flow.rs`, `suspend_ops.rs`,
    // and the `attempt_timeout` / `lease_expiry` reconcilers) via
    // `LISTEN ff_lease_event` + catch-up SELECT. Cursor encoding
    // matches `subscribe_completion`: `0x02 ++ event_id(BE8)`.
    //
    // Partition scope: hardcoded to partition 0 — mirrors the Valkey
    // Stage A impl, which tails partition 0's aggregate stream key.
    // Cross-partition consumers instantiate one backend per
    // partition + merge streams consumer-side (RFC-019 §Backend
    // Semantics), as sketched after this method.
    #[tracing::instrument(name = "pg.subscribe_lease_history", skip_all)]
    async fn subscribe_lease_history(
        &self,
        cursor: ff_core::stream_subscribe::StreamCursor,
        filter: &ff_core::backend::ScannerFilter,
    ) -> Result<ff_core::stream_events::LeaseHistorySubscription, EngineError> {
        lease_event_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
    }
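
    // Consumer-side merge sketch for cross-partition tails (hedged:
    // assumes one backend per partition has already been built;
    // `futures::stream::select` is the stock fair two-stream merge):
    //
    //     use futures::stream::select;
    //     let p0 = backend_p0.subscribe_lease_history(cursor_p0, &filter).await?;
    //     let p1 = backend_p1.subscribe_lease_history(cursor_p1, &filter).await?;
    //     let merged = select(p0, p1); // yields events from both as they arrive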

    // ── RFC-019 Stage B — `subscribe_signal_delivery` (#310) ─────
    //
    // Tails the `ff_signal_event` outbox (written by the producer
    // INSERT in `suspend_ops::deliver_signal_impl`) via
    // `LISTEN ff_signal_event` + catch-up SELECT. Cursor encoding
    // matches `subscribe_lease_history`: `0x02 ++ event_id(BE8)`.
    //
    // Partition scope: hardcoded to partition 0 — mirrors the Valkey
    // Stage B impl, which tails partition 0's aggregate stream key.
    #[tracing::instrument(name = "pg.subscribe_signal_delivery", skip_all)]
    async fn subscribe_signal_delivery(
        &self,
        cursor: ff_core::stream_subscribe::StreamCursor,
        filter: &ff_core::backend::ScannerFilter,
    ) -> Result<ff_core::stream_events::SignalDeliverySubscription, EngineError> {
        signal_delivery_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
    }
}

/// Minimum recommended `max_locks_per_transaction`. Partition-heavy
/// schemas (256 hash partitions per logical table) can exceed the
/// Postgres default of `64` per tx under modest concurrent bench
/// load — the Wave 7c bench hit `out of shared memory` at 16 workers
/// × 10k tasks with the default and unblocked at `512`. We warn at
/// boot rather than hard-fail because operators may legitimately
/// run with a tuned value that still exceeds 64 but sits below our
/// threshold.
const MIN_MAX_LOCKS_PER_TRANSACTION: i64 = 256;
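
// Operator-side remediation sketch (stock Postgres SQL;
// `max_locks_per_transaction` only takes effect after a server
// restart):
//
//     ALTER SYSTEM SET max_locks_per_transaction = 512;
//     -- restart Postgres, then verify:
//     SHOW max_locks_per_transaction;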

/// Probe `max_locks_per_transaction` at connect time + log a warning
/// when the current value is below the production-safe threshold.
/// Never fails the connect — probe errors are logged at debug and
/// swallowed (`SHOW` may be restricted on exotic deploys).
async fn warn_if_max_locks_low(pool: &PgPool) {
    let row: Result<(String,), sqlx::Error> =
        sqlx::query_as("SHOW max_locks_per_transaction")
            .fetch_one(pool)
            .await;
    match row {
        Ok((raw,)) => emit_max_locks_decision(&raw),
        Err(e) => {
            tracing::debug!("failed to probe max_locks_per_transaction: {e}");
        }
    }
}

/// Pure decision surface for the max-locks probe — extracted for
/// unit-testability (the live probe is gated by a running Postgres).
/// Returns the integer value when a warning SHOULD fire, `None`
/// otherwise (either the raw is valid + at/above threshold, or the
/// raw is unparseable — the latter is debug-only).
fn max_locks_warn_value(raw: &str) -> Option<i64> {
    match raw.parse::<i64>() {
        Ok(v) if v < MIN_MAX_LOCKS_PER_TRANSACTION => Some(v),
        Ok(_) => None,
        Err(e) => {
            tracing::debug!(raw, "failed to parse max_locks_per_transaction: {e}");
            None
        }
    }
}

fn emit_max_locks_decision(raw: &str) {
    if let Some(v) = max_locks_warn_value(raw) {
        tracing::warn!(
            current = v,
            recommended = MIN_MAX_LOCKS_PER_TRANSACTION,
            "postgres max_locks_per_transaction={v} is below the recommended \
             minimum ({MIN_MAX_LOCKS_PER_TRANSACTION}); partition-heavy workloads \
             may hit 'out of shared memory' under concurrent load. \
             See docs/operator-guide-postgres.md."
        );
    }
}

#[cfg(test)]
mod max_locks_tests {
    use super::{max_locks_warn_value, MIN_MAX_LOCKS_PER_TRANSACTION};

    #[test]
    fn warns_when_below_threshold() {
        assert_eq!(max_locks_warn_value("64"), Some(64));
        assert_eq!(
            max_locks_warn_value(&(MIN_MAX_LOCKS_PER_TRANSACTION - 1).to_string()),
            Some(MIN_MAX_LOCKS_PER_TRANSACTION - 1)
        );
    }

    #[test]
    fn silent_at_or_above_threshold() {
        assert_eq!(
            max_locks_warn_value(&MIN_MAX_LOCKS_PER_TRANSACTION.to_string()),
            None
        );
        assert_eq!(max_locks_warn_value("1024"), None);
    }

    #[test]
    fn silent_for_unparseable_raw() {
        assert_eq!(max_locks_warn_value("not-a-number"), None);
    }
}