
ff_backend_postgres/lib.rs

//! `EngineBackend` implementation backed by Postgres.
//!
//! **RFC-v0.7 Wave 0 — scaffold.** This crate lands the
//! [`PostgresBackend`] struct + the `impl EngineBackend` block with
//! every method stubbed to return `EngineError::Unavailable`.
//! Subsequent waves fill in bodies:
//!
//! * Wave 1: cross-cutting error/helpers.
//! * Wave 2: describe / list / resolve (read surface).
//! * Wave 3: schema migrations (replaces the Wave 0 placeholder).
//! * Wave 4: LISTEN/NOTIFY wiring + stream reads/tails.
//! * Wave 5-7: hot-path write methods.
//! * Wave 8: ff-server wire-up + dual-backend running.
//!
//! The trait stays object-safe; consumers can hold
//! `Arc<dyn EngineBackend>`. No ferriskey dep — this crate's
//! transport is `sqlx`.
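//!
//! ```ignore
//! // Illustrative sketch (config construction elided): consumers hold
//! // the backend behind the trait object returned by `connect`.
//! let backend: Arc<dyn EngineBackend> = PostgresBackend::connect(config).await?;
//! ```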

#![allow(clippy::result_large_err)]

use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;

use ff_core::backend::{
    AppendFrameOutcome, BackendConfig, CancelFlowPolicy, CancelFlowWait, CapabilitySet,
    ClaimPolicy, FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal,
    PendingWaitpoint, ReclaimToken, ResumeSignal, SummaryDocument, TailVisibility,
    UsageDimensions,
};
#[cfg(feature = "core")]
use ff_core::contracts::{
    AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
    ApplyDependencyToChildResult, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
    CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
    DeliverSignalArgs, DeliverSignalResult, EdgeDependencyPolicy, EdgeDirection, EdgeSnapshot,
    ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
    ListPendingWaitpointsResult, ListSuspendedPage, SetEdgeGroupPolicyResult,
    StageDependencyEdgeArgs, StageDependencyEdgeResult,
};
#[cfg(feature = "core")]
use ff_core::state::PublicState;
use ff_core::contracts::{
    CancelFlowResult, ExecutionSnapshot, FlowSnapshot, ReportUsageResult,
    RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
    SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
};
#[cfg(feature = "core")]
use ff_core::contracts::ExecutionInfo;
// RFC-020 Wave 9 Spine-A pt.1 — operator-control mutating surfaces.
#[cfg(feature = "core")]
use ff_core::contracts::{
    CancelExecutionArgs, CancelExecutionResult, RevokeLeaseArgs, RevokeLeaseResult,
};
// RFC-020 Wave 9 Spine-A pt.2 — operator-control + flow-cancel mutating surfaces.
#[cfg(feature = "core")]
use ff_core::contracts::{
    CancelFlowArgs, CancelFlowHeader, ChangePriorityArgs, ChangePriorityResult,
    ReplayExecutionArgs, ReplayExecutionResult,
};
// RFC-020 Wave 9 Standalone-1 — budget/quota admin surfaces.
#[cfg(feature = "core")]
use ff_core::contracts::{
    BudgetStatus, CreateBudgetArgs, CreateBudgetResult, CreateQuotaPolicyArgs,
    CreateQuotaPolicyResult, ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult,
};
#[cfg(feature = "streaming")]
use ff_core::contracts::{StreamCursor, StreamFrames};
use ff_core::engine_backend::EngineBackend;
use ff_core::engine_error::EngineError;
#[cfg(feature = "core")]
use ff_core::partition::PartitionKey;
use ff_core::partition::PartitionConfig;
#[cfg(feature = "streaming")]
use ff_core::types::AttemptIndex;
#[cfg(feature = "core")]
use ff_core::types::EdgeId;
use ff_core::types::{BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
// Wave 5a — re-export `PgPool` so crates that depend on
// `ff-backend-postgres` (and not `sqlx` directly) can name the pool
// type in their own APIs (e.g. `ff-engine::dispatch_via_postgres`).
pub use sqlx::PgPool;
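//
// A downstream crate can then name the pool type without a direct
// `sqlx` dependency, e.g. (illustrative sketch, hypothetical names):
//
//     use ff_backend_postgres::PgPool;
//
//     pub fn takes_pool(_pool: &PgPool) { /* … */ }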

#[cfg(feature = "core")]
mod admin;
pub mod attempt;
pub mod budget;
pub mod completion;
#[cfg(feature = "core")]
pub mod dispatch;
pub mod error;
pub mod exec_core;
pub mod flow;
#[cfg(feature = "core")]
pub mod flow_staging;
pub mod handle_codec;
mod lease_event;
mod lease_event_subscribe;
pub mod listener;
pub mod migrate;
#[cfg(feature = "core")]
pub mod operator;
#[cfg(feature = "core")]
mod operator_event;
pub mod pool;
#[cfg(feature = "core")]
pub mod reconcilers;
#[cfg(feature = "core")]
pub mod scanner_supervisor;
#[cfg(feature = "core")]
pub mod scheduler;
pub mod signal;
mod signal_delivery_subscribe;
mod signal_event;
#[cfg(feature = "streaming")]
pub mod stream;
pub mod suspend;
pub mod suspend_ops;
pub mod version;

pub use completion::{PostgresCompletionStream, COMPLETION_CHANNEL};
pub use error::{map_sqlx_error, PostgresTransportError};
pub use listener::StreamNotifier;
pub use migrate::{apply_migrations, MigrationError};
#[cfg(feature = "core")]
pub use scanner_supervisor::{PostgresScannerConfig, PostgresScannerHandle};
pub use version::check_schema_version;

// Re-export the new `PostgresConnection` shape so consumers can name
// it from this crate directly without dipping into `ff_core::backend`.
// `BackendConfig` is already imported above and appears in the
// `connect()` signature, so rustdoc links it transparently — no
// explicit `pub use` needed.
pub use ff_core::backend::PostgresConnection;

/// RFC-018 Stage A: build a [`ff_core::capability::Supports`]
/// snapshot for the Postgres backend (v0.11 as of Wave 9). `true`
/// fields correspond to trait methods `PostgresBackend` overrides
/// with a real body (ingress, scheduler, seed + rotate HMAC, flow
/// cancel bulk path, stream reads, RFC-019 subscriptions,
/// cross-cutting, plus the Wave 9 operator-control / read-model /
/// budget-quota surfaces). `false` fields correspond to trait
/// methods that still return `EngineError::Unavailable` on Postgres
/// today; after Wave 9 that is only `subscribe_instance_tags`
/// (#311). See `docs/POSTGRES_PARITY_MATRIX.md` for the
/// authoritative per-row status.
///
/// `prepare` is `true` on Postgres even though `prepare()` returns
/// `PrepareOutcome::NoOp` (schema migrations are applied out-of-band).
/// `Supports.prepare` means "can the consumer call `backend.prepare()`
/// without getting `EngineError::Unavailable`?" — for Postgres the
/// answer is yes; NoOp is a successful well-defined outcome. Gating
/// the call off in consumer UIs based on a `false` bool would hide
/// a callable + correct method.
///
/// `Supports` is `#[non_exhaustive]` so struct-literal construction
/// from this crate is forbidden; we start from
/// [`ff_core::capability::Supports::none`] and mutate named fields.
fn postgres_supports_base() -> ff_core::capability::Supports {
    let mut s = ff_core::capability::Supports::none();

    // ── Flow bulk cancel (impl) ──
    s.cancel_flow_wait_timeout = true;
    s.cancel_flow_wait_indefinite = true;

    // ── Admin seed + rotate HMAC (impl) ──
    s.rotate_waitpoint_hmac_secret_all = true;
    s.seed_waitpoint_hmac_secret = true;

    // ── Scheduler ──
    s.claim_for_worker = true;

    // ── RFC-019 subscriptions ──
    s.subscribe_lease_history = true;
    s.subscribe_completion = true;
    s.subscribe_signal_delivery = true;
    s.subscribe_instance_tags = false;

    // ── Streaming (RFC-015) ──
    s.stream_durable_summary = true;
    s.stream_best_effort_live = true;

    // ── Boot (Postgres returns NoOp but call is callable + correct) ──
    s.prepare = true;

    // ── Wave 9 (v0.11) — operator control + read model + budget/quota
    //    admin + list_pending_waitpoints + cancel_flow_header +
    //    ack_cancel_member all ship concretely on Postgres via
    //    RFC-020 Rev 7. subscribe_instance_tags remains `false` per
    //    #311 (speculative demand, served by list_executions +
    //    ScannerFilter::with_instance_tag today).
    s.cancel_execution = true;
    s.change_priority = true;
    s.replay_execution = true;
    s.revoke_lease = true;
    s.read_execution_state = true;
    s.read_execution_info = true;
    s.get_execution_result = true;
    s.budget_admin = true;
    s.quota_admin = true;
    s.list_pending_waitpoints = true;
    s.cancel_flow_header = true;
    s.ack_cancel_member = true;

    s
}

/// Postgres-backed `EngineBackend`.
///
/// Wave 0 shape: holds a `sqlx::PgPool`, the deployment's
/// [`PartitionConfig`] (Q5 — partition column survives on Postgres
/// with hash partitioning across the same 256 slots Valkey uses),
/// an optional `ff_observability::Metrics` handle mirroring
/// [`ff_backend_valkey::ValkeyBackend`], plus the shared
/// [`StreamNotifier`] handle Wave 4 added for LISTEN/NOTIFY.
pub struct PostgresBackend {
    #[allow(dead_code)] // filled in across waves 2-7
    pool: PgPool,
    #[allow(dead_code)]
    partition_config: PartitionConfig,
    #[allow(dead_code)]
    metrics: Option<Arc<ff_observability::Metrics>>,
    /// Wave 4: shared LISTEN notifier. Spawned by the `connect*` and
    /// `from_pool` constructors; `None` only on hand-rolled in-crate
    /// test builds that skip LISTEN wiring (write-path-only tests).
    #[allow(dead_code)]
    stream_notifier: Option<Arc<StreamNotifier>>,
    /// RFC-017 Wave 8 Stage E3: scanner supervisor handle. Installed
    /// via [`Self::with_scanners`] after [`Self::connect_with_metrics`]
    /// when the caller opts in; drained on
    /// [`EngineBackend::shutdown_prepare`]. `None` on `from_pool` /
    /// test builds that don't want background reconcilers.
    #[cfg(feature = "core")]
    scanner_handle: Option<Arc<scanner_supervisor::PostgresScannerHandle>>,
}

impl PostgresBackend {
    /// Dial Postgres with [`BackendConfig`] and return the backend as
    /// `Arc<dyn EngineBackend>`. Modeled on
    /// [`ff_backend_valkey::ValkeyBackend::connect`] so ff-server /
    /// SDK call sites can swap backends without changing the
    /// constructor shape.
    ///
    /// Builds the pool, probes `max_locks_per_transaction` (see
    /// [`MIN_MAX_LOCKS_PER_TRANSACTION`]), and spawns the shared
    /// LISTEN notifier (Wave 4). Does NOT run migrations (Q12 —
    /// operator out-of-band); schema provisioning stays an operator
    /// concern, with [`check_schema_version`] guarding boot.
    ///
    /// Returns `EngineError::Unavailable` when the config's
    /// connection arm is not Postgres.
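    ///
    /// ```ignore
    /// // Illustrative sketch — `BackendConfig` construction elided:
    /// let backend = PostgresBackend::connect(config).await?;
    /// backend.ping().await?; // single-round-trip pool liveness
    /// ```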
    pub async fn connect(config: BackendConfig) -> Result<Arc<dyn EngineBackend>, EngineError> {
        let pool = pool::build_pool(&config).await?;
        warn_if_max_locks_low(&pool).await;
        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
        let backend = Self {
            pool,
            partition_config: PartitionConfig::default(),
            metrics: None,
            stream_notifier,
            #[cfg(feature = "core")]
            scanner_handle: None,
        };
        Ok(Arc::new(backend))
    }

    /// Test / advanced constructor: build a `PostgresBackend` from an
    /// already-constructed `PgPool` + explicit partition config. No
    /// network I/O. Useful for integration tests against a shared
    /// pool and for a future migration CLI that wants to reuse a pool
    /// across migrate-run + smoke-check.
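    ///
    /// ```ignore
    /// // Illustrative: reuse an already-built test-harness pool.
    /// let backend = PostgresBackend::from_pool(pool.clone(), PartitionConfig::default());
    /// ```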
    pub fn from_pool(pool: PgPool, partition_config: PartitionConfig) -> Arc<Self> {
        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
        Arc::new(Self {
            pool,
            partition_config,
            metrics: None,
            stream_notifier,
            #[cfg(feature = "core")]
            scanner_handle: None,
        })
    }

    /// RFC-017 Wave 8 Stage E1: dial Postgres with an explicit
    /// [`PartitionConfig`] + shared [`ff_observability::Metrics`].
    /// Mirrors [`ff_backend_valkey::ValkeyBackend::connect_with_metrics`]
    /// so `ff-server::Server::start_with_metrics` can wire the Postgres
    /// branch without reaching into the pool builder directly.
    ///
    /// Returns a concrete `Arc<Self>` rather than `Arc<dyn EngineBackend>`
    /// so the caller can cast to the trait object after any additional
    /// field installs (parallel to the Valkey path which calls
    /// `with_scheduler` / `with_stream_semaphore_permits` before the
    /// cast). Stage E1 does NOT run `apply_migrations` — schema
    /// provisioning is an operator concern (matches the Wave 0 contract
    /// on [`Self::connect`]).
    pub async fn connect_with_metrics(
        config: BackendConfig,
        partition_config: PartitionConfig,
        metrics: Arc<ff_observability::Metrics>,
    ) -> Result<Arc<Self>, EngineError> {
        let pool = pool::build_pool(&config).await?;
        warn_if_max_locks_low(&pool).await;
        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
        Ok(Arc::new(Self {
            pool,
            partition_config,
            metrics: Some(metrics),
            stream_notifier,
            #[cfg(feature = "core")]
            scanner_handle: None,
        }))
    }

    /// RFC-017 Wave 8 Stage E3: spawn the six Postgres reconcilers as
    /// background tick loops. Returns `true` if the scanner handle
    /// was installed; `false` if the `Arc<Self>` has outstanding
    /// clones (mirrors the Valkey `with_*` pattern). Callers must
    /// invoke this before publishing the `Arc<dyn EngineBackend>` so
    /// the underlying `Arc::get_mut` succeeds.
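    ///
    /// A sketch of the Stage E1 + E3 wiring (illustrative; assumes a
    /// `Default` impl on [`PostgresScannerConfig`] and elides config
    /// construction):
    ///
    /// ```ignore
    /// let mut backend =
    ///     PostgresBackend::connect_with_metrics(config, partition_config, metrics).await?;
    /// assert!(backend.with_scanners(PostgresScannerConfig::default()));
    /// // Publish only after the install so `Arc::get_mut` can succeed.
    /// let backend: Arc<dyn EngineBackend> = backend;
    /// ```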
    #[cfg(feature = "core")]
    pub fn with_scanners(
        self: &mut Arc<Self>,
        cfg: scanner_supervisor::PostgresScannerConfig,
    ) -> bool {
        let Some(inner) = Arc::get_mut(self) else {
            return false;
        };
        let handle = scanner_supervisor::spawn_scanners(inner.pool.clone(), cfg);
        inner.scanner_handle = Some(Arc::new(handle));
        true
    }

    /// Accessor for the underlying `PgPool`. Stage E1 uses this so
    /// `ff-server::Server::start_with_metrics` can run
    /// [`apply_migrations`] on the same pool before handing the backend
    /// out as `Arc<dyn EngineBackend>`.
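    ///
    /// ```ignore
    /// // Illustrative sketch — assumes `apply_migrations` accepts the pool:
    /// apply_migrations(backend.pool()).await?;
    /// ```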
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }

    /// Create one execution row (+ seed the lane registry if new).
    ///
    /// **RFC-017 Stage A:** this inherent method is retained as a
    /// thin wrapper around the module-level impl so existing in-tree
    /// callers (ff-server request handlers, integration tests) keep
    /// compiling. The trait-lifted entry point is
    /// [`EngineBackend::create_execution`] below, which calls the
    /// same impl. Return shape differs — inherent returns
    /// `ExecutionId`, trait returns
    /// [`CreateExecutionResult`] per RFC-017 §5 — so we cannot simply
    /// replace the inherent method. A follow-up PR may deprecate
    /// this inherent alongside the broader ingress shape alignment.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_execution", skip_all)]
    pub async fn create_execution(
        &self,
        args: CreateExecutionArgs,
    ) -> Result<ExecutionId, EngineError> {
        exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await
    }

    // ── RFC-017 Stage A: inherent ingress methods retained for
    // back-compat with in-tree test harnesses + ff-server direct
    // calls. The trait-lifted peers (`EngineBackend::create_flow`
    // etc.) delegate to the SAME module-level impls under the hood.
    // Follow-up PR may sunset these inherents once all in-tree
    // consumers route through `Arc<dyn EngineBackend>`.

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_flow.inherent", skip_all)]
    pub async fn create_flow(
        &self,
        args: &CreateFlowArgs,
    ) -> Result<CreateFlowResult, EngineError> {
        flow_staging::create_flow(&self.pool, &self.partition_config, args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.add_execution_to_flow.inherent", skip_all)]
    pub async fn add_execution_to_flow(
        &self,
        args: &AddExecutionToFlowArgs,
    ) -> Result<AddExecutionToFlowResult, EngineError> {
        flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.stage_dependency_edge.inherent", skip_all)]
    pub async fn stage_dependency_edge(
        &self,
        args: &StageDependencyEdgeArgs,
    ) -> Result<StageDependencyEdgeResult, EngineError> {
        flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.apply_dependency_to_child.inherent", skip_all)]
    pub async fn apply_dependency_to_child(
        &self,
        args: &ApplyDependencyToChildArgs,
    ) -> Result<ApplyDependencyToChildResult, EngineError> {
        flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, args).await
    }
}

/// Short helper: every stub method returns this. Kept as a function
/// (rather than a macro) so rust-analyzer / IDE jumps show a single
/// definition site for the Wave 0 stub pattern.
#[inline]
fn unavailable<T>(op: &'static str) -> Result<T, EngineError> {
    Err(EngineError::Unavailable { op })
}

#[async_trait]
impl EngineBackend for PostgresBackend {
    // ── Claim + lifecycle ──

    #[tracing::instrument(name = "pg.claim", skip_all)]
    async fn claim(
        &self,
        lane: &LaneId,
        capabilities: &CapabilitySet,
        policy: ClaimPolicy,
    ) -> Result<Option<Handle>, EngineError> {
        attempt::claim(&self.pool, lane, capabilities, &policy).await
    }

    #[tracing::instrument(name = "pg.renew", skip_all)]
    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError> {
        attempt::renew(&self.pool, handle).await
    }

    #[tracing::instrument(name = "pg.progress", skip_all)]
    async fn progress(
        &self,
        handle: &Handle,
        percent: Option<u8>,
        message: Option<String>,
    ) -> Result<(), EngineError> {
        attempt::progress(&self.pool, handle, percent, message).await
    }

    #[tracing::instrument(name = "pg.append_frame", skip_all)]
    async fn append_frame(
        &self,
        handle: &Handle,
        frame: Frame,
    ) -> Result<AppendFrameOutcome, EngineError> {
        #[cfg(feature = "streaming")]
        {
            stream::append_frame(&self.pool, &self.partition_config, handle, frame).await
        }
        #[cfg(not(feature = "streaming"))]
        {
            let _ = (handle, frame);
            unavailable("pg.append_frame")
        }
    }

    #[tracing::instrument(name = "pg.complete", skip_all)]
    async fn complete(
        &self,
        handle: &Handle,
        payload: Option<Vec<u8>>,
    ) -> Result<(), EngineError> {
        attempt::complete(&self.pool, handle, payload).await
    }

    #[tracing::instrument(name = "pg.fail", skip_all)]
    async fn fail(
        &self,
        handle: &Handle,
        reason: FailureReason,
        classification: FailureClass,
    ) -> Result<FailOutcome, EngineError> {
        attempt::fail(&self.pool, handle, reason, classification).await
    }

    #[tracing::instrument(name = "pg.cancel", skip_all)]
    async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError> {
        let payload = handle_codec::decode_handle(handle)?;
        exec_core::cancel_impl(
            &self.pool,
            &self.partition_config,
            &payload.execution_id,
            reason,
        )
        .await
    }

    #[tracing::instrument(name = "pg.suspend", skip_all)]
    async fn suspend(
        &self,
        handle: &Handle,
        args: SuspendArgs,
    ) -> Result<SuspendOutcome, EngineError> {
        suspend_ops::suspend_impl(&self.pool, &self.partition_config, handle, args).await
    }

    #[tracing::instrument(name = "pg.suspend_by_triple", skip_all)]
    async fn suspend_by_triple(
        &self,
        exec_id: ExecutionId,
        triple: LeaseFence,
        args: SuspendArgs,
    ) -> Result<SuspendOutcome, EngineError> {
        suspend_ops::suspend_by_triple_impl(
            &self.pool,
            &self.partition_config,
            exec_id,
            triple,
            args,
        )
        .await
    }

    #[tracing::instrument(name = "pg.create_waitpoint", skip_all)]
    async fn create_waitpoint(
        &self,
        _handle: &Handle,
        _waitpoint_key: &str,
        _expires_in: Duration,
    ) -> Result<PendingWaitpoint, EngineError> {
        unavailable("pg.create_waitpoint")
    }

    #[tracing::instrument(name = "pg.observe_signals", skip_all)]
    async fn observe_signals(
        &self,
        handle: &Handle,
    ) -> Result<Vec<ResumeSignal>, EngineError> {
        suspend_ops::observe_signals_impl(&self.pool, handle).await
    }

    #[tracing::instrument(name = "pg.claim_from_reclaim", skip_all)]
    async fn claim_from_reclaim(
        &self,
        token: ReclaimToken,
    ) -> Result<Option<Handle>, EngineError> {
        attempt::claim_from_reclaim(&self.pool, token).await
    }

    #[tracing::instrument(name = "pg.delay", skip_all)]
    async fn delay(
        &self,
        handle: &Handle,
        delay_until: TimestampMs,
    ) -> Result<(), EngineError> {
        attempt::delay(&self.pool, handle, delay_until).await
    }

    #[tracing::instrument(name = "pg.wait_children", skip_all)]
    async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError> {
        attempt::wait_children(&self.pool, handle).await
    }

    // ── Read / admin ──

    #[tracing::instrument(name = "pg.describe_execution", skip_all)]
    async fn describe_execution(
        &self,
        id: &ExecutionId,
    ) -> Result<Option<ExecutionSnapshot>, EngineError> {
        exec_core::describe_execution_impl(&self.pool, &self.partition_config, id).await
    }

    #[tracing::instrument(name = "pg.describe_flow", skip_all)]
    async fn describe_flow(
        &self,
        id: &FlowId,
    ) -> Result<Option<FlowSnapshot>, EngineError> {
        flow::describe_flow(&self.pool, &self.partition_config, id).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_edges", skip_all)]
    async fn list_edges(
        &self,
        flow_id: &FlowId,
        direction: EdgeDirection,
    ) -> Result<Vec<EdgeSnapshot>, EngineError> {
        flow::list_edges(&self.pool, &self.partition_config, flow_id, direction).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.describe_edge", skip_all)]
    async fn describe_edge(
        &self,
        flow_id: &FlowId,
        edge_id: &EdgeId,
    ) -> Result<Option<EdgeSnapshot>, EngineError> {
        flow::describe_edge(&self.pool, &self.partition_config, flow_id, edge_id).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.resolve_execution_flow_id", skip_all)]
    async fn resolve_execution_flow_id(
        &self,
        eid: &ExecutionId,
    ) -> Result<Option<FlowId>, EngineError> {
        exec_core::resolve_execution_flow_id_impl(&self.pool, &self.partition_config, eid).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_flows", skip_all)]
    async fn list_flows(
        &self,
        partition: PartitionKey,
        cursor: Option<FlowId>,
        limit: usize,
    ) -> Result<ListFlowsPage, EngineError> {
        flow::list_flows(&self.pool, partition, cursor, limit).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_lanes", skip_all)]
    async fn list_lanes(
        &self,
        cursor: Option<LaneId>,
        limit: usize,
    ) -> Result<ListLanesPage, EngineError> {
        admin::list_lanes_impl(&self.pool, cursor, limit).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_suspended", skip_all)]
    async fn list_suspended(
        &self,
        partition: PartitionKey,
        cursor: Option<ExecutionId>,
        limit: usize,
    ) -> Result<ListSuspendedPage, EngineError> {
        admin::list_suspended_impl(&self.pool, partition, cursor, limit).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_executions", skip_all)]
    async fn list_executions(
        &self,
        partition: PartitionKey,
        cursor: Option<ExecutionId>,
        limit: usize,
    ) -> Result<ListExecutionsPage, EngineError> {
        exec_core::list_executions_impl(
            &self.pool,
            &self.partition_config,
            partition,
            cursor,
            limit,
        )
        .await
    }

    // ── Trigger ops (issue #150) ──

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.deliver_signal", skip_all)]
    async fn deliver_signal(
        &self,
        args: DeliverSignalArgs,
    ) -> Result<DeliverSignalResult, EngineError> {
        suspend_ops::deliver_signal_impl(&self.pool, &self.partition_config, args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.claim_resumed_execution", skip_all)]
    async fn claim_resumed_execution(
        &self,
        args: ClaimResumedExecutionArgs,
    ) -> Result<ClaimResumedExecutionResult, EngineError> {
        suspend_ops::claim_resumed_execution_impl(&self.pool, &self.partition_config, args).await
    }

    // ── RFC-020 Wave 9 Spine-B — read model (3 methods, §4.1) ────────
    //
    // Partition-local single-row reads against `ff_exec_core` (+ LATERAL
    // join on `ff_attempt` for `read_execution_info`). READ COMMITTED
    // (no CAS; all three are read-only). `get_execution_result` returns
    // current-attempt semantics per §7.8 (matches Valkey's
    // `GET ctx.result()` primitive). The capability flips landed with
    // the Wave 9 release PR per RFC §6.3.

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.read_execution_state", skip_all)]
    async fn read_execution_state(
        &self,
        id: &ExecutionId,
    ) -> Result<Option<PublicState>, EngineError> {
        exec_core::read_execution_state_impl(&self.pool, &self.partition_config, id).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.read_execution_info", skip_all)]
    async fn read_execution_info(
        &self,
        id: &ExecutionId,
    ) -> Result<Option<ExecutionInfo>, EngineError> {
        exec_core::read_execution_info_impl(&self.pool, &self.partition_config, id).await
    }

    #[tracing::instrument(name = "pg.get_execution_result", skip_all)]
    async fn get_execution_result(
        &self,
        id: &ExecutionId,
    ) -> Result<Option<Vec<u8>>, EngineError> {
        exec_core::get_execution_result_impl(&self.pool, &self.partition_config, id).await
    }

    // ── RFC-020 Wave 9 Standalone-2 — list_pending_waitpoints (§4.5) ─
    //
    // Read-only projection of `ff_waitpoint_pending` serving the 10-
    // field `PendingWaitpointInfo` contract. Producer-side writes of
    // the 3 new 0011 columns (`state`, `required_signal_names`,
    // `activated_at_ms`) landed alongside this method in the same PR —
    // see `suspend_ops::suspend_core` INSERT site. The capability flip
    // landed with the Wave 9 release PR per RFC §6.3.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_pending_waitpoints", skip_all)]
    async fn list_pending_waitpoints(
        &self,
        args: ListPendingWaitpointsArgs,
    ) -> Result<ListPendingWaitpointsResult, EngineError> {
        suspend_ops::list_pending_waitpoints_impl(&self.pool, args).await
    }

    // ── RFC-020 Wave 9 Spine-A pt.1 — operator-control mutations (§4.2) ─
    //
    // Two methods behind `Supports.cancel_execution` +
    // `Supports.revoke_lease` (both flipped atomically at the Wave 9
    // release PR, RFC §6.3). SERIALIZABLE + CAS + `ff_lease_event`
    // outbox emit on the same tx (§4.2.6 + §4.2.7).

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.cancel_execution", skip_all)]
    async fn cancel_execution(
        &self,
        args: CancelExecutionArgs,
    ) -> Result<CancelExecutionResult, EngineError> {
        operator::cancel_execution_impl(&self.pool, args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.revoke_lease", skip_all)]
    async fn revoke_lease(
        &self,
        args: RevokeLeaseArgs,
    ) -> Result<RevokeLeaseResult, EngineError> {
        operator::revoke_lease_impl(&self.pool, args).await
    }

    // ── RFC-020 Wave 9 Spine-A pt.2 — operator control + flow cancel (§4.2.3 + §4.2.4 + §4.2.5) ─
    //
    // Four methods behind `Supports.change_priority` +
    // `Supports.replay_execution` + `Supports.cancel_flow_header` +
    // `Supports.ack_cancel_member` (all flipped atomically at the
    // Wave 9 release PR, RFC §6.3). SERIALIZABLE + CAS +
    // `ff_operator_event` outbox emit on the same tx (§4.2.6 + §4.2.7).
    // `ack_cancel_member` is silent on the outbox (Valkey-parity).

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.change_priority", skip_all)]
    async fn change_priority(
        &self,
        args: ChangePriorityArgs,
    ) -> Result<ChangePriorityResult, EngineError> {
        operator::change_priority_impl(&self.pool, args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.replay_execution", skip_all)]
    async fn replay_execution(
        &self,
        args: ReplayExecutionArgs,
    ) -> Result<ReplayExecutionResult, EngineError> {
        operator::replay_execution_impl(&self.pool, args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.cancel_flow_header", skip_all)]
    async fn cancel_flow_header(
        &self,
        args: CancelFlowArgs,
    ) -> Result<CancelFlowHeader, EngineError> {
        operator::cancel_flow_header_impl(&self.pool, &self.partition_config, args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.ack_cancel_member", skip_all)]
    async fn ack_cancel_member(
        &self,
        flow_id: &FlowId,
        execution_id: &ExecutionId,
    ) -> Result<(), EngineError> {
        operator::ack_cancel_member_impl(
            &self.pool,
            &self.partition_config,
            flow_id.clone(),
            execution_id.clone(),
        )
        .await
    }

    // ── RFC-017 Stage A — ingress (promoted from inherent) ────

    /// RFC-017 Wave 8 Stage E1: lift the inherent
    /// [`PostgresBackend::create_execution`] onto the trait so
    /// ff-server's migrated HTTP handler can dispatch to Postgres.
    /// The insert is idempotent, and the Postgres impl does not
    /// distinguish `Created` from `Duplicate` at the helper level
    /// (both paths commit and return the execution id), so we always
    /// surface `Created { public_state: Waiting }` here. A follow-up
    /// may lift the distinction if a consumer relies on it.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_execution.trait", skip_all)]
    async fn create_execution(
        &self,
        args: CreateExecutionArgs,
    ) -> Result<CreateExecutionResult, EngineError> {
        let eid = args.execution_id.clone();
        exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await?;
        Ok(CreateExecutionResult::Created {
            execution_id: eid,
            public_state: PublicState::Waiting,
        })
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_flow", skip_all)]
    async fn create_flow(
        &self,
        args: CreateFlowArgs,
    ) -> Result<CreateFlowResult, EngineError> {
        flow_staging::create_flow(&self.pool, &self.partition_config, &args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.add_execution_to_flow", skip_all)]
    async fn add_execution_to_flow(
        &self,
        args: AddExecutionToFlowArgs,
    ) -> Result<AddExecutionToFlowResult, EngineError> {
        flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, &args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.stage_dependency_edge", skip_all)]
    async fn stage_dependency_edge(
        &self,
        args: StageDependencyEdgeArgs,
    ) -> Result<StageDependencyEdgeResult, EngineError> {
        flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, &args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.apply_dependency_to_child", skip_all)]
    async fn apply_dependency_to_child(
        &self,
        args: ApplyDependencyToChildArgs,
    ) -> Result<ApplyDependencyToChildResult, EngineError> {
        flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, &args).await
    }

    fn backend_label(&self) -> &'static str {
        "postgres"
    }

    /// RFC-018 Stage A: populate the `Capabilities` snapshot from the
    /// static [`postgres_supports_base`] shape. The Postgres backend
    /// landed through RFC-017 Stage E4 at v0.8.0 and reached
    /// operator-control / read-model / budget-quota parity in Wave 9
    /// (v0.11); the only field still `false` is
    /// `subscribe_instance_tags` (#311). See
    /// `docs/POSTGRES_PARITY_MATRIX.md` for the per-row breakdown.
    fn capabilities(&self) -> ff_core::capability::Capabilities {
        ff_core::capability::Capabilities::new(
            ff_core::capability::BackendIdentity::new(
                "postgres",
                ff_core::capability::Version::new(0, 11, 0),
                "E-shipped",
            ),
            postgres_supports_base(),
        )
    }

    /// Issue #281: no-op. Schema migrations are applied out-of-band
    /// per `rfcs/drafts/v0.7-migration-master.md §Q12` (operator runs
    /// `sqlx migrate run` or the future `ff-migrate` CLI). Boot runs a
    /// schema-version check at connect time
    /// ([`crate::version::check_schema_version`]) and refuses to
    /// start on mismatch, so by the time `prepare()` is callable
    /// there is nothing further to do.
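    ///
    /// ```ignore
    /// // NoOp is the expected, successful outcome on Postgres:
    /// assert!(matches!(backend.prepare().await?, ff_core::backend::PrepareOutcome::NoOp));
    /// ```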
    async fn prepare(
        &self,
    ) -> Result<ff_core::backend::PrepareOutcome, EngineError> {
        Ok(ff_core::backend::PrepareOutcome::NoOp)
    }

    /// RFC-017 Wave 8 Stage E3 (§4 row 9, §7): forward the claim to the
    /// Postgres-native admission pipeline. Returns `NoWork` when no
    /// eligible execution is admissible this scan cycle. Budget
    /// breaches surface as `NoWork` (leaving the row eligible for a
    /// retry by another worker); validation-class rejections
    /// (malformed partition, unknown kid) surface as typed
    /// [`EngineError`] variants mapped to the Server's 400/503 arms.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.claim_for_worker", skip_all)]
    async fn claim_for_worker(
        &self,
        args: ff_core::contracts::ClaimForWorkerArgs,
    ) -> Result<ff_core::contracts::ClaimForWorkerOutcome, EngineError> {
        let sched = scheduler::PostgresScheduler::new(self.pool.clone());
        let grant_opt = sched
            .claim_for_worker(
                &args.lane_id,
                &args.worker_id,
                &args.worker_instance_id,
                &args.worker_capabilities,
                args.grant_ttl_ms,
            )
            .await?;
        Ok(match grant_opt {
            Some(g) => ff_core::contracts::ClaimForWorkerOutcome::granted(g),
            None => ff_core::contracts::ClaimForWorkerOutcome::no_work(),
        })
    }

    async fn ping(&self) -> Result<(), EngineError> {
        // Postgres analogue to Valkey PING — single-round-trip pool
        // liveness. Errors propagate as transport-class EngineError via
        // the existing sqlx→EngineError map.
        let _ = sqlx::query_scalar::<_, i32>("SELECT 1")
            .fetch_one(&self.pool)
            .await
            .map_err(error::map_sqlx_error)?;
        Ok(())
    }

    /// RFC-017 Wave 8 Stage E3: drain the scanner supervisor's
    /// reconciler tasks up to `grace`, then close the sqlx pool.
    /// Matches the Valkey backend's shutdown_prepare contract —
    /// bounded best-effort drain, never returns an error.
    async fn shutdown_prepare(&self, grace: Duration) -> Result<(), EngineError> {
        #[cfg(feature = "core")]
        if let Some(handle) = self.scanner_handle.as_ref() {
            let timed_out = handle.shutdown(grace).await;
            if timed_out > 0 {
                tracing::warn!(
                    timed_out,
                    ?grace,
                    "postgres scanner supervisor exceeded grace on shutdown"
                );
            }
        }
        Ok(())
    }

    #[tracing::instrument(name = "pg.cancel_flow", skip_all)]
    async fn cancel_flow(
        &self,
        id: &FlowId,
        policy: CancelFlowPolicy,
        wait: CancelFlowWait,
    ) -> Result<CancelFlowResult, EngineError> {
        let result = flow::cancel_flow(&self.pool, &self.partition_config, id, policy).await?;
        if let Some(deadline) = ff_core::engine_backend::cancel_flow_wait_deadline(wait) {
            ff_core::engine_backend::wait_for_flow_cancellation(self, id, deadline).await?;
        }
        Ok(result)
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.set_edge_group_policy", skip_all)]
    async fn set_edge_group_policy(
        &self,
        flow_id: &FlowId,
        downstream_execution_id: &ExecutionId,
        policy: EdgeDependencyPolicy,
    ) -> Result<SetEdgeGroupPolicyResult, EngineError> {
        flow::set_edge_group_policy(
            &self.pool,
            &self.partition_config,
            flow_id,
            downstream_execution_id,
            policy,
        )
        .await
    }

    // ── Budget ──

    #[tracing::instrument(name = "pg.report_usage", skip_all)]
    async fn report_usage(
        &self,
        _handle: &Handle,
        budget: &BudgetId,
        dimensions: UsageDimensions,
    ) -> Result<ReportUsageResult, EngineError> {
        budget::report_usage_impl(&self.pool, &self.partition_config, budget, dimensions).await
    }

    // ── RFC-020 Wave 9 Standalone-1 — budget/quota admin (§4.4) ─────
    //
    // Five methods behind capability flags that were flipped
    // atomically at the Wave 9 release PR (RFC §6.3). Schema + trait
    // impls landed first; the capability-surface flip followed one
    // PR later.

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_budget", skip_all)]
    async fn create_budget(
        &self,
        args: CreateBudgetArgs,
    ) -> Result<CreateBudgetResult, EngineError> {
        budget::create_budget_impl(&self.pool, &self.partition_config, args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.reset_budget", skip_all)]
    async fn reset_budget(
        &self,
        args: ResetBudgetArgs,
    ) -> Result<ResetBudgetResult, EngineError> {
        budget::reset_budget_impl(&self.pool, &self.partition_config, args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_quota_policy", skip_all)]
    async fn create_quota_policy(
        &self,
        args: CreateQuotaPolicyArgs,
    ) -> Result<CreateQuotaPolicyResult, EngineError> {
        budget::create_quota_policy_impl(&self.pool, &self.partition_config, args).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.get_budget_status", skip_all)]
    async fn get_budget_status(
        &self,
        id: &BudgetId,
    ) -> Result<BudgetStatus, EngineError> {
        budget::get_budget_status_impl(&self.pool, &self.partition_config, id).await
    }

    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.report_usage_admin", skip_all)]
    async fn report_usage_admin(
        &self,
        budget_id: &BudgetId,
        args: ReportUsageAdminArgs,
    ) -> Result<ReportUsageResult, EngineError> {
        budget::report_usage_admin_impl(&self.pool, &self.partition_config, budget_id, args).await
    }

    // ── HMAC secret rotation (v0.7 migration-master Q4) ──
    //
    // Wave 4 replaced the Wave 0 stub with a single INSERT into
    // `ff_waitpoint_hmac(kid, secret, rotated_at)`. Waves 0/1 kept
    // the `Unavailable` shape so a running Postgres backend surfaced
    // the unimplemented op loudly rather than silently no-op'ing.
    #[tracing::instrument(name = "pg.rotate_waitpoint_hmac_secret_all", skip_all)]
    async fn rotate_waitpoint_hmac_secret_all(
        &self,
        args: RotateWaitpointHmacSecretAllArgs,
    ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
        // Wave 4 Agent D: Q4 single-global-row write against
        // `ff_waitpoint_hmac`. `now_ms` is captured here (not
        // inside the impl) so tests can inject a deterministic
        // clock via the pool layer in the future.
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as i64)
            .unwrap_or(0);
        signal::rotate_waitpoint_hmac_secret_all_impl(&self.pool, args, now_ms).await
    }

    #[tracing::instrument(name = "pg.seed_waitpoint_hmac_secret", skip_all)]
    async fn seed_waitpoint_hmac_secret(
        &self,
        args: SeedWaitpointHmacSecretArgs,
    ) -> Result<SeedOutcome, EngineError> {
        // Issue #280: install-only boot-time seed against the global
        // `ff_waitpoint_hmac` table. Idempotent — cairn calls this on
        // every boot and the backend decides whether to INSERT.
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as i64)
            .unwrap_or(0);
        signal::seed_waitpoint_hmac_secret_impl(&self.pool, args, now_ms).await
    }

    // ── Stream reads (streaming feature) ──

    #[cfg(feature = "streaming")]
    #[tracing::instrument(name = "pg.read_stream", skip_all)]
    async fn read_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        from: StreamCursor,
        to: StreamCursor,
        count_limit: u64,
    ) -> Result<StreamFrames, EngineError> {
        stream::read_stream(&self.pool, execution_id, attempt_index, from, to, count_limit).await
    }

    #[cfg(feature = "streaming")]
    #[tracing::instrument(name = "pg.tail_stream", skip_all)]
    async fn tail_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        after: StreamCursor,
        block_ms: u64,
        count_limit: u64,
        visibility: TailVisibility,
    ) -> Result<StreamFrames, EngineError> {
        let notifier = self
            .stream_notifier
            .as_ref()
            .ok_or(EngineError::Unavailable {
                op: "pg.tail_stream (notifier not initialised)",
            })?;
        stream::tail_stream(
            &self.pool,
            notifier,
            execution_id,
            attempt_index,
            after,
            block_ms,
            count_limit,
            visibility,
        )
        .await
    }

    #[cfg(feature = "streaming")]
    #[tracing::instrument(name = "pg.read_summary", skip_all)]
    async fn read_summary(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
    ) -> Result<Option<SummaryDocument>, EngineError> {
        stream::read_summary(&self.pool, execution_id, attempt_index).await
    }

    // ── RFC-019 Stage A — `subscribe_completion` ──────────────────
    //
    // Postgres real impl. Wraps the existing `ff_completion_event`
    // outbox + `LISTEN ff_completion` machinery
    // (see `completion::subscribe`) and adapts each completion
    // payload into a `stream_subscribe::StreamEvent`.
    //
    // Cursor encoding: `POSTGRES_CURSOR_PREFIX (0x02)` + `event_id`
    // (i64 BE). Stage A resume-from-cursor is not plumbed through the
    // adapter (the existing subscriber tails from `max(event_id)`);
    // Stage B threads the cursor into the replay path. The surface is
    // correct today for consumers that subscribe from tail and
    // persist cursors for future resume.
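    //
    // A minimal sketch of that cursor layout (illustrative only; the
    // real encoder lives in `ff_core::stream_subscribe`):
    //
    //     fn encode(event_id: i64) -> Vec<u8> {
    //         let mut buf = vec![0x02u8];                     // family prefix
    //         buf.extend_from_slice(&event_id.to_be_bytes()); // i64 BE
    //         buf                                             // 9 bytes total
    //     }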
    #[tracing::instrument(name = "pg.subscribe_completion", skip_all)]
    async fn subscribe_completion(
        &self,
        _cursor: ff_core::stream_subscribe::StreamCursor,
        filter: &ff_core::backend::ScannerFilter,
    ) -> Result<ff_core::stream_events::CompletionSubscription, EngineError> {
        use ff_core::stream_events::{CompletionEvent, CompletionOutcome};
        use ff_core::stream_subscribe::encode_postgres_event_cursor;
        use futures_core::Stream;
        use std::pin::Pin;
        use std::task::{Context, Poll};

        // Delegate to the existing CompletionBackend implementation so
        // the LISTEN/replay machinery is shared. When a non-noop
        // `ScannerFilter` (#282) is supplied, route through the
        // `_filtered` variant so the outbox-inline SQL filter applies.
        // Resume-from-cursor is still unwired (the Stage A surface
        // tails from the current tail).
        let inner = if filter.is_noop() {
            ff_core::completion_backend::CompletionBackend::subscribe_completions(self).await?
        } else {
            ff_core::completion_backend::CompletionBackend::subscribe_completions_filtered(
                self, filter,
            )
            .await?
        };

        struct Adapter {
            inner: ff_core::completion_backend::CompletionStream,
        }

        impl Stream for Adapter {
            type Item = Result<CompletionEvent, EngineError>;
            fn poll_next(
                mut self: Pin<&mut Self>,
                cx: &mut Context<'_>,
            ) -> Poll<Option<Self::Item>> {
                match Pin::new(&mut self.inner).poll_next(cx) {
                    Poll::Pending => Poll::Pending,
                    Poll::Ready(None) => Poll::Ready(None),
                    Poll::Ready(Some(payload)) => {
                        // Placeholder cursor (event_id = 0) because
                        // `CompletionPayload` does not surface
                        // `event_id` today. The family prefix stays
                        // stable so persisted cursors remain
                        // forward-compatible.
                        let cursor = encode_postgres_event_cursor(0);
                        let event = CompletionEvent::new(
                            cursor,
                            payload.execution_id.clone(),
                            CompletionOutcome::from_wire(&payload.outcome),
                            payload.produced_at_ms,
                        );
                        Poll::Ready(Some(Ok(event)))
                    }
                }
            }
        }

        Ok(Box::pin(Adapter { inner }))
    }

    // ── RFC-019 Stage B — `subscribe_lease_history` ──────────────
    //
    // Real Postgres impl. Tails the `ff_lease_event` outbox (written
    // by producer sites in `attempt.rs`, `flow.rs`, `suspend_ops.rs`,
    // and the `attempt_timeout` / `lease_expiry` reconcilers) via
    // `LISTEN ff_lease_event` + catch-up SELECT. Cursor encoding
    // matches `subscribe_completion`: `0x02 ++ event_id(BE8)`.
    //
    // Partition scope: hardcoded to partition 0 — mirrors the Valkey
    // Stage A impl, which tails partition 0's aggregate stream key.
    // Cross-partition consumers instantiate one backend per
    // partition + merge streams consumer-side (RFC-019 §Backend
    // Semantics).
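    //
    // Consumer-side merge sketch (illustrative; assumes the `futures`
    // crate and one subscription per partition):
    //
    //     let merged = futures::stream::select_all(vec![sub_p0, sub_p1]);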
    #[tracing::instrument(name = "pg.subscribe_lease_history", skip_all)]
    async fn subscribe_lease_history(
        &self,
        cursor: ff_core::stream_subscribe::StreamCursor,
        filter: &ff_core::backend::ScannerFilter,
    ) -> Result<ff_core::stream_events::LeaseHistorySubscription, EngineError> {
        lease_event_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
    }

    // ── RFC-019 Stage B — `subscribe_signal_delivery` (#310) ─────
    //
    // Tails the `ff_signal_event` outbox (written by the producer
    // INSERT in `suspend_ops::deliver_signal_impl`) via
    // `LISTEN ff_signal_event` + catch-up SELECT. Cursor encoding
    // matches `subscribe_lease_history`: `0x02 ++ event_id(BE8)`.
    //
    // Partition scope: hardcoded to partition 0 — mirrors the Valkey
    // Stage B impl which tails partition 0's aggregate stream key.
    #[tracing::instrument(name = "pg.subscribe_signal_delivery", skip_all)]
    async fn subscribe_signal_delivery(
        &self,
        cursor: ff_core::stream_subscribe::StreamCursor,
        filter: &ff_core::backend::ScannerFilter,
    ) -> Result<ff_core::stream_events::SignalDeliverySubscription, EngineError> {
        signal_delivery_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
    }
}

/// Minimum recommended `max_locks_per_transaction`. Partition-heavy
/// schemas (256 hash partitions per logical table) can exceed the
/// Postgres default of `64` locks per transaction under modest
/// concurrent bench load — the Wave 7c bench hit `out of shared
/// memory` at 16 workers × 10k tasks with the default and unblocked
/// at `512`. We warn at boot rather than hard-fail because operators
/// may legitimately run with a tuned value that still exceeds 64 but
/// sits below our threshold.
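/// Operators can raise the value out-of-band, e.g.
/// `ALTER SYSTEM SET max_locks_per_transaction = 512;` followed by a
/// server restart (the parameter is not reloadable).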
const MIN_MAX_LOCKS_PER_TRANSACTION: i64 = 256;

/// Probe `max_locks_per_transaction` at connect time + log a warning
/// when the current value is below the production-safe threshold.
/// Never fails the connect — probe errors are logged at debug and
/// swallowed (`SHOW` may be restricted on exotic deploys).
async fn warn_if_max_locks_low(pool: &PgPool) {
    let row: Result<(String,), sqlx::Error> =
        sqlx::query_as("SHOW max_locks_per_transaction")
            .fetch_one(pool)
            .await;
    match row {
        Ok((raw,)) => emit_max_locks_decision(&raw),
        Err(e) => {
            tracing::debug!("failed to probe max_locks_per_transaction: {e}");
        }
    }
}

/// Pure decision surface for the max-locks probe — extracted for
/// unit-testability (the live probe is gated by a running Postgres).
/// Returns the integer value when a warning SHOULD fire, `None`
/// otherwise (either the raw is valid + at/above threshold, or the
/// raw is unparseable — the latter is debug-only).
fn max_locks_warn_value(raw: &str) -> Option<i64> {
    match raw.parse::<i64>() {
        Ok(v) if v < MIN_MAX_LOCKS_PER_TRANSACTION => Some(v),
        Ok(_) => None,
        Err(e) => {
            tracing::debug!(raw, "failed to parse max_locks_per_transaction: {e}");
            None
        }
    }
}

fn emit_max_locks_decision(raw: &str) {
    if let Some(v) = max_locks_warn_value(raw) {
        tracing::warn!(
            current = v,
            recommended = MIN_MAX_LOCKS_PER_TRANSACTION,
            "postgres max_locks_per_transaction={v} is below the recommended \
             minimum ({MIN_MAX_LOCKS_PER_TRANSACTION}); partition-heavy workloads \
             may hit 'out of shared memory' under concurrent load. \
             See docs/operator-guide-postgres.md."
        );
    }
}

#[cfg(test)]
mod max_locks_tests {
    use super::{max_locks_warn_value, MIN_MAX_LOCKS_PER_TRANSACTION};

    #[test]
    fn warns_when_below_threshold() {
        assert_eq!(max_locks_warn_value("64"), Some(64));
        assert_eq!(
            max_locks_warn_value(&(MIN_MAX_LOCKS_PER_TRANSACTION - 1).to_string()),
            Some(MIN_MAX_LOCKS_PER_TRANSACTION - 1)
        );
    }

    #[test]
    fn silent_at_or_above_threshold() {
        assert_eq!(
            max_locks_warn_value(&MIN_MAX_LOCKS_PER_TRANSACTION.to_string()),
            None
        );
        assert_eq!(max_locks_warn_value("1024"), None);
    }

    #[test]
    fn silent_for_unparseable_raw() {
        assert_eq!(max_locks_warn_value("not-a-number"), None);
    }
}
1343}