Skip to main content

ff_backend_postgres/
lib.rs

//! `EngineBackend` implementation backed by Postgres.
//!
//! **RFC-v0.7 Wave 0 — scaffold (historical).** This crate first
//! landed the [`PostgresBackend`] struct + the `impl EngineBackend`
//! block with every method stubbed to return
//! `EngineError::Unavailable`. Subsequent waves filled in the bodies:
//!
//! * Wave 1: cross-cutting error/helpers.
//! * Wave 2: describe / list / resolve (read surface).
//! * Wave 3: schema migrations (replaces the Wave 0 placeholder).
//! * Wave 4: LISTEN/NOTIFY wiring + stream reads/tails.
//! * Wave 5-7: hot-path write methods.
//! * Wave 8: ff-server wire-up + dual-backend running.
//!
//! The trait stays object-safe; consumers can hold
//! `Arc<dyn EngineBackend>`. No ferriskey dep — this crate's
//! transport is `sqlx`.
18
19#![allow(clippy::result_large_err)]
20
21use std::sync::Arc;
22use std::time::Duration;
23
24use async_trait::async_trait;
25
26use ff_core::backend::{
27    AppendFrameOutcome, BackendConfig, CancelFlowPolicy, CancelFlowWait, CapabilitySet,
28    ClaimPolicy, FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal,
29    PendingWaitpoint, ResumeToken, ResumeSignal, SummaryDocument, TailVisibility,
30    UsageDimensions,
31};
32#[cfg(feature = "core")]
33use ff_core::contracts::{
34    AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
35    ApplyDependencyToChildResult, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
36    CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
37    DeliverApprovalSignalArgs, DeliverSignalArgs, DeliverSignalResult, EdgeDependencyPolicy,
38    EdgeDirection, EdgeSnapshot,
39    ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
40    ListPendingWaitpointsResult, ListSuspendedPage, SetEdgeGroupPolicyResult,
41    StageDependencyEdgeArgs, StageDependencyEdgeResult,
42};
43#[cfg(feature = "core")]
44use ff_core::state::PublicState;
45use ff_core::contracts::{
46    CancelFlowResult, ExecutionContext, ExecutionSnapshot, FlowSnapshot, IssueReclaimGrantArgs,
47    IssueReclaimGrantOutcome, ReclaimExecutionArgs, ReclaimExecutionOutcome, ReportUsageResult,
48    RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
49    SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
50};
51#[cfg(feature = "core")]
52use ff_core::contracts::ExecutionInfo;
53// RFC-020 Wave 9 Spine-A pt.1 — operator-control mutating surfaces.
54#[cfg(feature = "core")]
55use ff_core::contracts::{
56    CancelExecutionArgs, CancelExecutionResult, RevokeLeaseArgs, RevokeLeaseResult,
57};
58// RFC-020 Wave 9 Spine-A pt.2 — operator-control + flow-cancel mutating surfaces.
59#[cfg(feature = "core")]
60use ff_core::contracts::{
61    CancelFlowArgs, CancelFlowHeader, ChangePriorityArgs, ChangePriorityResult,
62    ReplayExecutionArgs, ReplayExecutionResult,
63};
64// RFC-020 Wave 9 Standalone-1 — budget/quota admin surfaces.
65#[cfg(feature = "core")]
66use ff_core::contracts::{
67    BudgetStatus, CreateBudgetArgs, CreateBudgetResult, CreateQuotaPolicyArgs,
68    CreateQuotaPolicyResult, ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult,
69};
70#[cfg(feature = "streaming")]
71use ff_core::contracts::{StreamCursor, StreamFrames};
72use ff_core::engine_backend::{EngineBackend, ExpirePhase};
73use ff_core::engine_error::EngineError;
74#[cfg(feature = "core")]
75use ff_core::partition::PartitionKey;
76use ff_core::partition::{Partition, PartitionConfig};
77#[cfg(feature = "streaming")]
78use ff_core::types::AttemptIndex;
79#[cfg(feature = "core")]
80use ff_core::types::EdgeId;
81use ff_core::types::{BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
82// Wave 5a — re-export `PgPool` so crates that depend on
83// `ff-backend-postgres` (and not `sqlx` directly) can name the pool
84// type in their own APIs (e.g. `ff-engine::dispatch_via_postgres`).
85pub use sqlx::PgPool;
86
87#[cfg(feature = "core")]
88mod admin;
89pub mod attempt;
90pub mod budget;
91pub mod claim_grant;
92pub mod completion;
93#[cfg(feature = "core")]
94pub mod dispatch;
95pub mod error;
96pub mod exec_core;
97pub mod flow;
98#[cfg(feature = "core")]
99pub mod flow_staging;
100pub mod handle_codec;
101mod lease_event;
102mod lease_event_subscribe;
103pub mod listener;
104pub mod migrate;
105#[cfg(feature = "core")]
106pub mod operator;
107#[cfg(feature = "core")]
108mod operator_event;
109pub mod pool;
110#[cfg(feature = "core")]
111pub mod reconcilers;
112#[cfg(feature = "core")]
113pub mod scanner_supervisor;
114#[cfg(feature = "core")]
115pub mod scheduler;
116pub mod signal;
117mod signal_delivery_subscribe;
118mod signal_event;
119#[cfg(feature = "streaming")]
120pub mod stream;
121pub mod suspend;
122pub mod suspend_ops;
123#[cfg(feature = "core")]
124pub(crate) mod typed_ops;
125pub mod version;
126#[cfg(feature = "core")]
127pub mod worker_registry;
128
129pub use completion::{PostgresCompletionStream, COMPLETION_CHANNEL};
130pub use error::{map_sqlx_error, PostgresTransportError};
131pub use listener::StreamNotifier;
132pub use migrate::{apply_migrations, MigrationError};
133#[cfg(feature = "core")]
134pub use scanner_supervisor::{PostgresScannerConfig, PostgresScannerHandle};
135pub use version::check_schema_version;
136
137// Re-export the new `PostgresConnection` shape so consumers can name
138// it from this crate directly without dipping into `ff_core::backend`.
139// `BackendConfig` is already imported above and is part of the
140// `connect()` signature, so it re-exports transparently via
141// rustdoc — no explicit `pub use` needed.
142pub use ff_core::backend::PostgresConnection;
143
144/// Postgres-backed `EngineBackend`.
145///
146/// Wave 0 shape: holds a `sqlx::PgPool`, the deployment's
147/// [`PartitionConfig`] (Q5 — partition column survives on Postgres
148/// with hash partitioning across the same 256 slots Valkey uses),
149/// and an optional `ff_observability::Metrics` handle mirroring
150/// [`ff_backend_valkey::ValkeyBackend`]. Future waves add the
151/// [`StreamNotifier`] handle once Wave 4 wires up LISTEN/NOTIFY.
152/// RFC-018 Stage A: build a [`ff_core::capability::Supports`]
153/// snapshot for the Postgres backend at v0.9. `true` fields correspond
154/// to trait methods `PostgresBackend` overrides with a real body
155/// (ingress, scheduler, seed + rotate HMAC, flow cancel bulk path,
156/// stream reads, RFC-019 subscriptions, cross-cutting). `false` fields
157/// correspond to trait methods that still return
158/// `EngineError::Unavailable` on Postgres today — Wave 9 follow-up
159/// scope. See `docs/POSTGRES_PARITY_MATRIX.md` for the authoritative
160/// per-row status.
161///
162/// `prepare` is `true` on Postgres even though `prepare()` returns
163/// `PrepareOutcome::NoOp` (schema migrations are applied out-of-band).
164/// `Supports.prepare` means "can the consumer call `backend.prepare()`
165/// without getting `EngineError::Unavailable`?" — for Postgres the
166/// answer is yes; NoOp is a successful well-defined outcome. Gating
167/// the call off in consumer UIs based on a `false` bool would hide
168/// a callable + correct method.
169///
170/// `Supports` is `#[non_exhaustive]` so struct-literal construction
171/// from this crate is forbidden; we start from
172/// [`ff_core::capability::Supports::none`] and mutate named fields.
173fn postgres_supports_base() -> ff_core::capability::Supports {
174    let mut s = ff_core::capability::Supports::none();
175
176    // ── Flow bulk cancel (impl) ──
177    s.cancel_flow_wait_timeout = true;
178    s.cancel_flow_wait_indefinite = true;
179
180    // ── Admin seed + rotate HMAC (impl) ──
181    s.rotate_waitpoint_hmac_secret_all = true;
182    s.seed_waitpoint_hmac_secret = true;
183
184    // ── Scheduler ──
185    s.claim_for_worker = true;
186
187    // ── RFC-019 subscriptions ──
188    s.subscribe_lease_history = true;
189    s.subscribe_completion = true;
190    s.subscribe_signal_delivery = true;
191    s.subscribe_instance_tags = false;
192
193    // ── Streaming (RFC-015) ──
194    s.stream_durable_summary = true;
195    s.stream_best_effort_live = true;
196
197    // ── Boot (Postgres returns NoOp but call is callable + correct) ──
198    s.prepare = true;
199
200    // ── Wave 9 (v0.11) — operator control + read model + budget/quota
201    //    admin + list_pending_waitpoints + cancel_flow_header +
202    //    ack_cancel_member all ship concretely on Postgres via
203    //    RFC-020 Rev 7. subscribe_instance_tags remains `false` per
204    //    #311 (speculative demand, served by list_executions +
205    //    ScannerFilter::with_instance_tag today).
206    s.cancel_execution = true;
207    s.change_priority = true;
208    s.replay_execution = true;
209    s.revoke_lease = true;
210    s.read_execution_state = true;
211    s.read_execution_info = true;
212    s.get_execution_result = true;
213    s.budget_admin = true;
214    s.quota_admin = true;
215    s.list_pending_waitpoints = true;
216    s.cancel_flow_header = true;
217    s.ack_cancel_member = true;
218
219    // ── RFC-025 Phase 3 — worker registry ──
220    s.register_worker = true;
221    s.heartbeat_worker = true;
222    s.mark_worker_dead = true;
223    s.list_expired_leases = true;
224    s.list_workers = true;
225
226    s
227}
228
229pub struct PostgresBackend {
230    #[allow(dead_code)] // filled in across waves 2-7
231    pool: PgPool,
232    #[allow(dead_code)]
233    partition_config: PartitionConfig,
234    #[allow(dead_code)]
235    metrics: Option<Arc<ff_observability::Metrics>>,
236    /// Wave 4: shared LISTEN notifier. Present on `connect()`-built
237    /// backends; `None` on bare `from_pool` constructions that skip
238    /// LISTEN wiring (tests that only exercise the write path).
239    #[allow(dead_code)]
240    stream_notifier: Option<Arc<StreamNotifier>>,
241    /// RFC-017 Wave 8 Stage E3: scanner supervisor handle. Spawned
242    /// during [`Self::connect_with_metrics`] when the caller opts in
243    /// via [`Self::spawn_scanners_during_connect`]; drained on
244    /// [`EngineBackend::shutdown_prepare`]. `None` on `from_pool` /
245    /// test builds that don't want background reconcilers.
246    #[cfg(feature = "core")]
247    scanner_handle: Option<Arc<scanner_supervisor::PostgresScannerHandle>>,
248}
249
250impl PostgresBackend {
251    /// Dial Postgres with [`BackendConfig`] and return the backend as
252    /// `Arc<dyn EngineBackend>`. Modeled on
253    /// [`ff_backend_valkey::ValkeyBackend::connect`] so ff-server /
254    /// SDK call sites can swap backends without changing the
255    /// constructor shape.
256    ///
257    /// **Wave 0:** builds the pool and constructs the backend. Does
258    /// NOT run migrations (Q12 — operator out-of-band). Does NOT run
259    /// the schema-version check (Wave 3 adds the version const and
260    /// wires [`check_schema_version`] in). Does NOT start the LISTEN
261    /// task (Wave 4).
262    ///
263    /// Returns `EngineError::Unavailable` when the config's
264    /// connection arm is not Postgres.
265    pub async fn connect(config: BackendConfig) -> Result<Arc<dyn EngineBackend>, EngineError> {
266        let pool = pool::build_pool(&config).await?;
267        warn_if_max_locks_low(&pool).await;
268        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
269        let backend = Self {
270            pool,
271            partition_config: PartitionConfig::default(),
272            metrics: None,
273            stream_notifier,
274            #[cfg(feature = "core")]
275            scanner_handle: None,
276        };
277        Ok(Arc::new(backend))
278    }
279
280    /// Test / advanced constructor: build a `PostgresBackend` from an
281    /// already-constructed `PgPool` + explicit partition config. No
282    /// network I/O. Useful for integration tests against a shared
283    /// pool and for a future migration CLI that wants to reuse a pool
284    /// across migrate-run + smoke-check.
285    pub fn from_pool(pool: PgPool, partition_config: PartitionConfig) -> Arc<Self> {
286        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
287        Arc::new(Self {
288            pool,
289            partition_config,
290            metrics: None,
291            stream_notifier,
292            #[cfg(feature = "core")]
293            scanner_handle: None,
294        })
295    }
296
297    /// RFC-017 Wave 8 Stage E1: dial Postgres with an explicit
298    /// [`PartitionConfig`] + shared [`ff_observability::Metrics`].
299    /// Mirrors [`ff_backend_valkey::ValkeyBackend::connect_with_metrics`]
300    /// so `ff-server::Server::start_with_metrics` can wire the Postgres
301    /// branch without reaching into the pool builder directly.
302    ///
303    /// Returns a concrete `Arc<Self>` rather than `Arc<dyn EngineBackend>`
304    /// so the caller can cast to the trait object after any additional
305    /// field installs (parallel to the Valkey path which calls
306    /// `with_scheduler` / `with_stream_semaphore_permits` before the
307    /// cast). Stage E1 does NOT run `apply_migrations` — schema
308    /// provisioning is an operator concern (matches the Wave 0 contract
309    /// on [`Self::connect`]).
310    pub async fn connect_with_metrics(
311        config: BackendConfig,
312        partition_config: PartitionConfig,
313        metrics: Arc<ff_observability::Metrics>,
314    ) -> Result<Arc<Self>, EngineError> {
315        let pool = pool::build_pool(&config).await?;
316        warn_if_max_locks_low(&pool).await;
317        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
318        Ok(Arc::new(Self {
319            pool,
320            partition_config,
321            metrics: Some(metrics),
322            stream_notifier,
323            #[cfg(feature = "core")]
324            scanner_handle: None,
325        }))
326    }
327
328    /// RFC-017 Wave 8 Stage E3: spawn the six Postgres reconcilers as
329    /// background tick loops. Returns `true` if the scanner handle
330    /// was installed; `false` if the `Arc<Self>` has outstanding
331    /// clones (mirrors the Valkey `with_*` pattern). Callers must
332    /// invoke this before publishing the `Arc<dyn EngineBackend>` so
333    /// the underlying `Arc::get_mut` succeeds.
334    #[cfg(feature = "core")]
335    pub fn with_scanners(
336        self: &mut Arc<Self>,
337        cfg: scanner_supervisor::PostgresScannerConfig,
338    ) -> bool {
339        let Some(inner) = Arc::get_mut(self) else {
340            return false;
341        };
342        let handle = scanner_supervisor::spawn_scanners(inner.pool.clone(), cfg);
343        inner.scanner_handle = Some(Arc::new(handle));
344        true
345    }
346
347    /// Accessor for the underlying `PgPool`. Stage E1 uses this so
348    /// `ff-server::Server::start_with_metrics` can run
349    /// [`apply_migrations`] on the same pool before handing the backend
350    /// out as `Arc<dyn EngineBackend>`.
351    pub fn pool(&self) -> &PgPool {
352        &self.pool
353    }
354
355    /// Create one execution row (+ seed the lane registry if new).
356    ///
357    /// **RFC-017 Stage A:** this inherent method is retained as a
358    /// thin wrapper around the module-level impl so existing in-tree
359    /// callers (ff-server request handlers, integration tests) keep
360    /// compiling. The trait-lifted entry point is
361    /// [`EngineBackend::create_execution`] below, which calls the
362    /// same impl. Return shape differs — inherent returns
363    /// `ExecutionId`, trait returns
364    /// [`CreateExecutionResult`] per RFC-017 §5 — so we cannot simply
365    /// replace the inherent method. A follow-up PR may deprecate
366    /// this inherent alongside the broader ingress shape alignment.
367    #[cfg(feature = "core")]
368    #[tracing::instrument(name = "pg.create_execution", skip_all)]
369    pub async fn create_execution(
370        &self,
371        args: CreateExecutionArgs,
372    ) -> Result<ExecutionId, EngineError> {
373        exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await
374    }
375
376    // ── RFC-017 Stage A: inherent ingress methods retained for
377    // back-compat with in-tree test harnesses + ff-server direct
378    // calls. The trait-lifted peers (`EngineBackend::create_flow`
379    // etc.) delegate to the SAME module-level impls under the hood.
380    // Follow-up PR may sunset these inherents once all in-tree
381    // consumers route through `Arc<dyn EngineBackend>`.
382
383    #[cfg(feature = "core")]
384    #[tracing::instrument(name = "pg.create_flow.inherent", skip_all)]
385    pub async fn create_flow(
386        &self,
387        args: &CreateFlowArgs,
388    ) -> Result<CreateFlowResult, EngineError> {
389        flow_staging::create_flow(&self.pool, &self.partition_config, args).await
390    }
391
392    #[cfg(feature = "core")]
393    #[tracing::instrument(name = "pg.add_execution_to_flow.inherent", skip_all)]
394    pub async fn add_execution_to_flow(
395        &self,
396        args: &AddExecutionToFlowArgs,
397    ) -> Result<AddExecutionToFlowResult, EngineError> {
398        flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, args).await
399    }
400
401    #[cfg(feature = "core")]
402    #[tracing::instrument(name = "pg.stage_dependency_edge.inherent", skip_all)]
403    pub async fn stage_dependency_edge(
404        &self,
405        args: &StageDependencyEdgeArgs,
406    ) -> Result<StageDependencyEdgeResult, EngineError> {
407        flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, args).await
408    }
409
410    #[cfg(feature = "core")]
411    #[tracing::instrument(name = "pg.apply_dependency_to_child.inherent", skip_all)]
412    pub async fn apply_dependency_to_child(
413        &self,
414        args: &ApplyDependencyToChildArgs,
415    ) -> Result<ApplyDependencyToChildResult, EngineError> {
416        flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, args).await
417    }
418}
419
420/// Short helper: every stub method returns this. Kept as a function
421/// (rather than a macro) so rust-analyzer / IDE jumps show a single
422/// definition site for the Wave 0 stub pattern.
423#[inline]
424#[cfg_attr(feature = "streaming", allow(dead_code))]
425fn unavailable<T>(op: &'static str) -> Result<T, EngineError> {
426    Err(EngineError::Unavailable { op })
427}
428
429#[async_trait]
430impl EngineBackend for PostgresBackend {
431    // ── Claim + lifecycle ──
432
433    #[tracing::instrument(name = "pg.claim", skip_all)]
434    async fn claim(
435        &self,
436        lane: &LaneId,
437        capabilities: &CapabilitySet,
438        policy: ClaimPolicy,
439    ) -> Result<Option<Handle>, EngineError> {
440        attempt::claim(&self.pool, lane, capabilities, &policy).await
441    }
442
443    #[tracing::instrument(name = "pg.renew", skip_all)]
444    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError> {
445        attempt::renew(&self.pool, handle).await
446    }
447
448    #[tracing::instrument(name = "pg.progress", skip_all)]
449    async fn progress(
450        &self,
451        handle: &Handle,
452        percent: Option<u8>,
453        message: Option<String>,
454    ) -> Result<(), EngineError> {
455        attempt::progress(&self.pool, handle, percent, message).await
456    }
457
458    #[tracing::instrument(name = "pg.append_frame", skip_all)]
459    async fn append_frame(
460        &self,
461        handle: &Handle,
462        frame: Frame,
463    ) -> Result<AppendFrameOutcome, EngineError> {
464        #[cfg(feature = "streaming")]
465        {
466            stream::append_frame(&self.pool, &self.partition_config, handle, frame).await
467        }
468        #[cfg(not(feature = "streaming"))]
469        {
470            let _ = (handle, frame);
471            unavailable("pg.append_frame")
472        }
473    }
474
475    #[tracing::instrument(name = "pg.complete", skip_all)]
476    async fn complete(
477        &self,
478        handle: &Handle,
479        payload: Option<Vec<u8>>,
480    ) -> Result<(), EngineError> {
481        attempt::complete(&self.pool, handle, payload).await
482    }
483
484    #[tracing::instrument(name = "pg.fail", skip_all)]
485    async fn fail(
486        &self,
487        handle: &Handle,
488        reason: FailureReason,
489        classification: FailureClass,
490    ) -> Result<FailOutcome, EngineError> {
491        attempt::fail(&self.pool, handle, reason, classification).await
492    }
493
494    #[tracing::instrument(name = "pg.cancel", skip_all)]
495    async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError> {
496        let payload = handle_codec::decode_handle(handle)?;
497        exec_core::cancel_impl(
498            &self.pool,
499            &self.partition_config,
500            &payload.execution_id,
501            reason,
502        )
503        .await
504    }
505
506    #[tracing::instrument(name = "pg.suspend", skip_all)]
507    async fn suspend(
508        &self,
509        handle: &Handle,
510        args: SuspendArgs,
511    ) -> Result<SuspendOutcome, EngineError> {
512        suspend_ops::suspend_impl(&self.pool, &self.partition_config, handle, args).await
513    }
514
515    #[tracing::instrument(name = "pg.suspend_by_triple", skip_all)]
516    async fn suspend_by_triple(
517        &self,
518        exec_id: ExecutionId,
519        triple: LeaseFence,
520        args: SuspendArgs,
521    ) -> Result<SuspendOutcome, EngineError> {
522        suspend_ops::suspend_by_triple_impl(
523            &self.pool,
524            &self.partition_config,
525            exec_id,
526            triple,
527            args,
528        )
529        .await
530    }
531
532    #[tracing::instrument(name = "pg.create_waitpoint", skip_all)]
533    async fn create_waitpoint(
534        &self,
535        handle: &Handle,
536        waitpoint_key: &str,
537        expires_in: Duration,
538    ) -> Result<PendingWaitpoint, EngineError> {
539        suspend_ops::create_waitpoint_impl(&self.pool, handle, waitpoint_key, expires_in).await
540    }
541
/// Forwards to `suspend_ops::read_waitpoint_token_impl`; the owned
/// `partition` key is lent by reference for the duration of the call.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.read_waitpoint_token", skip_all)]
async fn read_waitpoint_token(
    &self,
    partition: PartitionKey,
    waitpoint_id: &ff_core::types::WaitpointId,
) -> Result<Option<String>, EngineError> {
    let pool = &self.pool;
    suspend_ops::read_waitpoint_token_impl(pool, &partition, waitpoint_id).await
}
551
552    #[tracing::instrument(name = "pg.observe_signals", skip_all)]
553    async fn observe_signals(
554        &self,
555        handle: &Handle,
556    ) -> Result<Vec<ResumeSignal>, EngineError> {
557        suspend_ops::observe_signals_impl(&self.pool, handle).await
558    }
559
560    #[tracing::instrument(name = "pg.claim_from_resume_grant", skip_all)]
561    async fn claim_from_resume_grant(
562        &self,
563        token: ResumeToken,
564    ) -> Result<Option<Handle>, EngineError> {
565        attempt::claim_from_resume_grant(&self.pool, token).await
566    }
567
568    // RFC-024 PR-D — lease-reclaim grant issuance + consumption.
569
570    #[tracing::instrument(name = "pg.issue_reclaim_grant", skip_all)]
571    async fn issue_reclaim_grant(
572        &self,
573        args: IssueReclaimGrantArgs,
574    ) -> Result<IssueReclaimGrantOutcome, EngineError> {
575        claim_grant::issue_reclaim_grant_impl(&self.pool, args).await
576    }
577
578    #[tracing::instrument(name = "pg.reclaim_execution", skip_all)]
579    async fn reclaim_execution(
580        &self,
581        args: ReclaimExecutionArgs,
582    ) -> Result<ReclaimExecutionOutcome, EngineError> {
583        claim_grant::reclaim_execution_impl(&self.pool, args).await
584    }
585
586    #[tracing::instrument(name = "pg.delay", skip_all)]
587    async fn delay(
588        &self,
589        handle: &Handle,
590        delay_until: TimestampMs,
591    ) -> Result<(), EngineError> {
592        attempt::delay(&self.pool, handle, delay_until).await
593    }
594
595    #[tracing::instrument(name = "pg.wait_children", skip_all)]
596    async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError> {
597        attempt::wait_children(&self.pool, handle).await
598    }
599
600    // ── Read / admin ──
601
602    #[tracing::instrument(name = "pg.describe_execution", skip_all)]
603    async fn describe_execution(
604        &self,
605        id: &ExecutionId,
606    ) -> Result<Option<ExecutionSnapshot>, EngineError> {
607        exec_core::describe_execution_impl(&self.pool, &self.partition_config, id).await
608    }
609
610    #[tracing::instrument(name = "pg.read_execution_context", skip_all)]
611    async fn read_execution_context(
612        &self,
613        execution_id: &ExecutionId,
614    ) -> Result<ExecutionContext, EngineError> {
615        exec_core::read_execution_context_impl(&self.pool, &self.partition_config, execution_id)
616            .await
617    }
618
619    #[tracing::instrument(name = "pg.read_current_attempt_index", skip_all)]
620    async fn read_current_attempt_index(
621        &self,
622        execution_id: &ExecutionId,
623    ) -> Result<ff_core::types::AttemptIndex, EngineError> {
624        exec_core::read_current_attempt_index_impl(
625            &self.pool,
626            &self.partition_config,
627            execution_id,
628        )
629        .await
630    }
631
632    #[tracing::instrument(name = "pg.read_total_attempt_count", skip_all)]
633    async fn read_total_attempt_count(
634        &self,
635        execution_id: &ExecutionId,
636    ) -> Result<ff_core::types::AttemptIndex, EngineError> {
637        exec_core::read_total_attempt_count_impl(
638            &self.pool,
639            &self.partition_config,
640            execution_id,
641        )
642        .await
643    }
644
645    #[tracing::instrument(name = "pg.describe_flow", skip_all)]
646    async fn describe_flow(
647        &self,
648        id: &FlowId,
649    ) -> Result<Option<FlowSnapshot>, EngineError> {
650        flow::describe_flow(&self.pool, &self.partition_config, id).await
651    }
652
653    #[tracing::instrument(name = "pg.set_execution_tag", skip_all)]
654    async fn set_execution_tag(
655        &self,
656        execution_id: &ExecutionId,
657        key: &str,
658        value: &str,
659    ) -> Result<(), EngineError> {
660        ff_core::engine_backend::validate_tag_key(key)?;
661        exec_core::set_execution_tag_impl(&self.pool, execution_id, key, value).await
662    }
663
664    #[tracing::instrument(name = "pg.set_flow_tag", skip_all)]
665    async fn set_flow_tag(
666        &self,
667        flow_id: &FlowId,
668        key: &str,
669        value: &str,
670    ) -> Result<(), EngineError> {
671        ff_core::engine_backend::validate_tag_key(key)?;
672        flow::set_flow_tag_impl(&self.pool, &self.partition_config, flow_id, key, value).await
673    }
674
675    #[tracing::instrument(name = "pg.get_execution_tag", skip_all)]
676    async fn get_execution_tag(
677        &self,
678        execution_id: &ExecutionId,
679        key: &str,
680    ) -> Result<Option<String>, EngineError> {
681        ff_core::engine_backend::validate_tag_key(key)?;
682        exec_core::get_execution_tag_impl(&self.pool, execution_id, key).await
683    }
684
685    #[tracing::instrument(name = "pg.get_flow_tag", skip_all)]
686    async fn get_flow_tag(
687        &self,
688        flow_id: &FlowId,
689        key: &str,
690    ) -> Result<Option<String>, EngineError> {
691        ff_core::engine_backend::validate_tag_key(key)?;
692        flow::get_flow_tag_impl(&self.pool, &self.partition_config, flow_id, key).await
693    }
694
695    #[tracing::instrument(name = "pg.get_execution_namespace", skip_all)]
696    async fn get_execution_namespace(
697        &self,
698        execution_id: &ExecutionId,
699    ) -> Result<Option<String>, EngineError> {
700        exec_core::get_execution_namespace_impl(&self.pool, execution_id).await
701    }
702
/// Forwards `flow_id` and `direction` to `flow::list_edges`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.list_edges", skip_all)]
async fn list_edges(
    &self,
    flow_id: &FlowId,
    direction: EdgeDirection,
) -> Result<Vec<EdgeSnapshot>, EngineError> {
    let (pool, partitions) = (&self.pool, &self.partition_config);
    flow::list_edges(pool, partitions, flow_id, direction).await
}
712
/// Forwards `flow_id` and `edge_id` to `flow::describe_edge`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.describe_edge", skip_all)]
async fn describe_edge(
    &self,
    flow_id: &FlowId,
    edge_id: &EdgeId,
) -> Result<Option<EdgeSnapshot>, EngineError> {
    let (pool, partitions) = (&self.pool, &self.partition_config);
    flow::describe_edge(pool, partitions, flow_id, edge_id).await
}
722
/// Forwards to `exec_core::resolve_execution_flow_id_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.resolve_execution_flow_id", skip_all)]
async fn resolve_execution_flow_id(
    &self,
    eid: &ExecutionId,
) -> Result<Option<FlowId>, EngineError> {
    let (pool, partitions) = (&self.pool, &self.partition_config);
    exec_core::resolve_execution_flow_id_impl(pool, partitions, eid).await
}
731
/// Forwards `partition`, `cursor` and `limit` to `flow::list_flows`
/// and returns the page it produces.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.list_flows", skip_all)]
async fn list_flows(
    &self,
    partition: PartitionKey,
    cursor: Option<FlowId>,
    limit: usize,
) -> Result<ListFlowsPage, EngineError> {
    let pool = &self.pool;
    flow::list_flows(pool, partition, cursor, limit).await
}
742
/// Forwards `cursor` and `limit` to `admin::list_lanes_impl`; takes no
/// partition argument.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.list_lanes", skip_all)]
async fn list_lanes(
    &self,
    cursor: Option<LaneId>,
    limit: usize,
) -> Result<ListLanesPage, EngineError> {
    let pool = &self.pool;
    admin::list_lanes_impl(pool, cursor, limit).await
}
752
/// Forwards `partition`, `cursor` and `limit` to
/// `admin::list_suspended_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.list_suspended", skip_all)]
async fn list_suspended(
    &self,
    partition: PartitionKey,
    cursor: Option<ExecutionId>,
    limit: usize,
) -> Result<ListSuspendedPage, EngineError> {
    let pool = &self.pool;
    admin::list_suspended_impl(pool, partition, cursor, limit).await
}
763
/// Forwards `partition`, `cursor` and `limit` to
/// `exec_core::list_executions_impl`, which also receives the
/// partition config.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.list_executions", skip_all)]
async fn list_executions(
    &self,
    partition: PartitionKey,
    cursor: Option<ExecutionId>,
    limit: usize,
) -> Result<ListExecutionsPage, EngineError> {
    let (pool, partitions) = (&self.pool, &self.partition_config);
    exec_core::list_executions_impl(pool, partitions, partition, cursor, limit).await
}
781
782    // ── Trigger ops (issue #150) ──
783
/// Forwards to `suspend_ops::deliver_signal_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.deliver_signal", skip_all)]
async fn deliver_signal(
    &self,
    args: DeliverSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
    let (pool, partitions) = (&self.pool, &self.partition_config);
    suspend_ops::deliver_signal_impl(pool, partitions, args).await
}
792
/// Forwards to `suspend_ops::deliver_approval_signal_impl`; shares
/// the `DeliverSignalResult` return type with `deliver_signal`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.deliver_approval_signal", skip_all)]
async fn deliver_approval_signal(
    &self,
    args: DeliverApprovalSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
    let (pool, partitions) = (&self.pool, &self.partition_config);
    suspend_ops::deliver_approval_signal_impl(pool, partitions, args).await
}
801
/// Forwards to `suspend_ops::claim_resumed_execution_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.claim_resumed_execution", skip_all)]
async fn claim_resumed_execution(
    &self,
    args: ClaimResumedExecutionArgs,
) -> Result<ClaimResumedExecutionResult, EngineError> {
    let (pool, partitions) = (&self.pool, &self.partition_config);
    suspend_ops::claim_resumed_execution_impl(pool, partitions, args).await
}
810
811    // ── RFC-020 Wave 9 Spine-B — read model (3 methods, §4.1) ────────
812    //
813    // Partition-local single-row reads against `ff_exec_core` (+ LATERAL
814    // join on `ff_attempt` for `read_execution_info`). READ COMMITTED
815    // (no CAS; all three are read-only). `get_execution_result` returns
816    // current-attempt semantics per §7.8 (matches Valkey's
817    // `GET ctx.result()` primitive). Capability flips land at the Wave 9
818    // release PR per RFC §6.3.
819
820    #[cfg(feature = "core")]
821    #[tracing::instrument(name = "pg.read_execution_state", skip_all)]
822    async fn read_execution_state(
823        &self,
824        id: &ExecutionId,
825    ) -> Result<Option<PublicState>, EngineError> {
826        exec_core::read_execution_state_impl(&self.pool, &self.partition_config, id).await
827    }
828
829    #[cfg(feature = "core")]
830    #[tracing::instrument(name = "pg.read_execution_info", skip_all)]
831    async fn read_execution_info(
832        &self,
833        id: &ExecutionId,
834    ) -> Result<Option<ExecutionInfo>, EngineError> {
835        exec_core::read_execution_info_impl(&self.pool, &self.partition_config, id).await
836    }
837
838    #[tracing::instrument(name = "pg.get_execution_result", skip_all)]
839    async fn get_execution_result(
840        &self,
841        id: &ExecutionId,
842    ) -> Result<Option<Vec<u8>>, EngineError> {
843        exec_core::get_execution_result_impl(&self.pool, &self.partition_config, id).await
844    }
845
846    // ── RFC-020 Wave 9 Standalone-2 — list_pending_waitpoints (§4.5) ─
847    //
848    // Read-only projection of `ff_waitpoint_pending` serving the 10-
849    // field `PendingWaitpointInfo` contract. Producer-side writes of
850    // the 3 new 0011 columns (`state`, `required_signal_names`,
851    // `activated_at_ms`) land alongside this method in the same PR —
852    // see `suspend_ops::suspend_core` INSERT site. Capability flip
853    // deferred to Wave 9 release PR per RFC §6.3.
854    #[cfg(feature = "core")]
855    #[tracing::instrument(name = "pg.list_pending_waitpoints", skip_all)]
856    async fn list_pending_waitpoints(
857        &self,
858        args: ListPendingWaitpointsArgs,
859    ) -> Result<ListPendingWaitpointsResult, EngineError> {
860        suspend_ops::list_pending_waitpoints_impl(&self.pool, args).await
861    }
862
863    // ── RFC-020 Wave 9 Spine-A pt.1 — operator-control mutations (§4.2) ─
864    //
865    // Two methods landing behind `Supports.cancel_execution` +
866    // `Supports.revoke_lease` (both stay `false` until the Wave 9
867    // release PR flips them atomically, RFC §6.3). SERIALIZABLE + CAS +
868    // `ff_lease_event` outbox emit on the same tx (§4.2.6 + §4.2.7).
869
870    #[cfg(feature = "core")]
871    #[tracing::instrument(name = "pg.cancel_execution", skip_all)]
872    async fn cancel_execution(
873        &self,
874        args: CancelExecutionArgs,
875    ) -> Result<CancelExecutionResult, EngineError> {
876        operator::cancel_execution_impl(&self.pool, args).await
877    }
878
879    #[cfg(feature = "core")]
880    #[tracing::instrument(name = "pg.revoke_lease", skip_all)]
881    async fn revoke_lease(
882        &self,
883        args: RevokeLeaseArgs,
884    ) -> Result<RevokeLeaseResult, EngineError> {
885        operator::revoke_lease_impl(&self.pool, args).await
886    }
887
888    // ── RFC-020 Wave 9 Spine-A pt.2 — operator control + flow cancel (§4.2.3 + §4.2.4 + §4.2.5) ─
889    //
890    // Four methods landing behind `Supports.change_priority` +
891    // `Supports.replay_execution` + `Supports.cancel_flow_header` +
892    // `Supports.ack_cancel_member` (all stay `false` until the Wave 9
893    // release PR flips them atomically, RFC §6.3). SERIALIZABLE + CAS +
894    // `ff_operator_event` outbox emit on the same tx (§4.2.6 + §4.2.7).
895    // `ack_cancel_member` is silent on the outbox (Valkey-parity).
896
897    #[cfg(feature = "core")]
898    #[tracing::instrument(name = "pg.change_priority", skip_all)]
899    async fn change_priority(
900        &self,
901        args: ChangePriorityArgs,
902    ) -> Result<ChangePriorityResult, EngineError> {
903        operator::change_priority_impl(&self.pool, args).await
904    }
905
906    #[cfg(feature = "core")]
907    #[tracing::instrument(name = "pg.replay_execution", skip_all)]
908    async fn replay_execution(
909        &self,
910        args: ReplayExecutionArgs,
911    ) -> Result<ReplayExecutionResult, EngineError> {
912        operator::replay_execution_impl(&self.pool, args).await
913    }
914
915    #[cfg(feature = "core")]
916    #[tracing::instrument(name = "pg.cancel_flow_header", skip_all)]
917    async fn cancel_flow_header(
918        &self,
919        args: CancelFlowArgs,
920    ) -> Result<CancelFlowHeader, EngineError> {
921        operator::cancel_flow_header_impl(&self.pool, &self.partition_config, args).await
922    }
923
924    #[cfg(feature = "core")]
925    #[tracing::instrument(name = "pg.ack_cancel_member", skip_all)]
926    async fn ack_cancel_member(
927        &self,
928        flow_id: &FlowId,
929        execution_id: &ExecutionId,
930    ) -> Result<(), EngineError> {
931        operator::ack_cancel_member_impl(
932            &self.pool,
933            &self.partition_config,
934            flow_id.clone(),
935            execution_id.clone(),
936        )
937        .await
938    }
939
940    // ── RFC-017 Stage A — ingress (promoted from inherent) ────
941
942    /// RFC-017 Wave 8 Stage E1: lift the inherent
943    /// [`PostgresBackend::create_execution`] onto the trait so
944    /// ff-server's migrated HTTP handler can dispatch to Postgres.
945    /// Post-insert the row is idempotent; the Postgres impl does not
946    /// distinguish `Created` from `Duplicate` at the helper level
947    /// (both paths commit and return the execution id), so we always
948    /// surface `Created { public_state: Waiting }` here. A follow-up
949    /// may lift the distinction if a consumer relies on it.
950    #[cfg(feature = "core")]
951    #[tracing::instrument(name = "pg.create_execution.trait", skip_all)]
952    async fn create_execution(
953        &self,
954        args: CreateExecutionArgs,
955    ) -> Result<CreateExecutionResult, EngineError> {
956        let eid = args.execution_id.clone();
957        exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await?;
958        Ok(CreateExecutionResult::Created {
959            execution_id: eid,
960            public_state: PublicState::Waiting,
961        })
962    }
963
964    #[cfg(feature = "core")]
965    #[tracing::instrument(name = "pg.create_flow", skip_all)]
966    async fn create_flow(
967        &self,
968        args: CreateFlowArgs,
969    ) -> Result<CreateFlowResult, EngineError> {
970        flow_staging::create_flow(&self.pool, &self.partition_config, &args).await
971    }
972
973    #[cfg(feature = "core")]
974    #[tracing::instrument(name = "pg.add_execution_to_flow", skip_all)]
975    async fn add_execution_to_flow(
976        &self,
977        args: AddExecutionToFlowArgs,
978    ) -> Result<AddExecutionToFlowResult, EngineError> {
979        flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, &args).await
980    }
981
982    #[cfg(feature = "core")]
983    #[tracing::instrument(name = "pg.stage_dependency_edge", skip_all)]
984    async fn stage_dependency_edge(
985        &self,
986        args: StageDependencyEdgeArgs,
987    ) -> Result<StageDependencyEdgeResult, EngineError> {
988        flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, &args).await
989    }
990
991    #[cfg(feature = "core")]
992    #[tracing::instrument(name = "pg.apply_dependency_to_child", skip_all)]
993    async fn apply_dependency_to_child(
994        &self,
995        args: ApplyDependencyToChildArgs,
996    ) -> Result<ApplyDependencyToChildResult, EngineError> {
997        flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, &args).await
998    }
999
1000    // PR-7b Cluster 4. Async-via-outbox cascade — see trait rustdoc
1001    // "Timing semantics" for the Valkey-sync vs PG-outbox divergence.
1002    // This resolves the payload to its `ff_completion_event.event_id`
1003    // and invokes the existing Wave-5a `dispatch_completion`; further
1004    // hops ride their own outbox events emitted by the per-hop tx.
1005    #[cfg(feature = "core")]
1006    async fn cascade_completion(
1007        &self,
1008        payload: &ff_core::backend::CompletionPayload,
1009    ) -> Result<ff_core::contracts::CascadeOutcome, EngineError> {
1010        let event_id = match resolve_event_id(&self.pool, payload).await {
1011            Some(id) => id,
1012            None => {
1013                // Event not materialised (outbox race or pre-subscribe
1014                // payload) — reconciler is the backstop.
1015                tracing::warn!(
1016                    execution_id = %payload.execution_id,
1017                    produced_at_ms = payload.produced_at_ms.0,
1018                    "pg.cascade_completion: could not resolve event_id; reconciler will claim"
1019                );
1020                return Ok(ff_core::contracts::CascadeOutcome::async_dispatched(0));
1021            }
1022        };
1023        let outcome = crate::dispatch::dispatch_completion(&self.pool, event_id).await?;
1024        let advanced = match outcome {
1025            crate::dispatch::DispatchOutcome::NoOp => 0,
1026            crate::dispatch::DispatchOutcome::Advanced(n) => n,
1027        };
1028        Ok(ff_core::contracts::CascadeOutcome::async_dispatched(advanced))
1029    }
1030
1031    fn backend_label(&self) -> &'static str {
1032        "postgres"
1033    }
1034
1035    /// RFC-018 Stage A: populate the `Capabilities` snapshot from the
1036    /// static [`postgres_supports_base`] shape. The Postgres backend
1037    /// landed through RFC-017 Stage E4 at v0.8.0; fields still `false`
1038    /// correspond to Wave-9 follow-up work (`cancel_flow_header`,
1039    /// `ack_cancel_member`, read-model, operator control, budget /
1040    /// quota, `list_pending_waitpoints`). See
1041    /// `docs/POSTGRES_PARITY_MATRIX.md` for the per-row breakdown.
1042    fn capabilities(&self) -> ff_core::capability::Capabilities {
1043        ff_core::capability::Capabilities::new(
1044            ff_core::capability::BackendIdentity::new(
1045                "postgres",
1046                ff_core::capability::Version::new(0, 11, 0),
1047                "E-shipped",
1048            ),
1049            postgres_supports_base(),
1050        )
1051    }
1052
1053    /// Issue #281: no-op. Schema migrations are applied out-of-band
1054    /// per `rfcs/drafts/v0.7-migration-master.md §Q12` (operator runs
1055    /// `sqlx migrate run` or the future `ff-migrate` CLI). Boot runs a
1056    /// schema-version check at connect time
1057    /// ([`crate::version::check_schema_version`]) and refuses to
1058    /// start on mismatch, so by the time `prepare()` is callable
1059    /// there is nothing further to do.
1060    async fn prepare(
1061        &self,
1062    ) -> Result<ff_core::backend::PrepareOutcome, EngineError> {
1063        Ok(ff_core::backend::PrepareOutcome::NoOp)
1064    }
1065
1066    /// RFC-017 Wave 8 Stage E3 (§4 row 9, §7): forward the claim to the
1067    /// Postgres-native admission pipeline. Returns `NoWork` when no
1068    /// eligible execution is admissible this scan cycle. Budget
1069    /// breaches surface as `NoWork` (leaving the row eligible for a
1070    /// retry by another worker); validation-class rejections
1071    /// (malformed partition, unknown kid) surface as typed
1072    /// [`EngineError`] variants mapped to the Server's 400/503 arms.
1073    #[cfg(feature = "core")]
1074    #[tracing::instrument(name = "pg.claim_for_worker", skip_all)]
1075    async fn claim_for_worker(
1076        &self,
1077        args: ff_core::contracts::ClaimForWorkerArgs,
1078    ) -> Result<ff_core::contracts::ClaimForWorkerOutcome, EngineError> {
1079        let sched = scheduler::PostgresScheduler::new(self.pool.clone());
1080        let grant_opt = sched
1081            .claim_for_worker(
1082                &args.lane_id,
1083                &args.worker_id,
1084                &args.worker_instance_id,
1085                &args.worker_capabilities,
1086                args.grant_ttl_ms,
1087            )
1088            .await?;
1089        Ok(match grant_opt {
1090            Some(g) => ff_core::contracts::ClaimForWorkerOutcome::granted(g),
1091            None => ff_core::contracts::ClaimForWorkerOutcome::no_work(),
1092        })
1093    }
1094
1095    async fn ping(&self) -> Result<(), EngineError> {
1096        // Postgres analogue to Valkey PING — single-round-trip pool
1097        // liveness. Errors propagate as transport-class EngineError via
1098        // the existing sqlx→EngineError map.
1099        let _ = sqlx::query_scalar::<_, i32>("SELECT 1")
1100            .fetch_one(&self.pool)
1101            .await
1102            .map_err(error::map_sqlx_error)?;
1103        Ok(())
1104    }
1105
1106    /// RFC-017 Wave 8 Stage E3: drain the scanner supervisor's
1107    /// reconciler tasks up to `grace`, then close the sqlx pool.
1108    /// Matches the Valkey backend's shutdown_prepare contract —
1109    /// bounded best-effort drain, never returns an error.
1110    async fn shutdown_prepare(&self, grace: Duration) -> Result<(), EngineError> {
1111        #[cfg(feature = "core")]
1112        if let Some(handle) = self.scanner_handle.as_ref() {
1113            let timed_out = handle.shutdown(grace).await;
1114            if timed_out > 0 {
1115                tracing::warn!(
1116                    timed_out,
1117                    ?grace,
1118                    "postgres scanner supervisor exceeded grace on shutdown"
1119                );
1120            }
1121        }
1122        Ok(())
1123    }
1124
1125    #[tracing::instrument(name = "pg.cancel_flow", skip_all)]
1126    async fn cancel_flow(
1127        &self,
1128        id: &FlowId,
1129        policy: CancelFlowPolicy,
1130        wait: CancelFlowWait,
1131    ) -> Result<CancelFlowResult, EngineError> {
1132        let result = flow::cancel_flow(&self.pool, &self.partition_config, id, policy).await?;
1133        if let Some(deadline) = ff_core::engine_backend::cancel_flow_wait_deadline(wait) {
1134            ff_core::engine_backend::wait_for_flow_cancellation(self, id, deadline).await?;
1135        }
1136        Ok(result)
1137    }
1138
1139    #[cfg(feature = "core")]
1140    #[tracing::instrument(name = "pg.set_edge_group_policy", skip_all)]
1141    async fn set_edge_group_policy(
1142        &self,
1143        flow_id: &FlowId,
1144        downstream_execution_id: &ExecutionId,
1145        policy: EdgeDependencyPolicy,
1146    ) -> Result<SetEdgeGroupPolicyResult, EngineError> {
1147        flow::set_edge_group_policy(
1148            &self.pool,
1149            &self.partition_config,
1150            flow_id,
1151            downstream_execution_id,
1152            policy,
1153        )
1154        .await
1155    }
1156
1157    // ── Budget ──
1158
1159    #[tracing::instrument(name = "pg.report_usage", skip_all)]
1160    async fn report_usage(
1161        &self,
1162        _handle: &Handle,
1163        budget: &BudgetId,
1164        dimensions: UsageDimensions,
1165    ) -> Result<ReportUsageResult, EngineError> {
1166        budget::report_usage_impl(&self.pool, &self.partition_config, budget, dimensions).await
1167    }
1168
1169    // ── RFC-020 Wave 9 Standalone-1 — budget/quota admin (§4.4) ─────
1170    //
1171    // Five methods landing behind capability flags that stay `false`
1172    // until the Wave 9 release PR flips them atomically (RFC §6.3).
1173    // Schema + trait impls land now; capability-surface flip is one
1174    // PR later.
1175
1176    #[cfg(feature = "core")]
1177    #[tracing::instrument(name = "pg.create_budget", skip_all)]
1178    async fn create_budget(
1179        &self,
1180        args: CreateBudgetArgs,
1181    ) -> Result<CreateBudgetResult, EngineError> {
1182        budget::create_budget_impl(&self.pool, &self.partition_config, args).await
1183    }
1184
1185    #[cfg(feature = "core")]
1186    #[tracing::instrument(name = "pg.reset_budget", skip_all)]
1187    async fn reset_budget(
1188        &self,
1189        args: ResetBudgetArgs,
1190    ) -> Result<ResetBudgetResult, EngineError> {
1191        budget::reset_budget_impl(&self.pool, &self.partition_config, args).await
1192    }
1193
1194    #[cfg(feature = "core")]
1195    #[tracing::instrument(name = "pg.create_quota_policy", skip_all)]
1196    async fn create_quota_policy(
1197        &self,
1198        args: CreateQuotaPolicyArgs,
1199    ) -> Result<CreateQuotaPolicyResult, EngineError> {
1200        budget::create_quota_policy_impl(&self.pool, &self.partition_config, args).await
1201    }
1202
1203    #[cfg(feature = "core")]
1204    #[tracing::instrument(name = "pg.get_budget_status", skip_all)]
1205    async fn get_budget_status(
1206        &self,
1207        id: &BudgetId,
1208    ) -> Result<BudgetStatus, EngineError> {
1209        budget::get_budget_status_impl(&self.pool, &self.partition_config, id).await
1210    }
1211
1212    #[cfg(feature = "core")]
1213    #[tracing::instrument(name = "pg.report_usage_admin", skip_all)]
1214    async fn report_usage_admin(
1215        &self,
1216        budget_id: &BudgetId,
1217        args: ReportUsageAdminArgs,
1218    ) -> Result<ReportUsageResult, EngineError> {
1219        budget::report_usage_admin_impl(&self.pool, &self.partition_config, budget_id, args).await
1220    }
1221
1222    // ── cairn #454 Phase 4a — per-execution ledger (option A) ────────
1223
1224    #[cfg(feature = "core")]
1225    #[tracing::instrument(name = "pg.record_spend", skip_all)]
1226    async fn record_spend(
1227        &self,
1228        args: ff_core::contracts::RecordSpendArgs,
1229    ) -> Result<ReportUsageResult, EngineError> {
1230        budget::record_spend_impl(&self.pool, &self.partition_config, args).await
1231    }
1232
1233    #[cfg(feature = "core")]
1234    #[tracing::instrument(name = "pg.release_budget", skip_all)]
1235    async fn release_budget(
1236        &self,
1237        args: ff_core::contracts::ReleaseBudgetArgs,
1238    ) -> Result<(), EngineError> {
1239        budget::release_budget_impl(&self.pool, &self.partition_config, args).await
1240    }
1241
1242    // ── HMAC secret rotation (v0.7 migration-master Q4) ──
1243    //
    // Implemented as a single-global-row write against
    // `ff_waitpoint_hmac` through the `signal::*_impl` helpers called
    // below. The earlier Wave 0/1 `Unavailable` stub shape no longer
    // applies to these methods.
