// ff_backend_postgres — crate root (lib.rs)

1//! `EngineBackend` implementation backed by Postgres.
2//!
3//! **RFC-v0.7 Wave 0 — scaffold.** This crate lands the
4//! [`PostgresBackend`] struct + the `impl EngineBackend` block with
5//! every method stubbed to return `EngineError::Unavailable`.
6//! Subsequent waves fill in bodies:
7//!
8//! * Wave 1: cross-cutting error/helpers.
9//! * Wave 2: describe / list / resolve (read surface).
10//! * Wave 3: schema migrations (replaces the Wave 0 placeholder).
11//! * Wave 4: LISTEN/NOTIFY wiring + stream reads/tails.
12//! * Wave 5-7: hot-path write methods.
13//! * Wave 8: ff-server wire-up + dual-backend running.
14//!
15//! The trait stays object-safe; consumers can hold
16//! `Arc<dyn EngineBackend>`. No ferriskey dep — this crate's
17//! transport is `sqlx`.
18
19#![allow(clippy::result_large_err)]
20
21use std::sync::Arc;
22use std::time::Duration;
23
24use async_trait::async_trait;
25
26use ff_core::backend::{
27    AppendFrameOutcome, BackendConfig, CancelFlowPolicy, CancelFlowWait, CapabilitySet,
28    ClaimPolicy, FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal,
29    PendingWaitpoint, ReclaimToken, ResumeSignal, SummaryDocument, TailVisibility,
30    UsageDimensions,
31};
32#[cfg(feature = "core")]
33use ff_core::contracts::{
34    AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
35    ApplyDependencyToChildResult, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
36    CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
37    DeliverSignalArgs, DeliverSignalResult, EdgeDependencyPolicy, EdgeDirection, EdgeSnapshot,
38    ListExecutionsPage, ListFlowsPage, ListLanesPage, ListSuspendedPage,
39    SetEdgeGroupPolicyResult, StageDependencyEdgeArgs, StageDependencyEdgeResult,
40};
41#[cfg(feature = "core")]
42use ff_core::state::PublicState;
43use ff_core::contracts::{
44    CancelFlowResult, ExecutionSnapshot, FlowSnapshot, ReportUsageResult,
45    RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SuspendArgs,
46    SuspendOutcome,
47};
48#[cfg(feature = "streaming")]
49use ff_core::contracts::{StreamCursor, StreamFrames};
50use ff_core::engine_backend::EngineBackend;
51use ff_core::engine_error::EngineError;
52#[cfg(feature = "core")]
53use ff_core::partition::PartitionKey;
54use ff_core::partition::PartitionConfig;
55#[cfg(feature = "streaming")]
56use ff_core::types::AttemptIndex;
57#[cfg(feature = "core")]
58use ff_core::types::EdgeId;
59use ff_core::types::{BudgetId, ExecutionId, FlowId, LaneId, TimestampMs};
60// Wave 5a — re-export `PgPool` so crates that depend on
61// `ff-backend-postgres` (and not `sqlx` directly) can name the pool
62// type in their own APIs (e.g. `ff-engine::dispatch_via_postgres`).
63pub use sqlx::PgPool;
64
65#[cfg(feature = "core")]
66mod admin;
67pub mod attempt;
68pub mod budget;
69pub mod completion;
70#[cfg(feature = "core")]
71pub mod dispatch;
72pub mod error;
73pub mod exec_core;
74pub mod flow;
75#[cfg(feature = "core")]
76pub mod flow_staging;
77pub mod handle_codec;
78pub mod listener;
79pub mod migrate;
80pub mod pool;
81#[cfg(feature = "core")]
82pub mod reconcilers;
83#[cfg(feature = "core")]
84pub mod scanner_supervisor;
85#[cfg(feature = "core")]
86pub mod scheduler;
87pub mod signal;
88#[cfg(feature = "streaming")]
89pub mod stream;
90pub mod suspend;
91pub mod suspend_ops;
92pub mod version;
93
94pub use completion::{PostgresCompletionStream, COMPLETION_CHANNEL};
95pub use error::{map_sqlx_error, PostgresTransportError};
96pub use listener::StreamNotifier;
97pub use migrate::{apply_migrations, MigrationError};
98#[cfg(feature = "core")]
99pub use scanner_supervisor::{PostgresScannerConfig, PostgresScannerHandle};
100pub use version::check_schema_version;
101
102// Re-export the new `PostgresConnection` shape so consumers can name
103// it from this crate directly without dipping into `ff_core::backend`.
104// `BackendConfig` is already imported above and is part of the
105// `connect()` signature, so it re-exports transparently via
106// rustdoc — no explicit `pub use` needed.
107pub use ff_core::backend::PostgresConnection;
108
/// Postgres-backed `EngineBackend`.
///
/// Holds a `sqlx::PgPool`, the deployment's [`PartitionConfig`]
/// (Q5 — partition column survives on Postgres with hash
/// partitioning across the same 256 slots Valkey uses), an optional
/// `ff_observability::Metrics` handle mirroring
/// [`ff_backend_valkey::ValkeyBackend`], and (since Wave 4) the
/// shared [`StreamNotifier`] handle for LISTEN/NOTIFY wiring.
pub struct PostgresBackend {
    /// Shared connection pool; every module-level impl in this crate
    /// receives `&self.pool`.
    #[allow(dead_code)] // filled in across waves 2-7
    pool: PgPool,
    /// Deployment partition layout, forwarded alongside `pool` to the
    /// helpers that need to derive partition keys.
    #[allow(dead_code)]
    partition_config: PartitionConfig,
    /// Metrics sink; `Some` only on [`Self::connect_with_metrics`]-built
    /// backends, `None` on `connect` / `from_pool` constructions.
    #[allow(dead_code)]
    metrics: Option<Arc<ff_observability::Metrics>>,
    /// Wave 4: shared LISTEN notifier. NOTE(review): every in-file
    /// constructor (`connect`, `from_pool`, `connect_with_metrics`)
    /// currently installs `Some(..)`; the `None` arm only survives for
    /// construction paths outside this file — confirm before relying
    /// on it. Consumed by `tail_stream`, which surfaces
    /// `EngineError::Unavailable` when the notifier is absent.
    #[allow(dead_code)]
    stream_notifier: Option<Arc<StreamNotifier>>,
    /// RFC-017 Wave 8 Stage E3: scanner supervisor handle. Installed
    /// by [`Self::with_scanners`] (the earlier doc referenced a
    /// nonexistent `spawn_scanners_during_connect`); drained on
    /// [`EngineBackend::shutdown_prepare`]. `None` on `from_pool` /
    /// test builds that don't want background reconcilers.
    #[cfg(feature = "core")]
    scanner_handle: Option<Arc<scanner_supervisor::PostgresScannerHandle>>,
}
137
impl PostgresBackend {
    /// Dial Postgres with [`BackendConfig`] and return the backend as
    /// `Arc<dyn EngineBackend>`. Modeled on
    /// [`ff_backend_valkey::ValkeyBackend::connect`] so ff-server /
    /// SDK call sites can swap backends without changing the
    /// constructor shape.
    ///
    /// Does NOT run migrations (Q12 — operator out-of-band). Does NOT
    /// run the schema-version check (Wave 3 adds the version const and
    /// wires [`check_schema_version`] in). Since Wave 4 this DOES
    /// spawn the LISTEN task via [`StreamNotifier::spawn`] — the
    /// Wave 0 doc claimed otherwise; the body below is authoritative.
    ///
    /// Returns `EngineError::Unavailable` when the config's
    /// connection arm is not Postgres.
    pub async fn connect(config: BackendConfig) -> Result<Arc<dyn EngineBackend>, EngineError> {
        let pool = pool::build_pool(&config).await?;
        // Best-effort operator hint; probe failures never fail connect.
        warn_if_max_locks_low(&pool).await;
        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
        let backend = Self {
            pool,
            partition_config: PartitionConfig::default(),
            metrics: None,
            stream_notifier,
            #[cfg(feature = "core")]
            scanner_handle: None,
        };
        Ok(Arc::new(backend))
    }

    /// Test / advanced constructor: build a `PostgresBackend` from an
    /// already-constructed `PgPool` + explicit partition config. No
    /// new pool is dialed, but NOTE(review): unlike the original doc's
    /// "no network I/O" claim, this DOES spawn the shared
    /// [`StreamNotifier`] task on the given pool. Useful for
    /// integration tests against a shared pool and for a future
    /// migration CLI that wants to reuse a pool across migrate-run +
    /// smoke-check.
    pub fn from_pool(pool: PgPool, partition_config: PartitionConfig) -> Arc<Self> {
        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
        Arc::new(Self {
            pool,
            partition_config,
            metrics: None,
            stream_notifier,
            #[cfg(feature = "core")]
            scanner_handle: None,
        })
    }

    /// RFC-017 Wave 8 Stage E1: dial Postgres with an explicit
    /// [`PartitionConfig`] + shared [`ff_observability::Metrics`].
    /// Mirrors [`ff_backend_valkey::ValkeyBackend::connect_with_metrics`]
    /// so `ff-server::Server::start_with_metrics` can wire the Postgres
    /// branch without reaching into the pool builder directly.
    ///
    /// Returns a concrete `Arc<Self>` rather than `Arc<dyn EngineBackend>`
    /// so the caller can cast to the trait object after any additional
    /// field installs (parallel to the Valkey path which calls
    /// `with_scheduler` / `with_stream_semaphore_permits` before the
    /// cast). Stage E1 does NOT run `apply_migrations` — schema
    /// provisioning is an operator concern (matches the Wave 0 contract
    /// on [`Self::connect`]).
    pub async fn connect_with_metrics(
        config: BackendConfig,
        partition_config: PartitionConfig,
        metrics: Arc<ff_observability::Metrics>,
    ) -> Result<Arc<Self>, EngineError> {
        let pool = pool::build_pool(&config).await?;
        warn_if_max_locks_low(&pool).await;
        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
        Ok(Arc::new(Self {
            pool,
            partition_config,
            metrics: Some(metrics),
            stream_notifier,
            #[cfg(feature = "core")]
            scanner_handle: None,
        }))
    }