1248    #[tracing::instrument(name = "pg.rotate_waitpoint_hmac_secret_all", skip_all)]
1249    async fn rotate_waitpoint_hmac_secret_all(
1250        &self,
1251        args: RotateWaitpointHmacSecretAllArgs,
1252    ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
1253        // Wave 4 Agent D: Q4 single-global-row write against
1254        // `ff_waitpoint_hmac`. `now_ms` is captured here (not
1255        // inside the impl) so tests can inject a deterministic
1256        // clock via the pool layer in the future.
1257        let now_ms = std::time::SystemTime::now()
1258            .duration_since(std::time::UNIX_EPOCH)
1259            .map(|d| d.as_millis() as i64)
1260            .unwrap_or(0);
1261        signal::rotate_waitpoint_hmac_secret_all_impl(&self.pool, args, now_ms).await
1262    }
1263
1264    #[tracing::instrument(name = "pg.seed_waitpoint_hmac_secret", skip_all)]
1265    async fn seed_waitpoint_hmac_secret(
1266        &self,
1267        args: SeedWaitpointHmacSecretArgs,
1268    ) -> Result<SeedOutcome, EngineError> {
1269        // Issue #280: install-only boot-time seed against the global
1270        // `ff_waitpoint_hmac` table. Idempotent — cairn calls this on
1271        // every boot and the backend decides whether to INSERT.
1272        let now_ms = std::time::SystemTime::now()
1273            .duration_since(std::time::UNIX_EPOCH)
1274            .map(|d| d.as_millis() as i64)
1275            .unwrap_or(0);
1276        signal::seed_waitpoint_hmac_secret_impl(&self.pool, args, now_ms).await
1277    }
1278
1279    // ── Stream reads (streaming feature) ──
1280
1281    #[cfg(feature = "streaming")]
1282    #[tracing::instrument(name = "pg.read_stream", skip_all)]
1283    async fn read_stream(
1284        &self,
1285        execution_id: &ExecutionId,
1286        attempt_index: AttemptIndex,
1287        from: StreamCursor,
1288        to: StreamCursor,
1289        count_limit: u64,
1290    ) -> Result<StreamFrames, EngineError> {
1291        stream::read_stream(&self.pool, execution_id, attempt_index, from, to, count_limit).await
1292    }
1293
1294    #[cfg(feature = "streaming")]
1295    #[tracing::instrument(name = "pg.tail_stream", skip_all)]
1296    async fn tail_stream(
1297        &self,
1298        execution_id: &ExecutionId,
1299        attempt_index: AttemptIndex,
1300        after: StreamCursor,
1301        block_ms: u64,
1302        count_limit: u64,
1303        visibility: TailVisibility,
1304    ) -> Result<StreamFrames, EngineError> {
1305        let notifier = self
1306            .stream_notifier
1307            .as_ref()
1308            .ok_or(EngineError::Unavailable {
1309                op: "pg.tail_stream (notifier not initialised)",
1310            })?;
1311        stream::tail_stream(
1312            &self.pool,
1313            notifier,
1314            execution_id,
1315            attempt_index,
1316            after,
1317            block_ms,
1318            count_limit,
1319            visibility,
1320        )
1321        .await
1322    }
1323
1324    #[cfg(feature = "streaming")]
1325    #[tracing::instrument(name = "pg.read_summary", skip_all)]
1326    async fn read_summary(
1327        &self,
1328        execution_id: &ExecutionId,
1329        attempt_index: AttemptIndex,
1330    ) -> Result<Option<SummaryDocument>, EngineError> {
1331        stream::read_summary(&self.pool, execution_id, attempt_index).await
1332    }
1333
1334    // ── RFC-019 Stage A — `subscribe_completion` ──────────────────
1335    //
1336    // Postgres real impl. Wraps the existing `ff_completion_event`
1337    // outbox + `LISTEN ff_completion` machinery
1338    // (see `completion::subscribe`) and adapts each completion
1339    // payload into a `stream_subscribe::StreamEvent`.
1340    //
1341    // Cursor encoding: `POSTGRES_CURSOR_PREFIX (0x02)` + `event_id`
1342    // (i64 BE). Stage A resume-from-cursor is not plumbed through the
1343    // adapter (the existing subscriber tails from `max(event_id)`);
1344    // Stage B threads the cursor into the replay path. The surface is
1345    // correct today for consumers that subscribe from tail and
1346    // persist cursors for future resume.
1347    #[tracing::instrument(name = "pg.subscribe_completion", skip_all)]
1348    async fn subscribe_completion(
1349        &self,
1350        _cursor: ff_core::stream_subscribe::StreamCursor,
1351        filter: &ff_core::backend::ScannerFilter,
1352    ) -> Result<ff_core::stream_events::CompletionSubscription, EngineError> {
1353        use ff_core::stream_events::{CompletionEvent, CompletionOutcome};
1354        use ff_core::stream_subscribe::encode_postgres_event_cursor;
1355        use futures_core::Stream;
1356        use std::pin::Pin;
1357        use std::task::{Context, Poll};
1358
1359        // Delegate to the existing CompletionBackend implementation so
1360        // the LISTEN/replay machinery is shared. When a non-noop
1361        // `ScannerFilter` (#282) is supplied, route through the
1362        // `_filtered` variant so the outbox-inline SQL filter applies.
1363        // Resume-from-cursor is still unwired (Stage A surface tails
1364        // from tail).
1365        let inner = if filter.is_noop() {
1366            ff_core::completion_backend::CompletionBackend::subscribe_completions(self).await?
1367        } else {
1368            ff_core::completion_backend::CompletionBackend::subscribe_completions_filtered(
1369                self, filter,
1370            )
1371            .await?
1372        };
1373
1374        struct Adapter {
1375            inner: ff_core::completion_backend::CompletionStream,
1376        }
1377
1378        impl Stream for Adapter {
1379            type Item = Result<CompletionEvent, EngineError>;
1380            fn poll_next(
1381                mut self: Pin<&mut Self>,
1382                cx: &mut Context<'_>,
1383            ) -> Poll<Option<Self::Item>> {
1384                match Pin::new(&mut self.inner).poll_next(cx) {
1385                    Poll::Pending => Poll::Pending,
1386                    Poll::Ready(None) => Poll::Ready(None),
1387                    Poll::Ready(Some(payload)) => {
1388                        // Placeholder cursor (0-event_id) because
1389                        // `CompletionPayload` does not surface
1390                        // `event_id` today. Family prefix stays stable
1391                        // so persistence is forward-compatible.
1392                        let cursor = encode_postgres_event_cursor(0);
1393                        let event = CompletionEvent::new(
1394                            cursor,
1395                            payload.execution_id.clone(),
1396                            CompletionOutcome::from_wire(&payload.outcome),
1397                            payload.produced_at_ms,
1398                        );
1399                        Poll::Ready(Some(Ok(event)))
1400                    }
1401                }
1402            }
1403        }
1404
1405        Ok(Box::pin(Adapter { inner }))
1406    }
1407
1408    // ── RFC-019 Stage B — `subscribe_lease_history` ──────────────
1409    //
1410    // Real Postgres impl. Tails the `ff_lease_event` outbox (written
1411    // by producer sites in `attempt.rs`, `flow.rs`, `suspend_ops.rs`,
1412    // and the `attempt_timeout` / `lease_expiry` reconcilers) via
1413    // `LISTEN ff_lease_event` + catch-up SELECT. Cursor encoding
1414    // matches `subscribe_completion`: `0x02 ++ event_id(BE8)`.
1415    //
1416    // Partition scope: hardcoded to partition 0 — mirrors the Valkey
1417    // Stage A impl, which tails partition 0's aggregate stream key.
1418    // Cross-partition consumers instantiate one backend per
1419    // partition + merge streams consumer-side (RFC-019 §Backend
1420    // Semantics).
1421    #[tracing::instrument(name = "pg.subscribe_lease_history", skip_all)]
1422    async fn subscribe_lease_history(
1423        &self,
1424        cursor: ff_core::stream_subscribe::StreamCursor,
1425        filter: &ff_core::backend::ScannerFilter,
1426    ) -> Result<ff_core::stream_events::LeaseHistorySubscription, EngineError> {
1427        lease_event_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
1428    }
1429
1430    // ── RFC-019 Stage B — `subscribe_signal_delivery` (#310) ─────
1431    //
1432    // Tails the `ff_signal_event` outbox (written by the producer
1433    // INSERT in `suspend_ops::deliver_signal_impl`) via
1434    // `LISTEN ff_signal_event` + catch-up SELECT. Cursor encoding
1435    // matches `subscribe_lease_history`: `0x02 ++ event_id(BE8)`.
1436    //
1437    // Partition scope: hardcoded to partition 0 — mirrors the Valkey
1438    // Stage B impl which tails partition 0's aggregate stream key.
1439    #[tracing::instrument(name = "pg.subscribe_signal_delivery", skip_all)]
1440    async fn subscribe_signal_delivery(
1441        &self,
1442        cursor: ff_core::stream_subscribe::StreamCursor,
1443        filter: &ff_core::backend::ScannerFilter,
1444    ) -> Result<ff_core::stream_events::SignalDeliverySubscription, EngineError> {
1445        signal_delivery_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
1446    }
1447
1448    // ── PR-7b Cluster 1 — Foundation scanner operations ─────────
1449    //
1450    // Per-execution wrappers around the reconcilers in
1451    // `crate::reconcilers::*`. Each reconciler exposes a
1452    // `*_for_execution` / `*_for_*` helper mirroring the batch
1453    // `scan_tick` path's per-row tx logic so the engine-side scanner
1454    // trait-dispatch and the batch reconciler share one SQL code
1455    // path.
1456    //
1457    // All 5 scanner-op methods run real SQL on Postgres; both phases
1458    // of `expire_execution` (`AttemptTimeout` + `ExecutionDeadline`)
1459    // are covered. The Wave-9-minimal delivery (this commit) adds
1460    // `delayed_promoter`, `pending_wp_expiry`, and
1461    // `execution_deadline` reconcilers on top of the Wave 6c
1462    // reconcilers shipped with PR-7b/1.
1463    //
1464    // Gated on `core` because `reconcilers` (and its `dispatch` dep)
1465    // require `core`. Without `core` the trait defaults return
1466    // `EngineError::Unavailable`, preserving behavioural parity with
1467    // other feature-stripped callsites.
1468
1469    #[cfg(feature = "core")]
1470    async fn mark_lease_expired_if_due(
1471        &self,
1472        partition: Partition,
1473        execution_id: &ExecutionId,
1474    ) -> Result<(), EngineError> {
1475        let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1476        let partition_key = partition_index_to_i16(partition)?;
1477        reconcilers::lease_expiry::release_for_execution(&self.pool, partition_key, exec_uuid)
1478            .await
1479    }
1480
1481    #[cfg(feature = "core")]
1482    async fn promote_delayed(
1483        &self,
1484        partition: Partition,
1485        _lane: &LaneId,
1486        execution_id: &ExecutionId,
1487        now_ms: TimestampMs,
1488    ) -> Result<(), EngineError> {
1489        let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1490        let partition_key = partition_index_to_i16(partition)?;
1491        // `lane` is ignored: lane is authoritative on `ff_exec_core`
1492        // already (it was used only to locate the Valkey ZSET). The
1493        // candidate selection in `promote_for_execution` re-checks
1494        // the (lifecycle_phase, eligibility_state, deadline_at_ms)
1495        // tuple that `attempt::delay()` writes.
1496        reconcilers::delayed_promoter::promote_for_execution(
1497            &self.pool,
1498            partition_key,
1499            exec_uuid,
1500            now_ms.0,
1501        )
1502        .await
1503    }
1504
1505    #[cfg(feature = "core")]
1506    async fn close_waitpoint(
1507        &self,
1508        partition: Partition,
1509        _execution_id: &ExecutionId,
1510        waitpoint_id: &str,
1511        now_ms: TimestampMs,
1512    ) -> Result<(), EngineError> {
1513        // The scanner resolves (waitpoint_id → owning execution_id)
1514        // separately for filter application; the close action itself
1515        // only needs the waitpoint row (which carries `execution_id`
1516        // + `waitpoint_key`). Partition is authoritative from the
1517        // caller.
1518        let partition_key = partition_index_to_i16(partition)?;
1519        let waitpoint_uuid = uuid::Uuid::parse_str(waitpoint_id).map_err(|e| {
1520            EngineError::Validation {
1521                kind: ff_core::engine_error::ValidationKind::InvalidInput,
1522                detail: format!("waitpoint_id not a UUID: {e}"),
1523            }
1524        })?;
1525        reconcilers::pending_wp_expiry::close_for_execution(
1526            &self.pool,
1527            partition_key,
1528            waitpoint_uuid,
1529            now_ms.0,
1530        )
1531        .await
1532    }
1533
1534    #[cfg(feature = "core")]
1535    async fn expire_execution(
1536        &self,
1537        partition: Partition,
1538        execution_id: &ExecutionId,
1539        phase: ExpirePhase,
1540        now_ms: TimestampMs,
1541    ) -> Result<(), EngineError> {
1542        let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1543        let partition_key = partition_index_to_i16(partition)?;
1544        match phase {
1545            ExpirePhase::AttemptTimeout => {
1546                reconcilers::attempt_timeout::expire_for_execution(
1547                    &self.pool,
1548                    partition_key,
1549                    exec_uuid,
1550                )
1551                .await
1552            }
1553            ExpirePhase::ExecutionDeadline => {
1554                reconcilers::execution_deadline::expire_for_execution(
1555                    &self.pool,
1556                    partition_key,
1557                    exec_uuid,
1558                    now_ms.0,
1559                )
1560                .await
1561            }
1562        }
1563    }
1564
1565    #[cfg(feature = "core")]
1566    async fn expire_suspension(
1567        &self,
1568        partition: Partition,
1569        execution_id: &ExecutionId,
1570        _now_ms: TimestampMs,
1571    ) -> Result<(), EngineError> {
1572        let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1573        let partition_key = partition_index_to_i16(partition)?;
1574        reconcilers::suspension_timeout::expire_for_execution(
1575            &self.pool,
1576            partition_key,
1577            exec_uuid,
1578        )
1579        .await
1580    }
1581
1582    // PR-7b Cluster 2b-B: flow summary projection on Postgres.
1583    // Aggregates member public_state from ff_exec_core and UPSERTs the
1584    // derived summary into ff_flow_summary (migration 0019). One SQL
1585    // round-trip for the aggregation + one for the upsert.
1586    #[cfg(feature = "core")]
1587    async fn project_flow_summary(
1588        &self,
1589        partition: Partition,
1590        flow_id: &FlowId,
1591        now_ms: TimestampMs,
1592    ) -> Result<bool, EngineError> {
1593        let partition_key = partition_index_to_i16(partition)?;
1594        let flow_uuid: sqlx::types::Uuid = flow_id.0;
1595        flow::project_flow_summary_impl(
1596            &self.pool,
1597            partition_key,
1598            flow_uuid,
1599            now_ms.0,
1600        )
1601        .await
1602    }
1603
1604    // PR-7b Cluster 2b-B: retention trim on Postgres.
1605    // SELECTs a batch of terminal executions past the cutoff, then
1606    // per-execution DELETEs across every sibling table (no FK CASCADE
1607    // in the schema, so this is explicit). One transaction per batch.
1608    #[cfg(feature = "core")]
1609    async fn trim_retention(
1610        &self,
1611        partition: Partition,
1612        lane_id: &LaneId,
1613        retention_ms: u64,
1614        now_ms: TimestampMs,
1615        batch_size: u32,
1616        filter: &ff_core::backend::ScannerFilter,
1617    ) -> Result<u32, EngineError> {
1618        let partition_key = partition_index_to_i16(partition)?;
1619        exec_core::trim_retention_impl(
1620            &self.pool,
1621            partition_key,
1622            lane_id.as_str(),
1623            retention_ms,
1624            now_ms.0,
1625            batch_size,
1626            filter,
1627        )
1628        .await
1629    }
1630
1631    // ── PR-7b / #453: typed-FCALL bodies ──
1632
1633    #[cfg(feature = "core")]
1634    async fn renew_lease(
1635        &self,
1636        args: ff_core::contracts::RenewLeaseArgs,
1637    ) -> Result<ff_core::contracts::RenewLeaseResult, EngineError> {
1638        crate::typed_ops::renew_lease(self.pool(), args).await
1639    }
1640
1641    #[cfg(feature = "core")]
1642    async fn complete_execution(
1643        &self,
1644        args: ff_core::contracts::CompleteExecutionArgs,
1645    ) -> Result<ff_core::contracts::CompleteExecutionResult, EngineError> {
1646        crate::typed_ops::complete_execution(self.pool(), args).await
1647    }
1648
1649    #[cfg(feature = "core")]
1650    async fn fail_execution(
1651        &self,
1652        args: ff_core::contracts::FailExecutionArgs,
1653    ) -> Result<ff_core::contracts::FailExecutionResult, EngineError> {
1654        crate::typed_ops::fail_execution(self.pool(), args).await
1655    }
1656
1657    #[cfg(feature = "core")]
1658    async fn resume_execution(
1659        &self,
1660        args: ff_core::contracts::ResumeExecutionArgs,
1661    ) -> Result<ff_core::contracts::ResumeExecutionResult, EngineError> {
1662        crate::typed_ops::resume_execution(self.pool(), args).await
1663    }
1664
1665    #[cfg(feature = "core")]
1666    async fn check_admission(
1667        &self,
1668        quota_policy_id: &ff_core::types::QuotaPolicyId,
1669        _dimension: &str,
1670        args: ff_core::contracts::CheckAdmissionArgs,
1671    ) -> Result<ff_core::contracts::CheckAdmissionResult, EngineError> {
1672        crate::typed_ops::check_admission(
1673            self.pool(),
1674            &self.partition_config,
1675            quota_policy_id,
1676            args,
1677        )
1678        .await
1679    }
1680
1681    #[cfg(feature = "core")]
1682    async fn evaluate_flow_eligibility(
1683        &self,
1684        args: ff_core::contracts::EvaluateFlowEligibilityArgs,
1685    ) -> Result<ff_core::contracts::EvaluateFlowEligibilityResult, EngineError> {
1686        crate::typed_ops::evaluate_flow_eligibility(self.pool(), args).await
1687    }
1688
1689    #[cfg(feature = "core")]
1690    async fn claim_execution(
1691        &self,
1692        args: ff_core::contracts::ClaimExecutionArgs,
1693    ) -> Result<ff_core::contracts::ClaimExecutionResult, EngineError> {
1694        crate::typed_ops::claim_execution(self.pool(), &self.partition_config, args).await
1695    }
1696
1697    // Cairn #454 Phase 4c — backend-atomic `issue_claim_grant` +
1698    // `claim_execution` composition. sqlx tx ensures a mid-op crash
1699    // cannot leak a dangling grant (auto-rollback on drop).
1700    #[cfg(feature = "core")]
1701    #[tracing::instrument(name = "pg.issue_grant_and_claim", skip_all)]
1702    async fn issue_grant_and_claim(
1703        &self,
1704        args: ff_core::contracts::IssueGrantAndClaimArgs,
1705    ) -> Result<ff_core::contracts::ClaimGrantOutcome, EngineError> {
1706        crate::typed_ops::issue_grant_and_claim(self.pool(), &self.partition_config, args).await
1707    }
1708
1709    // ── PR-7b Wave 0a: exec_core field read ──
1710
1711    async fn read_exec_core_fields(
1712        &self,
1713        partition: ff_core::partition::Partition,
1714        execution_id: &ff_core::types::ExecutionId,
1715        fields: &[&str],
1716    ) -> Result<std::collections::HashMap<String, Option<String>>, EngineError> {
1717        if fields.is_empty() {
1718            return Ok(std::collections::HashMap::new());
1719        }
1720        // Cross-check: `partition` and `execution_id.partition()` must
1721        // agree. A mismatch would silently read the wrong row (or miss)
1722        // on Valkey via the `{p:N}` key tag, so surface it explicitly
1723        // as a validation error here.
1724        let derived: u16 = execution_id.partition();
1725        if partition.index != derived {
1726            return Err(EngineError::Validation {
1727                kind: ff_core::engine_error::ValidationKind::InvalidInput,
1728                detail: format!(
1729                    "read_exec_core_fields: partition mismatch (arg={}, eid={})",
1730                    partition.index, derived
1731                ),
1732            });
1733        }
1734        let partition_key: i16 = partition.index as i16;
1735        let exec_uuid = crate::exec_core::eid_uuid(execution_id);
1736
1737        // Build a single SELECT that projects each requested field to
1738        // a text value. Fields are classified:
1739        //  - canonical columns (lane_id, lifecycle_phase,
1740        //    ownership_state, eligibility_state, public_state,
1741        //    attempt_state, blocking_reason, cancellation_reason,
1742        //    cancelled_by, attempt_index, flow_id, priority,
1743        //    created_at_ms, terminal_at_ms, deadline_at_ms): CAST the
1744        //    column to text. Scanner-facing aliases (current_attempt_index
1745        //    → attempt_index, completed_at → terminal_at_ms,
1746        //    cancel_reason → cancellation_reason) project the canonical
1747        //    column.
1748        //  - `required_capabilities` is `text[]` on PG; projected as CSV
1749        //    via `array_to_string(..., ',')` to match Valkey's HMGET
1750        //    string shape.
1751        //  - `raw_fields` JSONB-resident names (current_waitpoint_id,
1752        //    current_worker_instance_id, budget_ids, quota_policy_id)
1753        //    project via `raw_fields ->> '<field>'`.
1754        //  - Unknown names project NULL (absent-field parity with
1755        //    Valkey HMGET).
1756        let mut projections: Vec<String> = Vec::with_capacity(fields.len());
1757        for field in fields {
1758            let expr = match *field {
1759                // Canonical exec_core columns.
1760                "lane_id" | "lifecycle_phase" | "ownership_state" | "eligibility_state"
1761                | "public_state" | "attempt_state" | "blocking_reason" | "cancellation_reason"
1762                | "cancelled_by" => format!("{f}::text", f = field),
1763                "attempt_index" => "attempt_index::text".to_string(),
1764                "flow_id" => "flow_id::text".to_string(),
1765                "priority" => "priority::text".to_string(),
1766                "created_at_ms" => "created_at_ms::text".to_string(),
1767                "terminal_at_ms" => "terminal_at_ms::text".to_string(),
1768                "deadline_at_ms" => "deadline_at_ms::text".to_string(),
1769                // Scanner-facing aliases that scan the same data from
1770                // different angles.
1771                "current_attempt_index" => "attempt_index::text".to_string(),
1772                "completed_at" => "terminal_at_ms::text".to_string(),
1773                "cancel_reason" => "cancellation_reason::text".to_string(),
1774                // required_capabilities is `text[]` on PG; scanner
1775                // callers expect a CSV string on Valkey. Convert.
1776                "required_capabilities" => {
1777                    "array_to_string(required_capabilities, ',')".to_string()
1778                }
1779                // Everything else lives under raw_fields JSONB.
1780                other => {
1781                    // Strict allowlist of raw_fields names the scanner
1782                    // code reads. Any other name returns NULL.
1783                    match other {
1784                        "current_waitpoint_id"
1785                        | "current_worker_instance_id"
1786                        | "budget_ids"
1787                        | "quota_policy_id" => format!("raw_fields ->> '{other}'"),
1788                        _ => "NULL".to_string(),
1789                    }
1790                }
1791            };
1792            projections.push(expr);
1793        }
1794        let projection_sql = projections.join(", ");
1795        let query = format!(
1796            "SELECT {projection_sql} FROM ff_exec_core \
1797             WHERE partition_key = $1 AND execution_id = $2"
1798        );
1799        let row_opt = sqlx::query(&query)
1800            .bind(partition_key)
1801            .bind(exec_uuid)
1802            .fetch_optional(self.pool())
1803            .await
1804            .map_err(|e| EngineError::Transport {
1805                backend: "postgres",
1806                source: format!("read_exec_core_fields: {e}").into(),
1807            })?;
1808
1809        let mut out = std::collections::HashMap::with_capacity(fields.len());
1810        if let Some(row) = row_opt {
1811            use sqlx::Row;
1812            for (idx, field) in fields.iter().enumerate() {
1813                let val: Option<String> =
1814                    row.try_get(idx).map_err(|e| EngineError::Transport {
1815                        backend: "postgres",
1816                        source: format!("read_exec_core_fields[{field}]: {e}").into(),
1817                    })?;
1818                out.insert((*field).to_string(), val);
1819            }
1820        } else {
1821            for field in fields {
1822                out.insert((*field).to_string(), None);
1823            }
1824        }
1825        Ok(out)
1826    }
1827
1828    // ── PR-7b Wave 0a: clock primitive ──
1829
1830    async fn server_time_ms(&self) -> Result<u64, EngineError> {
1831        // `clock_timestamp()` — `now()` returns the transaction start
1832        // timestamp and would be stale under any long-running tx.
1833        // Scanners use this to compute "due" windows, so the wall-
1834        // clock read must be fresh. Matches the `flow.rs` convention.
1835        let ms: i64 = sqlx::query_scalar(
1836            "SELECT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint",
1837        )
1838            .fetch_one(self.pool())
1839            .await
1840            .map_err(|e| EngineError::Transport {
1841                backend: "postgres",
1842                source: format!("server_time_ms: {e}").into(),
1843            })?;
1844        if ms < 0 {
1845            return Err(EngineError::Transport {
1846                backend: "postgres",
1847                source: "server_time_ms: negative epoch".into(),
1848            });
1849        }
1850        Ok(ms as u64)
1851    }
1852
1853    // ── RFC-025 Phase 3 — worker registry ───────────────────────
1854    //
1855    // Bodies live in `crate::worker_registry`; the overrides here
1856    // just forward to those free functions so the trait
1857    // `#[cfg(feature = ..)]` gates match the module-level gate.
1858
1859    #[cfg(feature = "core")]
1860    #[tracing::instrument(name = "pg.register_worker", skip_all)]
1861    async fn register_worker(
1862        &self,
1863        args: ff_core::contracts::RegisterWorkerArgs,
1864    ) -> Result<ff_core::contracts::RegisterWorkerOutcome, EngineError> {
1865        worker_registry::register_worker(&self.pool, args).await
1866    }
1867
1868    #[cfg(feature = "core")]
1869    #[tracing::instrument(name = "pg.heartbeat_worker", skip_all)]
1870    async fn heartbeat_worker(
1871        &self,
1872        args: ff_core::contracts::HeartbeatWorkerArgs,
1873    ) -> Result<ff_core::contracts::HeartbeatWorkerOutcome, EngineError> {
1874        worker_registry::heartbeat_worker(&self.pool, args).await
1875    }
1876
1877    #[cfg(feature = "core")]
1878    #[tracing::instrument(name = "pg.mark_worker_dead", skip_all)]
1879    async fn mark_worker_dead(
1880        &self,
1881        args: ff_core::contracts::MarkWorkerDeadArgs,
1882    ) -> Result<ff_core::contracts::MarkWorkerDeadOutcome, EngineError> {
1883        worker_registry::mark_worker_dead(&self.pool, args).await
1884    }
1885
1886    // List-expired-leases joins ff_attempt+ff_exec_core, which live in
1887    // the `worker_registry` module (gated on `core`). Require both
1888    // features to keep the body's dep chain intact; a standalone
1889    // suspension-only build has no lease-bearing tables to scan.
1890    #[cfg(all(feature = "core", feature = "suspension"))]
1891    #[tracing::instrument(name = "pg.list_expired_leases", skip_all)]
1892    async fn list_expired_leases(
1893        &self,
1894        args: ff_core::contracts::ListExpiredLeasesArgs,
1895    ) -> Result<ff_core::contracts::ListExpiredLeasesResult, EngineError> {
1896        worker_registry::list_expired_leases(&self.pool, args).await
1897    }
1898
1899    #[cfg(feature = "core")]
1900    #[tracing::instrument(name = "pg.list_workers", skip_all)]
1901    async fn list_workers(
1902        &self,
1903        args: ff_core::contracts::ListWorkersArgs,
1904    ) -> Result<ff_core::contracts::ListWorkersResult, EngineError> {
1905        worker_registry::list_workers(&self.pool, args).await
1906    }
1907}
1908
1909/// Resolve a `CompletionPayload` to the matching
1910/// `ff_completion_event.event_id` for PR-7b cascade dispatch.
1911///
1912/// Mirrors the cursor-walk previously inlined in
1913/// `ff-engine::completion_listener::run_completion_listener_postgres` —
1914/// keyed on `(partition_key, execution_id_uuid, occurred_at_ms)`. The
1915/// `partition_key` scoping is what makes this hit the
1916/// `ff_completion_event_lookup_idx` composite instead of devolving into
1917/// a cross-partition seq-scan; it's recoverable from the textual
1918/// `ExecutionId` (`"<partition>:<uuid>"`).
1919///
1920/// Returns `None` if the outbox row isn't visible yet (race with the
1921/// producing tx), the payload's execution id can't be parsed, or the
1922/// partition prefix isn't an `i16` — all recoverable via the
1923/// dependency_reconciler backstop. Transient sqlx errors are logged at
1924/// `warn` with the query inputs and also fall back to `None`; the
1925/// listener retries on the next payload and the reconciler covers the
1926/// worst case.
1927async fn resolve_event_id(
1928    pool: &PgPool,
1929    payload: &ff_core::backend::CompletionPayload,
1930) -> Option<i64> {
1931    let eid_str = payload.execution_id.as_str();
1932    // ExecutionId is `{fp:N}:<uuid>`; split on the rightmost `:` so the
1933    // `{fp:N}` hash-tag prefix stays intact.
1934    let uuid_str = eid_str.rsplit_once(':').map(|(_, u)| u)?;
1935    let uuid = uuid::Uuid::parse_str(uuid_str).ok()?;
1936    // The payload's ExecutionId is already validated (construction
1937    // enforces the `{fp:N}:<uuid>` shape), so `.partition()` is
1938    // infallible. Narrow to `i16` for the partition_key column.
1939    let partition_key = i16::try_from(payload.execution_id.partition()).ok()?;
1940    let occurred_at_ms = payload.produced_at_ms.0;
1941
1942    match sqlx::query_scalar::<_, i64>(
1943        "SELECT event_id FROM ff_completion_event \
1944         WHERE partition_key = $1 AND execution_id = $2 AND occurred_at_ms = $3 \
1945         ORDER BY event_id ASC LIMIT 1",
1946    )
1947    .bind(partition_key)
1948    .bind(uuid)
1949    .bind(occurred_at_ms)
1950    .fetch_optional(pool)
1951    .await
1952    {
1953        Ok(row) => row,
1954        Err(err) => {
1955            tracing::warn!(
1956                partition_key,
1957                execution_id = %uuid,
1958                occurred_at_ms,
1959                error = %err,
1960                "resolve_event_id: ff_completion_event lookup failed; falling back to \
1961                 dependency_reconciler backstop"
1962            );
1963            None
1964        }
1965    }
1966}
1967
1968/// Narrow a `Partition`'s `u16` index into the `i16` the Postgres
1969/// schema uses as its partition-key column. FlowFabric partitions
1970/// max out well below `i16::MAX` in practice (production deployments
1971/// run a few hundred partitions per family), but the conversion is
1972/// still fallible at the type level. Surface overflow as
1973/// `ValidationKind::InvalidInput` rather than silently substituting a
1974/// fallback — a silently mis-routed reconciler would
1975/// corrupt-by-omission without diagnostics.
1976fn partition_index_to_i16(partition: Partition) -> Result<i16, EngineError> {
1977    i16::try_from(partition.index).map_err(|_| EngineError::Validation {
1978        kind: ff_core::engine_error::ValidationKind::InvalidInput,
1979        detail: format!(
1980            "partition index {} exceeds i16 range (max {})",
1981            partition.index,
1982            i16::MAX
1983        ),
1984    })
1985}
1986
/// Minimum recommended `max_locks_per_transaction`; values reported
/// below this threshold trigger the boot-time warning.
///
/// Partition-heavy schemas (256 hash partitions per logical table) can
/// exceed the Postgres default of `64` per tx under modest concurrent
/// bench load — the Wave 7c bench hit `out of shared memory` at
/// 16 workers × 10k tasks with the default and unblocked at `512`.
///
/// We warn at boot rather than hard-fail because operators may
/// legitimately run with a tuned value that still exceeds 64 but sits
/// below our threshold.
const MIN_MAX_LOCKS_PER_TRANSACTION: i64 = 256;
1996
1997/// Probe `max_locks_per_transaction` at connect time + log a warning
1998/// when the current value is below the production-safe threshold.
1999/// Never fails the connect — probe errors are logged at debug and
2000/// swallowed (pg_show may be restricted on exotic deploys).
2001async fn warn_if_max_locks_low(pool: &PgPool) {
2002    let row: Result<(String,), sqlx::Error> =
2003        sqlx::query_as("SHOW max_locks_per_transaction")
2004            .fetch_one(pool)
2005            .await;
2006    match row {
2007        Ok((raw,)) => emit_max_locks_decision(&raw),
2008        Err(e) => {
2009            tracing::debug!("failed to probe max_locks_per_transaction: {e}");
2010        }
2011    }
2012}
2013
2014/// Pure decision surface for the max-locks probe — extracted for
2015/// unit-testability (the live probe is gated by a running Postgres).
2016/// Returns the integer value when a warning SHOULD fire, `None`
2017/// otherwise (either the raw is valid + at/above threshold, or the
2018/// raw is unparseable — the latter is debug-only).
2019fn max_locks_warn_value(raw: &str) -> Option<i64> {
2020    match raw.parse::<i64>() {
2021        Ok(v) if v < MIN_MAX_LOCKS_PER_TRANSACTION => Some(v),
2022        Ok(_) => None,
2023        Err(e) => {
2024            tracing::debug!(raw, "failed to parse max_locks_per_transaction: {e}");
2025            None
2026        }
2027    }
2028}
2029
2030fn emit_max_locks_decision(raw: &str) {
2031    if let Some(v) = max_locks_warn_value(raw) {
2032        tracing::warn!(
2033            current = v,
2034            recommended = MIN_MAX_LOCKS_PER_TRANSACTION,
2035            "postgres max_locks_per_transaction={v} is below the recommended \
2036             minimum ({MIN_MAX_LOCKS_PER_TRANSACTION}); partition-heavy workloads \
2037             may hit 'out of shared memory' under concurrent load. \
2038             See docs/operator-guide-postgres.md."
2039        );
2040    }
2041}
2042
/// Unit tests for the pure max-locks decision function.
#[cfg(test)]
mod max_locks_tests {
    use super::{max_locks_warn_value, MIN_MAX_LOCKS_PER_TRANSACTION};

    /// Values strictly below the threshold are echoed back for the warning.
    #[test]
    fn warns_when_below_threshold() {
        let postgres_default = max_locks_warn_value("64");
        assert_eq!(postgres_default, Some(64));

        let just_below = MIN_MAX_LOCKS_PER_TRANSACTION - 1;
        let raw = just_below.to_string();
        assert_eq!(max_locks_warn_value(&raw), Some(just_below));
    }

    /// The threshold itself and anything larger produce no warning.
    #[test]
    fn silent_at_or_above_threshold() {
        let at_threshold = MIN_MAX_LOCKS_PER_TRANSACTION.to_string();
        assert_eq!(max_locks_warn_value(&at_threshold), None);
        assert_eq!(max_locks_warn_value("1024"), None);
    }

    /// Input that is not an integer produces no warning.
    #[test]
    fn silent_for_unparseable_raw() {
        let outcome = max_locks_warn_value("not-a-number");
        assert_eq!(outcome, None);
    }
}
2070
/// Unit tests for the `u16` → `i16` partition-index narrowing.
#[cfg(test)]
mod partition_index_tests {
    use super::partition_index_to_i16;
    use ff_core::engine_error::{EngineError, ValidationKind};
    use ff_core::partition::{Partition, PartitionFamily};

    /// Indexes from 0 up to and including `i16::MAX` convert unchanged.
    #[test]
    fn accepts_values_within_i16_range() {
        let lowest = Partition { family: PartitionFamily::Flow, index: 0 };
        assert_eq!(partition_index_to_i16(lowest).unwrap(), 0);

        let typical = Partition { family: PartitionFamily::Flow, index: 255 };
        assert_eq!(partition_index_to_i16(typical).unwrap(), 255);

        let highest = Partition { family: PartitionFamily::Budget, index: i16::MAX as u16 };
        assert_eq!(partition_index_to_i16(highest).unwrap(), i16::MAX);
    }

    /// The first index past `i16::MAX` is rejected with a descriptive
    /// `InvalidInput` validation error.
    #[test]
    fn rejects_overflow_above_i16_max() {
        let first_overflow = (i16::MAX as u16) + 1;
        let p = Partition { family: PartitionFamily::Flow, index: first_overflow };
        match partition_index_to_i16(p).unwrap_err() {
            EngineError::Validation { kind, detail } => {
                assert_eq!(kind, ValidationKind::InvalidInput);
                assert!(detail.contains("exceeds i16 range"), "unexpected detail: {detail}");
            }
            other => panic!("expected Validation error, got {other:?}"),
        }
    }

    /// The largest possible `u16` index is rejected as well.
    #[test]
    fn rejects_u16_max() {
        let p = Partition { family: PartitionFamily::Quota, index: u16::MAX };
        let outcome = partition_index_to_i16(p);
        assert!(matches!(
            outcome,
            Err(EngineError::Validation { kind: ValidationKind::InvalidInput, .. })
        ));
    }
}