    /// RFC-017 Wave 8 Stage E3: spawn the six Postgres reconcilers as
    /// background tick loops. Returns `true` if the scanner handle
    /// was installed; `false` if the `Arc<Self>` has outstanding
    /// clones (mirrors the Valkey `with_*` pattern). Callers must
    /// invoke this before publishing the `Arc<dyn EngineBackend>` so
    /// the underlying `Arc::get_mut` succeeds.
    #[cfg(feature = "core")]
    pub fn with_scanners(
        self: &mut Arc<Self>,
        cfg: scanner_supervisor::PostgresScannerConfig,
    ) -> bool {
        // Sole-ownership gate: refuse to mutate a shared Arc.
        let Some(inner) = Arc::get_mut(self) else {
            return false;
        };
        let handle = scanner_supervisor::spawn_scanners(inner.pool.clone(), cfg);
        inner.scanner_handle = Some(Arc::new(handle));
        true
    }

    /// Accessor for the underlying `PgPool`. Stage E1 uses this so
    /// `ff-server::Server::start_with_metrics` can run
    /// [`apply_migrations`] on the same pool before handing the backend
    /// out as `Arc<dyn EngineBackend>`.
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }

    /// Create one execution row (+ seed the lane registry if new).
    ///
    /// **RFC-017 Stage A:** this inherent method is retained as a
    /// thin wrapper around the module-level impl so existing in-tree
    /// callers (ff-server request handlers, integration tests) keep
    /// compiling. The trait-lifted entry point is
    /// [`EngineBackend::create_execution`] below, which calls the
    /// same impl. Return shape differs — inherent returns
    /// `ExecutionId`, trait returns
    /// [`CreateExecutionResult`] per RFC-017 §5 — so we cannot simply
    /// replace the inherent method. A follow-up PR may deprecate
    /// this inherent alongside the broader ingress shape alignment.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_execution", skip_all)]
    pub async fn create_execution(
        &self,
        args: CreateExecutionArgs,
    ) -> Result<ExecutionId, EngineError> {
        exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await
    }

    // ── RFC-017 Stage A: inherent ingress methods retained for
    // back-compat with in-tree test harnesses + ff-server direct
    // calls. The trait-lifted peers (`EngineBackend::create_flow`
    // etc.) delegate to the SAME module-level impls under the hood.
    // Follow-up PR may sunset these inherents once all in-tree
    // consumers route through `Arc<dyn EngineBackend>`.

    /// Thin delegation to [`flow_staging::create_flow`].
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_flow.inherent", skip_all)]
    pub async fn create_flow(
        &self,
        args: &CreateFlowArgs,
    ) -> Result<CreateFlowResult, EngineError> {
        flow_staging::create_flow(&self.pool, &self.partition_config, args).await
    }

    /// Thin delegation to [`flow_staging::add_execution_to_flow`].
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.add_execution_to_flow.inherent", skip_all)]
    pub async fn add_execution_to_flow(
        &self,
        args: &AddExecutionToFlowArgs,
    ) -> Result<AddExecutionToFlowResult, EngineError> {
        flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, args).await
    }

    /// Thin delegation to [`flow_staging::stage_dependency_edge`].
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.stage_dependency_edge.inherent", skip_all)]
    pub async fn stage_dependency_edge(
        &self,
        args: &StageDependencyEdgeArgs,
    ) -> Result<StageDependencyEdgeResult, EngineError> {
        flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, args).await
    }

    /// Thin delegation to [`flow_staging::apply_dependency_to_child`].
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.apply_dependency_to_child.inherent", skip_all)]
    pub async fn apply_dependency_to_child(
        &self,
        args: &ApplyDependencyToChildArgs,
    ) -> Result<ApplyDependencyToChildResult, EngineError> {
        flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, args).await
    }
}
307
308/// Short helper: every stub method returns this. Kept as a function
309/// (rather than a macro) so rust-analyzer / IDE jumps show a single
310/// definition site for the Wave 0 stub pattern.
311#[inline]
312fn unavailable<T>(op: &'static str) -> Result<T, EngineError> {
313    Err(EngineError::Unavailable { op })
314}
315
#[async_trait]
impl EngineBackend for PostgresBackend {
    // ── Claim + lifecycle ──

    /// Delegates to [`attempt::claim`].
    #[tracing::instrument(name = "pg.claim", skip_all)]
    async fn claim(
        &self,
        lane: &LaneId,
        capabilities: &CapabilitySet,
        policy: ClaimPolicy,
    ) -> Result<Option<Handle>, EngineError> {
        attempt::claim(&self.pool, lane, capabilities, &policy).await
    }

    /// Delegates to [`attempt::renew`].
    #[tracing::instrument(name = "pg.renew", skip_all)]
    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError> {
        attempt::renew(&self.pool, handle).await
    }

    /// Delegates to [`attempt::progress`].
    #[tracing::instrument(name = "pg.progress", skip_all)]
    async fn progress(
        &self,
        handle: &Handle,
        percent: Option<u8>,
        message: Option<String>,
    ) -> Result<(), EngineError> {
        attempt::progress(&self.pool, handle, percent, message).await
    }

    /// Streaming builds delegate to [`stream::append_frame`]; without
    /// the `streaming` feature the op surfaces as `Unavailable`.
    #[tracing::instrument(name = "pg.append_frame", skip_all)]
    async fn append_frame(
        &self,
        handle: &Handle,
        frame: Frame,
    ) -> Result<AppendFrameOutcome, EngineError> {
        #[cfg(feature = "streaming")]
        {
            stream::append_frame(&self.pool, &self.partition_config, handle, frame).await
        }
        #[cfg(not(feature = "streaming"))]
        {
            // Silence unused-binding warnings on the feature-off arm.
            let _ = (handle, frame);
            unavailable("pg.append_frame")
        }
    }

    /// Delegates to [`attempt::complete`].
    #[tracing::instrument(name = "pg.complete", skip_all)]
    async fn complete(
        &self,
        handle: &Handle,
        payload: Option<Vec<u8>>,
    ) -> Result<(), EngineError> {
        attempt::complete(&self.pool, handle, payload).await
    }

    /// Delegates to [`attempt::fail`].
    #[tracing::instrument(name = "pg.fail", skip_all)]
    async fn fail(
        &self,
        handle: &Handle,
        reason: FailureReason,
        classification: FailureClass,
    ) -> Result<FailOutcome, EngineError> {
        attempt::fail(&self.pool, handle, reason, classification).await
    }

    /// Decodes the opaque handle to recover the execution id, then
    /// delegates to [`exec_core::cancel_impl`].
    #[tracing::instrument(name = "pg.cancel", skip_all)]
    async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError> {
        let payload = handle_codec::decode_handle(handle)?;
        exec_core::cancel_impl(
            &self.pool,
            &self.partition_config,
            &payload.execution_id,
            reason,
        )
        .await
    }

    /// Delegates to [`suspend_ops::suspend_impl`].
    #[tracing::instrument(name = "pg.suspend", skip_all)]
    async fn suspend(
        &self,
        handle: &Handle,
        args: SuspendArgs,
    ) -> Result<SuspendOutcome, EngineError> {
        suspend_ops::suspend_impl(&self.pool, &self.partition_config, handle, args).await
    }

    /// Wave 0 stub — always returns `EngineError::Unavailable`.
    #[tracing::instrument(name = "pg.create_waitpoint", skip_all)]
    async fn create_waitpoint(
        &self,
        _handle: &Handle,
        _waitpoint_key: &str,
        _expires_in: Duration,
    ) -> Result<PendingWaitpoint, EngineError> {
        unavailable("pg.create_waitpoint")
    }

    /// Delegates to [`suspend_ops::observe_signals_impl`] (note: no
    /// partition config is threaded through this op).
    #[tracing::instrument(name = "pg.observe_signals", skip_all)]
    async fn observe_signals(
        &self,
        handle: &Handle,
    ) -> Result<Vec<ResumeSignal>, EngineError> {
        suspend_ops::observe_signals_impl(&self.pool, handle).await
    }

    /// Delegates to [`attempt::claim_from_reclaim`].
    #[tracing::instrument(name = "pg.claim_from_reclaim", skip_all)]
    async fn claim_from_reclaim(
        &self,
        token: ReclaimToken,
    ) -> Result<Option<Handle>, EngineError> {
        attempt::claim_from_reclaim(&self.pool, token).await
    }

    /// Delegates to [`attempt::delay`].
    #[tracing::instrument(name = "pg.delay", skip_all)]
    async fn delay(
        &self,
        handle: &Handle,
        delay_until: TimestampMs,
    ) -> Result<(), EngineError> {
        attempt::delay(&self.pool, handle, delay_until).await
    }

    /// Delegates to [`attempt::wait_children`].
    #[tracing::instrument(name = "pg.wait_children", skip_all)]
    async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError> {
        attempt::wait_children(&self.pool, handle).await
    }

    // ── Read / admin ──

    /// Delegates to [`exec_core::describe_execution_impl`].
    #[tracing::instrument(name = "pg.describe_execution", skip_all)]
    async fn describe_execution(
        &self,
        id: &ExecutionId,
    ) -> Result<Option<ExecutionSnapshot>, EngineError> {
        exec_core::describe_execution_impl(&self.pool, &self.partition_config, id).await
    }

    /// Delegates to [`flow::describe_flow`].
    #[tracing::instrument(name = "pg.describe_flow", skip_all)]
    async fn describe_flow(
        &self,
        id: &FlowId,
    ) -> Result<Option<FlowSnapshot>, EngineError> {
        flow::describe_flow(&self.pool, &self.partition_config, id).await
    }

    /// Delegates to [`flow::list_edges`].
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_edges", skip_all)]
    async fn list_edges(
        &self,
        flow_id: &FlowId,
        direction: EdgeDirection,
    ) -> Result<Vec<EdgeSnapshot>, EngineError> {
        flow::list_edges(&self.pool, &self.partition_config, flow_id, direction).await
    }

    /// Delegates to [`flow::describe_edge`].
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.describe_edge", skip_all)]
    async fn describe_edge(
        &self,
        flow_id: &FlowId,
        edge_id: &EdgeId,
    ) -> Result<Option<EdgeSnapshot>, EngineError> {
        flow::describe_edge(&self.pool, &self.partition_config, flow_id, edge_id).await
    }

    /// Delegates to [`exec_core::resolve_execution_flow_id_impl`].
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.resolve_execution_flow_id", skip_all)]
    async fn resolve_execution_flow_id(
        &self,
        eid: &ExecutionId,
    ) -> Result<Option<FlowId>, EngineError> {
        exec_core::resolve_execution_flow_id_impl(&self.pool, &self.partition_config, eid).await
    }

    /// Delegates to [`flow::list_flows`] (no partition config needed —
    /// the caller supplies the concrete `PartitionKey`).
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_flows", skip_all)]
    async fn list_flows(
        &self,
        partition: PartitionKey,
        cursor: Option<FlowId>,
        limit: usize,
    ) -> Result<ListFlowsPage, EngineError> {
        flow::list_flows(&self.pool, partition, cursor, limit).await
    }

    /// Delegates to [`admin::list_lanes_impl`].
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_lanes", skip_all)]
    async fn list_lanes(
        &self,
        cursor: Option<LaneId>,
        limit: usize,
    ) -> Result<ListLanesPage, EngineError> {
        admin::list_lanes_impl(&self.pool, cursor, limit).await
    }

    /// Delegates to [`admin::list_suspended_impl`].
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_suspended", skip_all)]
    async fn list_suspended(
        &self,
        partition: PartitionKey,
        cursor: Option<ExecutionId>,
        limit: usize,
    ) -> Result<ListSuspendedPage, EngineError> {
        admin::list_suspended_impl(&self.pool, partition, cursor, limit).await
    }

    /// Delegates to [`exec_core::list_executions_impl`].
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_executions", skip_all)]
    async fn list_executions(
        &self,
        partition: PartitionKey,
        cursor: Option<ExecutionId>,
        limit: usize,
    ) -> Result<ListExecutionsPage, EngineError> {
        exec_core::list_executions_impl(
            &self.pool,
            &self.partition_config,
            partition,
            cursor,
            limit,
        )
        .await
    }

    // ── Trigger ops (issue #150) ──

    /// Delegates to [`suspend_ops::deliver_signal_impl`].
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.deliver_signal", skip_all)]
    async fn deliver_signal(
        &self,
        args: DeliverSignalArgs,
    ) -> Result<DeliverSignalResult, EngineError> {
        suspend_ops::deliver_signal_impl(&self.pool, &self.partition_config, args).await
    }

    /// Delegates to [`suspend_ops::claim_resumed_execution_impl`].
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.claim_resumed_execution", skip_all)]
    async fn claim_resumed_execution(
        &self,
        args: ClaimResumedExecutionArgs,
    ) -> Result<ClaimResumedExecutionResult, EngineError> {
        suspend_ops::claim_resumed_execution_impl(&self.pool, &self.partition_config, args).await
    }

    // ── RFC-017 Stage A — ingress (promoted from inherent) ────

    /// RFC-017 Wave 8 Stage E1: lift the inherent
    /// [`PostgresBackend::create_execution`] onto the trait so
    /// ff-server's migrated HTTP handler can dispatch to Postgres.
    /// Post-insert the row is idempotent; the Postgres impl does not
    /// distinguish `Created` from `Duplicate` at the helper level
    /// (both paths commit and return the execution id), so we always
    /// surface `Created { public_state: Waiting }` here. A follow-up
    /// may lift the distinction if a consumer relies on it.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_execution.trait", skip_all)]
    async fn create_execution(
        &self,
        args: CreateExecutionArgs,
    ) -> Result<CreateExecutionResult, EngineError> {
        // Clone the id before `args` is moved into the impl.
        let eid = args.execution_id.clone();
        exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await?;
        Ok(CreateExecutionResult::Created {
            execution_id: eid,
            public_state: PublicState::Waiting,
        })
    }

    /// Trait-lifted peer of the inherent `create_flow`; same
    /// module-level impl underneath.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_flow", skip_all)]
    async fn create_flow(
        &self,
        args: CreateFlowArgs,
    ) -> Result<CreateFlowResult, EngineError> {
        flow_staging::create_flow(&self.pool, &self.partition_config, &args).await
    }

    /// Trait-lifted peer of the inherent `add_execution_to_flow`.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.add_execution_to_flow", skip_all)]
    async fn add_execution_to_flow(
        &self,
        args: AddExecutionToFlowArgs,
    ) -> Result<AddExecutionToFlowResult, EngineError> {
        flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, &args).await
    }

    /// Trait-lifted peer of the inherent `stage_dependency_edge`.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.stage_dependency_edge", skip_all)]
    async fn stage_dependency_edge(
        &self,
        args: StageDependencyEdgeArgs,
    ) -> Result<StageDependencyEdgeResult, EngineError> {
        flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, &args).await
    }

    /// Trait-lifted peer of the inherent `apply_dependency_to_child`.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.apply_dependency_to_child", skip_all)]
    async fn apply_dependency_to_child(
        &self,
        args: ApplyDependencyToChildArgs,
    ) -> Result<ApplyDependencyToChildResult, EngineError> {
        flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, &args).await
    }

    /// Static backend identifier for metrics / logs.
    fn backend_label(&self) -> &'static str {
        "postgres"
    }

    /// RFC-017 Wave 8 Stage E3 (§4 row 9, §7): forward the claim to the
    /// Postgres-native admission pipeline. Returns `NoWork` when no
    /// eligible execution is admissible this scan cycle. Budget
    /// breaches surface as `NoWork` (leaving the row eligible for a
    /// retry by another worker); validation-class rejections
    /// (malformed partition, unknown kid) surface as typed
    /// [`EngineError`] variants mapped to the Server's 400/503 arms.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.claim_for_worker", skip_all)]
    async fn claim_for_worker(
        &self,
        args: ff_core::contracts::ClaimForWorkerArgs,
    ) -> Result<ff_core::contracts::ClaimForWorkerOutcome, EngineError> {
        // NOTE(review): a fresh scheduler is built per call (the pool
        // handle is cheap to clone); confirm that is intentional vs.
        // caching one on the backend.
        let sched = scheduler::PostgresScheduler::new(self.pool.clone());
        let grant_opt = sched
            .claim_for_worker(
                &args.lane_id,
                &args.worker_id,
                &args.worker_instance_id,
                &args.worker_capabilities,
                args.grant_ttl_ms,
            )
            .await?;
        Ok(match grant_opt {
            Some(g) => ff_core::contracts::ClaimForWorkerOutcome::granted(g),
            None => ff_core::contracts::ClaimForWorkerOutcome::no_work(),
        })
    }

    async fn ping(&self) -> Result<(), EngineError> {
        // Postgres analogue to Valkey PING — single-round-trip pool
        // liveness. Errors propagate as transport-class EngineError via
        // the existing sqlx→EngineError map.
        let _ = sqlx::query_scalar::<_, i32>("SELECT 1")
            .fetch_one(&self.pool)
            .await
            .map_err(error::map_sqlx_error)?;
        Ok(())
    }

    /// RFC-017 Wave 8 Stage E3: drain the scanner supervisor's
    /// reconciler tasks up to `grace`. Matches the Valkey backend's
    /// shutdown_prepare contract — bounded best-effort drain, never
    /// returns an error. NOTE(review): the earlier doc claimed this
    /// also closes the sqlx pool; the body below does not — the pool
    /// presumably closes when the backend is dropped. With the `core`
    /// feature disabled this is a no-op and `grace` goes unused
    /// (consider `let _ = grace;` on that arm to silence the warning).
    async fn shutdown_prepare(&self, grace: Duration) -> Result<(), EngineError> {
        #[cfg(feature = "core")]
        if let Some(handle) = self.scanner_handle.as_ref() {
            let timed_out = handle.shutdown(grace).await;
            if timed_out > 0 {
                tracing::warn!(
                    timed_out,
                    ?grace,
                    "postgres scanner supervisor exceeded grace on shutdown"
                );
            }
        }
        Ok(())
    }

    /// Delegates to [`flow::cancel_flow`].
    #[tracing::instrument(name = "pg.cancel_flow", skip_all)]
    async fn cancel_flow(
        &self,
        id: &FlowId,
        policy: CancelFlowPolicy,
        wait: CancelFlowWait,
    ) -> Result<CancelFlowResult, EngineError> {
        flow::cancel_flow(&self.pool, &self.partition_config, id, policy, wait).await
    }

    /// Delegates to [`flow::set_edge_group_policy`].
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.set_edge_group_policy", skip_all)]
    async fn set_edge_group_policy(
        &self,
        flow_id: &FlowId,
        downstream_execution_id: &ExecutionId,
        policy: EdgeDependencyPolicy,
    ) -> Result<SetEdgeGroupPolicyResult, EngineError> {
        flow::set_edge_group_policy(
            &self.pool,
            &self.partition_config,
            flow_id,
            downstream_execution_id,
            policy,
        )
        .await
    }

    // ── Budget ──

    /// Delegates to [`budget::report_usage_impl`]; the handle is
    /// deliberately unused here (usage is keyed by budget id).
    #[tracing::instrument(name = "pg.report_usage", skip_all)]
    async fn report_usage(
        &self,
        _handle: &Handle,
        budget: &BudgetId,
        dimensions: UsageDimensions,
    ) -> Result<ReportUsageResult, EngineError> {
        budget::report_usage_impl(&self.pool, &self.partition_config, budget, dimensions).await
    }

    // ── HMAC secret rotation (v0.7 migration-master Q4) ──
    //
    // Wave 4 replaced the Wave 0/1 `Unavailable` stub with a single
    // INSERT into `ff_waitpoint_hmac(kid, secret, rotated_at)` via
    // the signal module below.
    #[tracing::instrument(name = "pg.rotate_waitpoint_hmac_secret_all", skip_all)]
    async fn rotate_waitpoint_hmac_secret_all(
        &self,
        args: RotateWaitpointHmacSecretAllArgs,
    ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
        // Wave 4 Agent D: Q4 single-global-row write against
        // `ff_waitpoint_hmac`. `now_ms` is captured here (not
        // inside the impl) so tests can inject a deterministic
        // clock via the pool layer in the future. A pre-epoch clock
        // degrades to 0 rather than erroring.
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as i64)
            .unwrap_or(0);
        signal::rotate_waitpoint_hmac_secret_all_impl(&self.pool, args, now_ms).await
    }

    // ── Stream reads (streaming feature) ──

    /// Delegates to [`stream::read_stream`].
    #[cfg(feature = "streaming")]
    #[tracing::instrument(name = "pg.read_stream", skip_all)]
    async fn read_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        from: StreamCursor,
        to: StreamCursor,
        count_limit: u64,
    ) -> Result<StreamFrames, EngineError> {
        stream::read_stream(&self.pool, execution_id, attempt_index, from, to, count_limit).await
    }

    /// Delegates to [`stream::tail_stream`]; requires the Wave 4
    /// LISTEN notifier, otherwise surfaces `Unavailable`.
    #[cfg(feature = "streaming")]
    #[tracing::instrument(name = "pg.tail_stream", skip_all)]
    async fn tail_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        after: StreamCursor,
        block_ms: u64,
        count_limit: u64,
        visibility: TailVisibility,
    ) -> Result<StreamFrames, EngineError> {
        let notifier = self
            .stream_notifier
            .as_ref()
            .ok_or(EngineError::Unavailable {
                op: "pg.tail_stream (notifier not initialised)",
            })?;
        stream::tail_stream(
            &self.pool,
            notifier,
            execution_id,
            attempt_index,
            after,
            block_ms,
            count_limit,
            visibility,
        )
        .await
    }

    /// Delegates to [`stream::read_summary`].
    #[cfg(feature = "streaming")]
    #[tracing::instrument(name = "pg.read_summary", skip_all)]
    async fn read_summary(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
    ) -> Result<Option<SummaryDocument>, EngineError> {
        stream::read_summary(&self.pool, execution_id, attempt_index).await
    }
}
799
/// Minimum recommended `max_locks_per_transaction`. Partition-heavy
/// schemas (256 hash partitions per logical table) can exceed the
/// Postgres default of `64` per tx under modest concurrent bench
/// load — the Wave 7c bench hit `out of shared memory` at 16 workers
/// × 10k tasks with the default and unblocked at `512`. We warn at
/// boot rather than hard-fail because operators may legitimately
/// run with a tuned value that still exceeds 64 but sits below our
/// threshold. Compared against the live value probed by
/// [`warn_if_max_locks_low`] at connect time.
const MIN_MAX_LOCKS_PER_TRANSACTION: i64 = 256;
809
810/// Probe `max_locks_per_transaction` at connect time + log a warning
811/// when the current value is below the production-safe threshold.
812/// Never fails the connect — probe errors are logged at debug and
813/// swallowed (pg_show may be restricted on exotic deploys).
814async fn warn_if_max_locks_low(pool: &PgPool) {
815    let row: Result<(String,), sqlx::Error> =
816        sqlx::query_as("SHOW max_locks_per_transaction")
817            .fetch_one(pool)
818            .await;
819    match row {
820        Ok((raw,)) => emit_max_locks_decision(&raw),
821        Err(e) => {
822            tracing::debug!("failed to probe max_locks_per_transaction: {e}");
823        }
824    }
825}
826
827/// Pure decision surface for the max-locks probe — extracted for
828/// unit-testability (the live probe is gated by a running Postgres).
829/// Returns the integer value when a warning SHOULD fire, `None`
830/// otherwise (either the raw is valid + at/above threshold, or the
831/// raw is unparseable — the latter is debug-only).
832fn max_locks_warn_value(raw: &str) -> Option<i64> {
833    match raw.parse::<i64>() {
834        Ok(v) if v < MIN_MAX_LOCKS_PER_TRANSACTION => Some(v),
835        Ok(_) => None,
836        Err(e) => {
837            tracing::debug!(raw, "failed to parse max_locks_per_transaction: {e}");
838            None
839        }
840    }
841}
842
843fn emit_max_locks_decision(raw: &str) {
844    if let Some(v) = max_locks_warn_value(raw) {
845        tracing::warn!(
846            current = v,
847            recommended = MIN_MAX_LOCKS_PER_TRANSACTION,
848            "postgres max_locks_per_transaction={v} is below the recommended \
849             minimum ({MIN_MAX_LOCKS_PER_TRANSACTION}); partition-heavy workloads \
850             may hit 'out of shared memory' under concurrent load. \
851             See docs/operator-guide-postgres.md."
852        );
853    }
854}
855
#[cfg(test)]
mod max_locks_tests {
    //! Unit coverage for the pure max-locks decision surface.

    use super::{max_locks_warn_value, MIN_MAX_LOCKS_PER_TRANSACTION};

    #[test]
    fn warns_when_below_threshold() {
        let just_below = MIN_MAX_LOCKS_PER_TRANSACTION - 1;
        assert_eq!(max_locks_warn_value("64"), Some(64));
        assert_eq!(
            max_locks_warn_value(&just_below.to_string()),
            Some(just_below)
        );
    }

    #[test]
    fn silent_at_or_above_threshold() {
        let at_threshold = MIN_MAX_LOCKS_PER_TRANSACTION.to_string();
        assert_eq!(max_locks_warn_value(&at_threshold), None);
        assert_eq!(max_locks_warn_value("1024"), None);
    }

    #[test]
    fn silent_for_unparseable_raw() {
        assert_eq!(max_locks_warn_value("not-a-number"), None);
    }
}