Skip to main content

ff_backend_postgres/
lib.rs

1//! `EngineBackend` implementation backed by Postgres.
2//!
//! **RFC-v0.7 Wave 0 — scaffold (historical).** This crate first
//! landed the [`PostgresBackend`] struct + the `impl EngineBackend`
//! block with every method stubbed to return `EngineError::Unavailable`;
//! most methods below now delegate to real module-level bodies.
//! The wave plan those bodies were filled in under:
7//!
8//! * Wave 1: cross-cutting error/helpers.
9//! * Wave 2: describe / list / resolve (read surface).
10//! * Wave 3: schema migrations (replaces the Wave 0 placeholder).
11//! * Wave 4: LISTEN/NOTIFY wiring + stream reads/tails.
12//! * Wave 5-7: hot-path write methods.
13//! * Wave 8: ff-server wire-up + dual-backend running.
14//!
15//! The trait stays object-safe; consumers can hold
16//! `Arc<dyn EngineBackend>`. No ferriskey dep — this crate's
17//! transport is `sqlx`.
18
19#![allow(clippy::result_large_err)]
20
21use std::sync::Arc;
22use std::time::Duration;
23
24use async_trait::async_trait;
25
26use ff_core::backend::{
27    AppendFrameOutcome, BackendConfig, CancelFlowPolicy, CancelFlowWait, CapabilitySet,
28    ClaimPolicy, FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal,
29    PendingWaitpoint, ResumeToken, ResumeSignal, SummaryDocument, TailVisibility,
30    UsageDimensions,
31};
32#[cfg(feature = "core")]
33use ff_core::contracts::{
34    AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
35    ApplyDependencyToChildResult, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
36    CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
37    DeliverApprovalSignalArgs, DeliverSignalArgs, DeliverSignalResult, EdgeDependencyPolicy,
38    EdgeDirection, EdgeSnapshot,
39    ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
40    ListPendingWaitpointsResult, ListSuspendedPage, SetEdgeGroupPolicyResult,
41    StageDependencyEdgeArgs, StageDependencyEdgeResult,
42};
43#[cfg(feature = "core")]
44use ff_core::state::PublicState;
45use ff_core::contracts::{
46    CancelFlowResult, ExecutionContext, ExecutionSnapshot, FlowSnapshot, IssueReclaimGrantArgs,
47    IssueReclaimGrantOutcome, ReclaimExecutionArgs, ReclaimExecutionOutcome, ReportUsageResult,
48    RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
49    SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
50};
51#[cfg(feature = "core")]
52use ff_core::contracts::ExecutionInfo;
53// RFC-020 Wave 9 Spine-A pt.1 — operator-control mutating surfaces.
54#[cfg(feature = "core")]
55use ff_core::contracts::{
56    CancelExecutionArgs, CancelExecutionResult, RevokeLeaseArgs, RevokeLeaseResult,
57};
58// RFC-020 Wave 9 Spine-A pt.2 — operator-control + flow-cancel mutating surfaces.
59#[cfg(feature = "core")]
60use ff_core::contracts::{
61    CancelFlowArgs, CancelFlowHeader, ChangePriorityArgs, ChangePriorityResult,
62    ReplayExecutionArgs, ReplayExecutionResult,
63};
64// RFC-020 Wave 9 Standalone-1 — budget/quota admin surfaces.
65#[cfg(feature = "core")]
66use ff_core::contracts::{
67    BudgetStatus, CreateBudgetArgs, CreateBudgetResult, CreateQuotaPolicyArgs,
68    CreateQuotaPolicyResult, ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult,
69};
70#[cfg(feature = "streaming")]
71use ff_core::contracts::{StreamCursor, StreamFrames};
72use ff_core::engine_backend::{EngineBackend, ExpirePhase};
73use ff_core::engine_error::EngineError;
74#[cfg(feature = "core")]
75use ff_core::partition::PartitionKey;
76use ff_core::partition::{Partition, PartitionConfig};
77#[cfg(feature = "streaming")]
78use ff_core::types::AttemptIndex;
79#[cfg(feature = "core")]
80use ff_core::types::EdgeId;
81use ff_core::types::{BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
82// Wave 5a — re-export `PgPool` so crates that depend on
83// `ff-backend-postgres` (and not `sqlx` directly) can name the pool
84// type in their own APIs (e.g. `ff-engine::dispatch_via_postgres`).
85pub use sqlx::PgPool;
86
87#[cfg(feature = "core")]
88mod admin;
89pub mod attempt;
90pub mod budget;
91pub mod claim_grant;
92pub mod completion;
93#[cfg(feature = "core")]
94pub mod dispatch;
95pub mod error;
96pub mod exec_core;
97pub mod flow;
98#[cfg(feature = "core")]
99pub mod flow_staging;
100pub mod handle_codec;
101mod lease_event;
102mod lease_event_subscribe;
103pub mod listener;
104pub mod migrate;
105#[cfg(feature = "core")]
106pub mod operator;
107#[cfg(feature = "core")]
108mod operator_event;
109pub mod pool;
110#[cfg(feature = "core")]
111pub mod reconcilers;
112#[cfg(feature = "core")]
113pub mod scanner_supervisor;
114#[cfg(feature = "core")]
115pub mod scheduler;
116pub mod signal;
117mod signal_delivery_subscribe;
118mod signal_event;
119#[cfg(feature = "streaming")]
120pub mod stream;
121pub mod suspend;
122pub mod suspend_ops;
123#[cfg(feature = "core")]
124pub(crate) mod typed_ops;
125pub mod version;
126
127pub use completion::{PostgresCompletionStream, COMPLETION_CHANNEL};
128pub use error::{map_sqlx_error, PostgresTransportError};
129pub use listener::StreamNotifier;
130pub use migrate::{apply_migrations, MigrationError};
131#[cfg(feature = "core")]
132pub use scanner_supervisor::{PostgresScannerConfig, PostgresScannerHandle};
133pub use version::check_schema_version;
134
135// Re-export the new `PostgresConnection` shape so consumers can name
136// it from this crate directly without dipping into `ff_core::backend`.
137// `BackendConfig` is already imported above and is part of the
138// `connect()` signature, so it re-exports transparently via
139// rustdoc — no explicit `pub use` needed.
140pub use ff_core::backend::PostgresConnection;
141
142/// Postgres-backed `EngineBackend`.
143///
144/// Wave 0 shape: holds a `sqlx::PgPool`, the deployment's
145/// [`PartitionConfig`] (Q5 — partition column survives on Postgres
146/// with hash partitioning across the same 256 slots Valkey uses),
147/// and an optional `ff_observability::Metrics` handle mirroring
148/// [`ff_backend_valkey::ValkeyBackend`]. Future waves add the
149/// [`StreamNotifier`] handle once Wave 4 wires up LISTEN/NOTIFY.
150/// RFC-018 Stage A: build a [`ff_core::capability::Supports`]
151/// snapshot for the Postgres backend at v0.9. `true` fields correspond
152/// to trait methods `PostgresBackend` overrides with a real body
153/// (ingress, scheduler, seed + rotate HMAC, flow cancel bulk path,
154/// stream reads, RFC-019 subscriptions, cross-cutting). `false` fields
155/// correspond to trait methods that still return
156/// `EngineError::Unavailable` on Postgres today — Wave 9 follow-up
157/// scope. See `docs/POSTGRES_PARITY_MATRIX.md` for the authoritative
158/// per-row status.
159///
160/// `prepare` is `true` on Postgres even though `prepare()` returns
161/// `PrepareOutcome::NoOp` (schema migrations are applied out-of-band).
162/// `Supports.prepare` means "can the consumer call `backend.prepare()`
163/// without getting `EngineError::Unavailable`?" — for Postgres the
164/// answer is yes; NoOp is a successful well-defined outcome. Gating
165/// the call off in consumer UIs based on a `false` bool would hide
166/// a callable + correct method.
167///
168/// `Supports` is `#[non_exhaustive]` so struct-literal construction
169/// from this crate is forbidden; we start from
170/// [`ff_core::capability::Supports::none`] and mutate named fields.
171fn postgres_supports_base() -> ff_core::capability::Supports {
172    let mut s = ff_core::capability::Supports::none();
173
174    // ── Flow bulk cancel (impl) ──
175    s.cancel_flow_wait_timeout = true;
176    s.cancel_flow_wait_indefinite = true;
177
178    // ── Admin seed + rotate HMAC (impl) ──
179    s.rotate_waitpoint_hmac_secret_all = true;
180    s.seed_waitpoint_hmac_secret = true;
181
182    // ── Scheduler ──
183    s.claim_for_worker = true;
184
185    // ── RFC-019 subscriptions ──
186    s.subscribe_lease_history = true;
187    s.subscribe_completion = true;
188    s.subscribe_signal_delivery = true;
189    s.subscribe_instance_tags = false;
190
191    // ── Streaming (RFC-015) ──
192    s.stream_durable_summary = true;
193    s.stream_best_effort_live = true;
194
195    // ── Boot (Postgres returns NoOp but call is callable + correct) ──
196    s.prepare = true;
197
198    // ── Wave 9 (v0.11) — operator control + read model + budget/quota
199    //    admin + list_pending_waitpoints + cancel_flow_header +
200    //    ack_cancel_member all ship concretely on Postgres via
201    //    RFC-020 Rev 7. subscribe_instance_tags remains `false` per
202    //    #311 (speculative demand, served by list_executions +
203    //    ScannerFilter::with_instance_tag today).
204    s.cancel_execution = true;
205    s.change_priority = true;
206    s.replay_execution = true;
207    s.revoke_lease = true;
208    s.read_execution_state = true;
209    s.read_execution_info = true;
210    s.get_execution_result = true;
211    s.budget_admin = true;
212    s.quota_admin = true;
213    s.list_pending_waitpoints = true;
214    s.cancel_flow_header = true;
215    s.ack_cancel_member = true;
216
217    s
218}
219
220pub struct PostgresBackend {
221    #[allow(dead_code)] // filled in across waves 2-7
222    pool: PgPool,
223    #[allow(dead_code)]
224    partition_config: PartitionConfig,
225    #[allow(dead_code)]
226    metrics: Option<Arc<ff_observability::Metrics>>,
227    /// Wave 4: shared LISTEN notifier. Present on `connect()`-built
228    /// backends; `None` on bare `from_pool` constructions that skip
229    /// LISTEN wiring (tests that only exercise the write path).
230    #[allow(dead_code)]
231    stream_notifier: Option<Arc<StreamNotifier>>,
232    /// RFC-017 Wave 8 Stage E3: scanner supervisor handle. Spawned
233    /// during [`Self::connect_with_metrics`] when the caller opts in
234    /// via [`Self::spawn_scanners_during_connect`]; drained on
235    /// [`EngineBackend::shutdown_prepare`]. `None` on `from_pool` /
236    /// test builds that don't want background reconcilers.
237    #[cfg(feature = "core")]
238    scanner_handle: Option<Arc<scanner_supervisor::PostgresScannerHandle>>,
239}
240
241impl PostgresBackend {
242    /// Dial Postgres with [`BackendConfig`] and return the backend as
243    /// `Arc<dyn EngineBackend>`. Modeled on
244    /// [`ff_backend_valkey::ValkeyBackend::connect`] so ff-server /
245    /// SDK call sites can swap backends without changing the
246    /// constructor shape.
247    ///
248    /// **Wave 0:** builds the pool and constructs the backend. Does
249    /// NOT run migrations (Q12 — operator out-of-band). Does NOT run
250    /// the schema-version check (Wave 3 adds the version const and
251    /// wires [`check_schema_version`] in). Does NOT start the LISTEN
252    /// task (Wave 4).
253    ///
254    /// Returns `EngineError::Unavailable` when the config's
255    /// connection arm is not Postgres.
256    pub async fn connect(config: BackendConfig) -> Result<Arc<dyn EngineBackend>, EngineError> {
257        let pool = pool::build_pool(&config).await?;
258        warn_if_max_locks_low(&pool).await;
259        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
260        let backend = Self {
261            pool,
262            partition_config: PartitionConfig::default(),
263            metrics: None,
264            stream_notifier,
265            #[cfg(feature = "core")]
266            scanner_handle: None,
267        };
268        Ok(Arc::new(backend))
269    }
270
271    /// Test / advanced constructor: build a `PostgresBackend` from an
272    /// already-constructed `PgPool` + explicit partition config. No
273    /// network I/O. Useful for integration tests against a shared
274    /// pool and for a future migration CLI that wants to reuse a pool
275    /// across migrate-run + smoke-check.
276    pub fn from_pool(pool: PgPool, partition_config: PartitionConfig) -> Arc<Self> {
277        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
278        Arc::new(Self {
279            pool,
280            partition_config,
281            metrics: None,
282            stream_notifier,
283            #[cfg(feature = "core")]
284            scanner_handle: None,
285        })
286    }
287
288    /// RFC-017 Wave 8 Stage E1: dial Postgres with an explicit
289    /// [`PartitionConfig`] + shared [`ff_observability::Metrics`].
290    /// Mirrors [`ff_backend_valkey::ValkeyBackend::connect_with_metrics`]
291    /// so `ff-server::Server::start_with_metrics` can wire the Postgres
292    /// branch without reaching into the pool builder directly.
293    ///
294    /// Returns a concrete `Arc<Self>` rather than `Arc<dyn EngineBackend>`
295    /// so the caller can cast to the trait object after any additional
296    /// field installs (parallel to the Valkey path which calls
297    /// `with_scheduler` / `with_stream_semaphore_permits` before the
298    /// cast). Stage E1 does NOT run `apply_migrations` — schema
299    /// provisioning is an operator concern (matches the Wave 0 contract
300    /// on [`Self::connect`]).
301    pub async fn connect_with_metrics(
302        config: BackendConfig,
303        partition_config: PartitionConfig,
304        metrics: Arc<ff_observability::Metrics>,
305    ) -> Result<Arc<Self>, EngineError> {
306        let pool = pool::build_pool(&config).await?;
307        warn_if_max_locks_low(&pool).await;
308        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
309        Ok(Arc::new(Self {
310            pool,
311            partition_config,
312            metrics: Some(metrics),
313            stream_notifier,
314            #[cfg(feature = "core")]
315            scanner_handle: None,
316        }))
317    }
318
319    /// RFC-017 Wave 8 Stage E3: spawn the six Postgres reconcilers as
320    /// background tick loops. Returns `true` if the scanner handle
321    /// was installed; `false` if the `Arc<Self>` has outstanding
322    /// clones (mirrors the Valkey `with_*` pattern). Callers must
323    /// invoke this before publishing the `Arc<dyn EngineBackend>` so
324    /// the underlying `Arc::get_mut` succeeds.
325    #[cfg(feature = "core")]
326    pub fn with_scanners(
327        self: &mut Arc<Self>,
328        cfg: scanner_supervisor::PostgresScannerConfig,
329    ) -> bool {
330        let Some(inner) = Arc::get_mut(self) else {
331            return false;
332        };
333        let handle = scanner_supervisor::spawn_scanners(inner.pool.clone(), cfg);
334        inner.scanner_handle = Some(Arc::new(handle));
335        true
336    }
337
338    /// Accessor for the underlying `PgPool`. Stage E1 uses this so
339    /// `ff-server::Server::start_with_metrics` can run
340    /// [`apply_migrations`] on the same pool before handing the backend
341    /// out as `Arc<dyn EngineBackend>`.
342    pub fn pool(&self) -> &PgPool {
343        &self.pool
344    }
345
346    /// Create one execution row (+ seed the lane registry if new).
347    ///
348    /// **RFC-017 Stage A:** this inherent method is retained as a
349    /// thin wrapper around the module-level impl so existing in-tree
350    /// callers (ff-server request handlers, integration tests) keep
351    /// compiling. The trait-lifted entry point is
352    /// [`EngineBackend::create_execution`] below, which calls the
353    /// same impl. Return shape differs — inherent returns
354    /// `ExecutionId`, trait returns
355    /// [`CreateExecutionResult`] per RFC-017 §5 — so we cannot simply
356    /// replace the inherent method. A follow-up PR may deprecate
357    /// this inherent alongside the broader ingress shape alignment.
358    #[cfg(feature = "core")]
359    #[tracing::instrument(name = "pg.create_execution", skip_all)]
360    pub async fn create_execution(
361        &self,
362        args: CreateExecutionArgs,
363    ) -> Result<ExecutionId, EngineError> {
364        exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await
365    }
366
367    // ── RFC-017 Stage A: inherent ingress methods retained for
368    // back-compat with in-tree test harnesses + ff-server direct
369    // calls. The trait-lifted peers (`EngineBackend::create_flow`
370    // etc.) delegate to the SAME module-level impls under the hood.
371    // Follow-up PR may sunset these inherents once all in-tree
372    // consumers route through `Arc<dyn EngineBackend>`.
373
374    #[cfg(feature = "core")]
375    #[tracing::instrument(name = "pg.create_flow.inherent", skip_all)]
376    pub async fn create_flow(
377        &self,
378        args: &CreateFlowArgs,
379    ) -> Result<CreateFlowResult, EngineError> {
380        flow_staging::create_flow(&self.pool, &self.partition_config, args).await
381    }
382
383    #[cfg(feature = "core")]
384    #[tracing::instrument(name = "pg.add_execution_to_flow.inherent", skip_all)]
385    pub async fn add_execution_to_flow(
386        &self,
387        args: &AddExecutionToFlowArgs,
388    ) -> Result<AddExecutionToFlowResult, EngineError> {
389        flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, args).await
390    }
391
392    #[cfg(feature = "core")]
393    #[tracing::instrument(name = "pg.stage_dependency_edge.inherent", skip_all)]
394    pub async fn stage_dependency_edge(
395        &self,
396        args: &StageDependencyEdgeArgs,
397    ) -> Result<StageDependencyEdgeResult, EngineError> {
398        flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, args).await
399    }
400
401    #[cfg(feature = "core")]
402    #[tracing::instrument(name = "pg.apply_dependency_to_child.inherent", skip_all)]
403    pub async fn apply_dependency_to_child(
404        &self,
405        args: &ApplyDependencyToChildArgs,
406    ) -> Result<ApplyDependencyToChildResult, EngineError> {
407        flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, args).await
408    }
409}
410
411/// Short helper: every stub method returns this. Kept as a function
412/// (rather than a macro) so rust-analyzer / IDE jumps show a single
413/// definition site for the Wave 0 stub pattern.
414#[inline]
415#[cfg_attr(feature = "streaming", allow(dead_code))]
416fn unavailable<T>(op: &'static str) -> Result<T, EngineError> {
417    Err(EngineError::Unavailable { op })
418}
419
420#[async_trait]
421impl EngineBackend for PostgresBackend {
422    // ── Claim + lifecycle ──
423
424    #[tracing::instrument(name = "pg.claim", skip_all)]
425    async fn claim(
426        &self,
427        lane: &LaneId,
428        capabilities: &CapabilitySet,
429        policy: ClaimPolicy,
430    ) -> Result<Option<Handle>, EngineError> {
431        attempt::claim(&self.pool, lane, capabilities, &policy).await
432    }
433
434    #[tracing::instrument(name = "pg.renew", skip_all)]
435    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError> {
436        attempt::renew(&self.pool, handle).await
437    }
438
439    #[tracing::instrument(name = "pg.progress", skip_all)]
440    async fn progress(
441        &self,
442        handle: &Handle,
443        percent: Option<u8>,
444        message: Option<String>,
445    ) -> Result<(), EngineError> {
446        attempt::progress(&self.pool, handle, percent, message).await
447    }
448
449    #[tracing::instrument(name = "pg.append_frame", skip_all)]
450    async fn append_frame(
451        &self,
452        handle: &Handle,
453        frame: Frame,
454    ) -> Result<AppendFrameOutcome, EngineError> {
455        #[cfg(feature = "streaming")]
456        {
457            stream::append_frame(&self.pool, &self.partition_config, handle, frame).await
458        }
459        #[cfg(not(feature = "streaming"))]
460        {
461            let _ = (handle, frame);
462            unavailable("pg.append_frame")
463        }
464    }
465
466    #[tracing::instrument(name = "pg.complete", skip_all)]
467    async fn complete(
468        &self,
469        handle: &Handle,
470        payload: Option<Vec<u8>>,
471    ) -> Result<(), EngineError> {
472        attempt::complete(&self.pool, handle, payload).await
473    }
474
475    #[tracing::instrument(name = "pg.fail", skip_all)]
476    async fn fail(
477        &self,
478        handle: &Handle,
479        reason: FailureReason,
480        classification: FailureClass,
481    ) -> Result<FailOutcome, EngineError> {
482        attempt::fail(&self.pool, handle, reason, classification).await
483    }
484
485    #[tracing::instrument(name = "pg.cancel", skip_all)]
486    async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError> {
487        let payload = handle_codec::decode_handle(handle)?;
488        exec_core::cancel_impl(
489            &self.pool,
490            &self.partition_config,
491            &payload.execution_id,
492            reason,
493        )
494        .await
495    }
496
497    #[tracing::instrument(name = "pg.suspend", skip_all)]
498    async fn suspend(
499        &self,
500        handle: &Handle,
501        args: SuspendArgs,
502    ) -> Result<SuspendOutcome, EngineError> {
503        suspend_ops::suspend_impl(&self.pool, &self.partition_config, handle, args).await
504    }
505
506    #[tracing::instrument(name = "pg.suspend_by_triple", skip_all)]
507    async fn suspend_by_triple(
508        &self,
509        exec_id: ExecutionId,
510        triple: LeaseFence,
511        args: SuspendArgs,
512    ) -> Result<SuspendOutcome, EngineError> {
513        suspend_ops::suspend_by_triple_impl(
514            &self.pool,
515            &self.partition_config,
516            exec_id,
517            triple,
518            args,
519        )
520        .await
521    }
522
523    #[tracing::instrument(name = "pg.create_waitpoint", skip_all)]
524    async fn create_waitpoint(
525        &self,
526        handle: &Handle,
527        waitpoint_key: &str,
528        expires_in: Duration,
529    ) -> Result<PendingWaitpoint, EngineError> {
530        suspend_ops::create_waitpoint_impl(&self.pool, handle, waitpoint_key, expires_in).await
531    }
532
533    #[cfg(feature = "core")]
534    #[tracing::instrument(name = "pg.read_waitpoint_token", skip_all)]
535    async fn read_waitpoint_token(
536        &self,
537        partition: PartitionKey,
538        waitpoint_id: &ff_core::types::WaitpointId,
539    ) -> Result<Option<String>, EngineError> {
540        suspend_ops::read_waitpoint_token_impl(&self.pool, &partition, waitpoint_id).await
541    }
542
543    #[tracing::instrument(name = "pg.observe_signals", skip_all)]
544    async fn observe_signals(
545        &self,
546        handle: &Handle,
547    ) -> Result<Vec<ResumeSignal>, EngineError> {
548        suspend_ops::observe_signals_impl(&self.pool, handle).await
549    }
550
551    #[tracing::instrument(name = "pg.claim_from_resume_grant", skip_all)]
552    async fn claim_from_resume_grant(
553        &self,
554        token: ResumeToken,
555    ) -> Result<Option<Handle>, EngineError> {
556        attempt::claim_from_resume_grant(&self.pool, token).await
557    }
558
559    // RFC-024 PR-D — lease-reclaim grant issuance + consumption.
560
561    #[tracing::instrument(name = "pg.issue_reclaim_grant", skip_all)]
562    async fn issue_reclaim_grant(
563        &self,
564        args: IssueReclaimGrantArgs,
565    ) -> Result<IssueReclaimGrantOutcome, EngineError> {
566        claim_grant::issue_reclaim_grant_impl(&self.pool, args).await
567    }
568
569    #[tracing::instrument(name = "pg.reclaim_execution", skip_all)]
570    async fn reclaim_execution(
571        &self,
572        args: ReclaimExecutionArgs,
573    ) -> Result<ReclaimExecutionOutcome, EngineError> {
574        claim_grant::reclaim_execution_impl(&self.pool, args).await
575    }
576
577    #[tracing::instrument(name = "pg.delay", skip_all)]
578    async fn delay(
579        &self,
580        handle: &Handle,
581        delay_until: TimestampMs,
582    ) -> Result<(), EngineError> {
583        attempt::delay(&self.pool, handle, delay_until).await
584    }
585
586    #[tracing::instrument(name = "pg.wait_children", skip_all)]
587    async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError> {
588        attempt::wait_children(&self.pool, handle).await
589    }
590
591    // ── Read / admin ──
592
593    #[tracing::instrument(name = "pg.describe_execution", skip_all)]
594    async fn describe_execution(
595        &self,
596        id: &ExecutionId,
597    ) -> Result<Option<ExecutionSnapshot>, EngineError> {
598        exec_core::describe_execution_impl(&self.pool, &self.partition_config, id).await
599    }
600
601    #[tracing::instrument(name = "pg.read_execution_context", skip_all)]
602    async fn read_execution_context(
603        &self,
604        execution_id: &ExecutionId,
605    ) -> Result<ExecutionContext, EngineError> {
606        exec_core::read_execution_context_impl(&self.pool, &self.partition_config, execution_id)
607            .await
608    }
609
610    #[tracing::instrument(name = "pg.read_current_attempt_index", skip_all)]
611    async fn read_current_attempt_index(
612        &self,
613        execution_id: &ExecutionId,
614    ) -> Result<ff_core::types::AttemptIndex, EngineError> {
615        exec_core::read_current_attempt_index_impl(
616            &self.pool,
617            &self.partition_config,
618            execution_id,
619        )
620        .await
621    }
622
623    #[tracing::instrument(name = "pg.read_total_attempt_count", skip_all)]
624    async fn read_total_attempt_count(
625        &self,
626        execution_id: &ExecutionId,
627    ) -> Result<ff_core::types::AttemptIndex, EngineError> {
628        exec_core::read_total_attempt_count_impl(
629            &self.pool,
630            &self.partition_config,
631            execution_id,
632        )
633        .await
634    }
635
636    #[tracing::instrument(name = "pg.describe_flow", skip_all)]
637    async fn describe_flow(
638        &self,
639        id: &FlowId,
640    ) -> Result<Option<FlowSnapshot>, EngineError> {
641        flow::describe_flow(&self.pool, &self.partition_config, id).await
642    }
643
644    #[tracing::instrument(name = "pg.set_execution_tag", skip_all)]
645    async fn set_execution_tag(
646        &self,
647        execution_id: &ExecutionId,
648        key: &str,
649        value: &str,
650    ) -> Result<(), EngineError> {
651        ff_core::engine_backend::validate_tag_key(key)?;
652        exec_core::set_execution_tag_impl(&self.pool, execution_id, key, value).await
653    }
654
655    #[tracing::instrument(name = "pg.set_flow_tag", skip_all)]
656    async fn set_flow_tag(
657        &self,
658        flow_id: &FlowId,
659        key: &str,
660        value: &str,
661    ) -> Result<(), EngineError> {
662        ff_core::engine_backend::validate_tag_key(key)?;
663        flow::set_flow_tag_impl(&self.pool, &self.partition_config, flow_id, key, value).await
664    }
665
666    #[tracing::instrument(name = "pg.get_execution_tag", skip_all)]
667    async fn get_execution_tag(
668        &self,
669        execution_id: &ExecutionId,
670        key: &str,
671    ) -> Result<Option<String>, EngineError> {
672        ff_core::engine_backend::validate_tag_key(key)?;
673        exec_core::get_execution_tag_impl(&self.pool, execution_id, key).await
674    }
675
676    #[tracing::instrument(name = "pg.get_flow_tag", skip_all)]
677    async fn get_flow_tag(
678        &self,
679        flow_id: &FlowId,
680        key: &str,
681    ) -> Result<Option<String>, EngineError> {
682        ff_core::engine_backend::validate_tag_key(key)?;
683        flow::get_flow_tag_impl(&self.pool, &self.partition_config, flow_id, key).await
684    }
685
686    #[tracing::instrument(name = "pg.get_execution_namespace", skip_all)]
687    async fn get_execution_namespace(
688        &self,
689        execution_id: &ExecutionId,
690    ) -> Result<Option<String>, EngineError> {
691        exec_core::get_execution_namespace_impl(&self.pool, execution_id).await
692    }
693
694    #[cfg(feature = "core")]
695    #[tracing::instrument(name = "pg.list_edges", skip_all)]
696    async fn list_edges(
697        &self,
698        flow_id: &FlowId,
699        direction: EdgeDirection,
700    ) -> Result<Vec<EdgeSnapshot>, EngineError> {
701        flow::list_edges(&self.pool, &self.partition_config, flow_id, direction).await
702    }
703
704    #[cfg(feature = "core")]
705    #[tracing::instrument(name = "pg.describe_edge", skip_all)]
706    async fn describe_edge(
707        &self,
708        flow_id: &FlowId,
709        edge_id: &EdgeId,
710    ) -> Result<Option<EdgeSnapshot>, EngineError> {
711        flow::describe_edge(&self.pool, &self.partition_config, flow_id, edge_id).await
712    }
713
714    #[cfg(feature = "core")]
715    #[tracing::instrument(name = "pg.resolve_execution_flow_id", skip_all)]
716    async fn resolve_execution_flow_id(
717        &self,
718        eid: &ExecutionId,
719    ) -> Result<Option<FlowId>, EngineError> {
720        exec_core::resolve_execution_flow_id_impl(&self.pool, &self.partition_config, eid).await
721    }
722
723    #[cfg(feature = "core")]
724    #[tracing::instrument(name = "pg.list_flows", skip_all)]
725    async fn list_flows(
726        &self,
727        partition: PartitionKey,
728        cursor: Option<FlowId>,
729        limit: usize,
730    ) -> Result<ListFlowsPage, EngineError> {
731        flow::list_flows(&self.pool, partition, cursor, limit).await
732    }
733
734    #[cfg(feature = "core")]
735    #[tracing::instrument(name = "pg.list_lanes", skip_all)]
736    async fn list_lanes(
737        &self,
738        cursor: Option<LaneId>,
739        limit: usize,
740    ) -> Result<ListLanesPage, EngineError> {
741        admin::list_lanes_impl(&self.pool, cursor, limit).await
742    }
743
744    #[cfg(feature = "core")]
745    #[tracing::instrument(name = "pg.list_suspended", skip_all)]
746    async fn list_suspended(
747        &self,
748        partition: PartitionKey,
749        cursor: Option<ExecutionId>,
750        limit: usize,
751    ) -> Result<ListSuspendedPage, EngineError> {
752        admin::list_suspended_impl(&self.pool, partition, cursor, limit).await
753    }
754
755    #[cfg(feature = "core")]
756    #[tracing::instrument(name = "pg.list_executions", skip_all)]
757    async fn list_executions(
758        &self,
759        partition: PartitionKey,
760        cursor: Option<ExecutionId>,
761        limit: usize,
762    ) -> Result<ListExecutionsPage, EngineError> {
763        exec_core::list_executions_impl(
764            &self.pool,
765            &self.partition_config,
766            partition,
767            cursor,
768            limit,
769        )
770        .await
771    }
772
773    // ── Trigger ops (issue #150) ──
774
775    #[cfg(feature = "core")]
776    #[tracing::instrument(name = "pg.deliver_signal", skip_all)]
777    async fn deliver_signal(
778        &self,
779        args: DeliverSignalArgs,
780    ) -> Result<DeliverSignalResult, EngineError> {
781        suspend_ops::deliver_signal_impl(&self.pool, &self.partition_config, args).await
782    }
783
784    #[cfg(feature = "core")]
785    #[tracing::instrument(name = "pg.deliver_approval_signal", skip_all)]
786    async fn deliver_approval_signal(
787        &self,
788        args: DeliverApprovalSignalArgs,
789    ) -> Result<DeliverSignalResult, EngineError> {
790        suspend_ops::deliver_approval_signal_impl(&self.pool, &self.partition_config, args).await
791    }
792
793    #[cfg(feature = "core")]
794    #[tracing::instrument(name = "pg.claim_resumed_execution", skip_all)]
795    async fn claim_resumed_execution(
796        &self,
797        args: ClaimResumedExecutionArgs,
798    ) -> Result<ClaimResumedExecutionResult, EngineError> {
799        suspend_ops::claim_resumed_execution_impl(&self.pool, &self.partition_config, args).await
800    }
801
802    // ── RFC-020 Wave 9 Spine-B — read model (3 methods, §4.1) ────────
803    //
804    // Partition-local single-row reads against `ff_exec_core` (+ LATERAL
805    // join on `ff_attempt` for `read_execution_info`). READ COMMITTED
806    // (no CAS; all three are read-only). `get_execution_result` returns
807    // current-attempt semantics per §7.8 (matches Valkey's
808    // `GET ctx.result()` primitive). Capability flips land at the Wave 9
809    // release PR per RFC §6.3.
810
811    #[cfg(feature = "core")]
812    #[tracing::instrument(name = "pg.read_execution_state", skip_all)]
813    async fn read_execution_state(
814        &self,
815        id: &ExecutionId,
816    ) -> Result<Option<PublicState>, EngineError> {
817        exec_core::read_execution_state_impl(&self.pool, &self.partition_config, id).await
818    }
819
820    #[cfg(feature = "core")]
821    #[tracing::instrument(name = "pg.read_execution_info", skip_all)]
822    async fn read_execution_info(
823        &self,
824        id: &ExecutionId,
825    ) -> Result<Option<ExecutionInfo>, EngineError> {
826        exec_core::read_execution_info_impl(&self.pool, &self.partition_config, id).await
827    }
828
829    #[tracing::instrument(name = "pg.get_execution_result", skip_all)]
830    async fn get_execution_result(
831        &self,
832        id: &ExecutionId,
833    ) -> Result<Option<Vec<u8>>, EngineError> {
834        exec_core::get_execution_result_impl(&self.pool, &self.partition_config, id).await
835    }
836
837    // ── RFC-020 Wave 9 Standalone-2 — list_pending_waitpoints (§4.5) ─
838    //
839    // Read-only projection of `ff_waitpoint_pending` serving the 10-
840    // field `PendingWaitpointInfo` contract. Producer-side writes of
841    // the 3 new 0011 columns (`state`, `required_signal_names`,
842    // `activated_at_ms`) land alongside this method in the same PR —
843    // see `suspend_ops::suspend_core` INSERT site. Capability flip
844    // deferred to Wave 9 release PR per RFC §6.3.
845    #[cfg(feature = "core")]
846    #[tracing::instrument(name = "pg.list_pending_waitpoints", skip_all)]
847    async fn list_pending_waitpoints(
848        &self,
849        args: ListPendingWaitpointsArgs,
850    ) -> Result<ListPendingWaitpointsResult, EngineError> {
851        suspend_ops::list_pending_waitpoints_impl(&self.pool, args).await
852    }
853
854    // ── RFC-020 Wave 9 Spine-A pt.1 — operator-control mutations (§4.2) ─
855    //
856    // Two methods landing behind `Supports.cancel_execution` +
857    // `Supports.revoke_lease` (both stay `false` until the Wave 9
858    // release PR flips them atomically, RFC §6.3). SERIALIZABLE + CAS +
859    // `ff_lease_event` outbox emit on the same tx (§4.2.6 + §4.2.7).
860
861    #[cfg(feature = "core")]
862    #[tracing::instrument(name = "pg.cancel_execution", skip_all)]
863    async fn cancel_execution(
864        &self,
865        args: CancelExecutionArgs,
866    ) -> Result<CancelExecutionResult, EngineError> {
867        operator::cancel_execution_impl(&self.pool, args).await
868    }
869
870    #[cfg(feature = "core")]
871    #[tracing::instrument(name = "pg.revoke_lease", skip_all)]
872    async fn revoke_lease(
873        &self,
874        args: RevokeLeaseArgs,
875    ) -> Result<RevokeLeaseResult, EngineError> {
876        operator::revoke_lease_impl(&self.pool, args).await
877    }
878
879    // ── RFC-020 Wave 9 Spine-A pt.2 — operator control + flow cancel (§4.2.3 + §4.2.4 + §4.2.5) ─
880    //
881    // Four methods landing behind `Supports.change_priority` +
882    // `Supports.replay_execution` + `Supports.cancel_flow_header` +
883    // `Supports.ack_cancel_member` (all stay `false` until the Wave 9
884    // release PR flips them atomically, RFC §6.3). SERIALIZABLE + CAS +
885    // `ff_operator_event` outbox emit on the same tx (§4.2.6 + §4.2.7).
886    // `ack_cancel_member` is silent on the outbox (Valkey-parity).
887
888    #[cfg(feature = "core")]
889    #[tracing::instrument(name = "pg.change_priority", skip_all)]
890    async fn change_priority(
891        &self,
892        args: ChangePriorityArgs,
893    ) -> Result<ChangePriorityResult, EngineError> {
894        operator::change_priority_impl(&self.pool, args).await
895    }
896
897    #[cfg(feature = "core")]
898    #[tracing::instrument(name = "pg.replay_execution", skip_all)]
899    async fn replay_execution(
900        &self,
901        args: ReplayExecutionArgs,
902    ) -> Result<ReplayExecutionResult, EngineError> {
903        operator::replay_execution_impl(&self.pool, args).await
904    }
905
906    #[cfg(feature = "core")]
907    #[tracing::instrument(name = "pg.cancel_flow_header", skip_all)]
908    async fn cancel_flow_header(
909        &self,
910        args: CancelFlowArgs,
911    ) -> Result<CancelFlowHeader, EngineError> {
912        operator::cancel_flow_header_impl(&self.pool, &self.partition_config, args).await
913    }
914
915    #[cfg(feature = "core")]
916    #[tracing::instrument(name = "pg.ack_cancel_member", skip_all)]
917    async fn ack_cancel_member(
918        &self,
919        flow_id: &FlowId,
920        execution_id: &ExecutionId,
921    ) -> Result<(), EngineError> {
922        operator::ack_cancel_member_impl(
923            &self.pool,
924            &self.partition_config,
925            flow_id.clone(),
926            execution_id.clone(),
927        )
928        .await
929    }
930
931    // ── RFC-017 Stage A — ingress (promoted from inherent) ────
932
933    /// RFC-017 Wave 8 Stage E1: lift the inherent
934    /// [`PostgresBackend::create_execution`] onto the trait so
935    /// ff-server's migrated HTTP handler can dispatch to Postgres.
936    /// Post-insert the row is idempotent; the Postgres impl does not
937    /// distinguish `Created` from `Duplicate` at the helper level
938    /// (both paths commit and return the execution id), so we always
939    /// surface `Created { public_state: Waiting }` here. A follow-up
940    /// may lift the distinction if a consumer relies on it.
941    #[cfg(feature = "core")]
942    #[tracing::instrument(name = "pg.create_execution.trait", skip_all)]
943    async fn create_execution(
944        &self,
945        args: CreateExecutionArgs,
946    ) -> Result<CreateExecutionResult, EngineError> {
947        let eid = args.execution_id.clone();
948        exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await?;
949        Ok(CreateExecutionResult::Created {
950            execution_id: eid,
951            public_state: PublicState::Waiting,
952        })
953    }
954
955    #[cfg(feature = "core")]
956    #[tracing::instrument(name = "pg.create_flow", skip_all)]
957    async fn create_flow(
958        &self,
959        args: CreateFlowArgs,
960    ) -> Result<CreateFlowResult, EngineError> {
961        flow_staging::create_flow(&self.pool, &self.partition_config, &args).await
962    }
963
964    #[cfg(feature = "core")]
965    #[tracing::instrument(name = "pg.add_execution_to_flow", skip_all)]
966    async fn add_execution_to_flow(
967        &self,
968        args: AddExecutionToFlowArgs,
969    ) -> Result<AddExecutionToFlowResult, EngineError> {
970        flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, &args).await
971    }
972
973    #[cfg(feature = "core")]
974    #[tracing::instrument(name = "pg.stage_dependency_edge", skip_all)]
975    async fn stage_dependency_edge(
976        &self,
977        args: StageDependencyEdgeArgs,
978    ) -> Result<StageDependencyEdgeResult, EngineError> {
979        flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, &args).await
980    }
981
982    #[cfg(feature = "core")]
983    #[tracing::instrument(name = "pg.apply_dependency_to_child", skip_all)]
984    async fn apply_dependency_to_child(
985        &self,
986        args: ApplyDependencyToChildArgs,
987    ) -> Result<ApplyDependencyToChildResult, EngineError> {
988        flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, &args).await
989    }
990
991    // PR-7b Cluster 4. Async-via-outbox cascade — see trait rustdoc
992    // "Timing semantics" for the Valkey-sync vs PG-outbox divergence.
993    // This resolves the payload to its `ff_completion_event.event_id`
994    // and invokes the existing Wave-5a `dispatch_completion`; further
995    // hops ride their own outbox events emitted by the per-hop tx.
996    #[cfg(feature = "core")]
997    async fn cascade_completion(
998        &self,
999        payload: &ff_core::backend::CompletionPayload,
1000    ) -> Result<ff_core::contracts::CascadeOutcome, EngineError> {
1001        let event_id = match resolve_event_id(&self.pool, payload).await {
1002            Some(id) => id,
1003            None => {
1004                // Event not materialised (outbox race or pre-subscribe
1005                // payload) — reconciler is the backstop.
1006                tracing::warn!(
1007                    execution_id = %payload.execution_id,
1008                    produced_at_ms = payload.produced_at_ms.0,
1009                    "pg.cascade_completion: could not resolve event_id; reconciler will claim"
1010                );
1011                return Ok(ff_core::contracts::CascadeOutcome::async_dispatched(0));
1012            }
1013        };
1014        let outcome = crate::dispatch::dispatch_completion(&self.pool, event_id).await?;
1015        let advanced = match outcome {
1016            crate::dispatch::DispatchOutcome::NoOp => 0,
1017            crate::dispatch::DispatchOutcome::Advanced(n) => n,
1018        };
1019        Ok(ff_core::contracts::CascadeOutcome::async_dispatched(advanced))
1020    }
1021
1022    fn backend_label(&self) -> &'static str {
1023        "postgres"
1024    }
1025
1026    /// RFC-018 Stage A: populate the `Capabilities` snapshot from the
1027    /// static [`postgres_supports_base`] shape. The Postgres backend
1028    /// landed through RFC-017 Stage E4 at v0.8.0; fields still `false`
1029    /// correspond to Wave-9 follow-up work (`cancel_flow_header`,
1030    /// `ack_cancel_member`, read-model, operator control, budget /
1031    /// quota, `list_pending_waitpoints`). See
1032    /// `docs/POSTGRES_PARITY_MATRIX.md` for the per-row breakdown.
1033    fn capabilities(&self) -> ff_core::capability::Capabilities {
1034        ff_core::capability::Capabilities::new(
1035            ff_core::capability::BackendIdentity::new(
1036                "postgres",
1037                ff_core::capability::Version::new(0, 11, 0),
1038                "E-shipped",
1039            ),
1040            postgres_supports_base(),
1041        )
1042    }
1043
1044    /// Issue #281: no-op. Schema migrations are applied out-of-band
1045    /// per `rfcs/drafts/v0.7-migration-master.md §Q12` (operator runs
1046    /// `sqlx migrate run` or the future `ff-migrate` CLI). Boot runs a
1047    /// schema-version check at connect time
1048    /// ([`crate::version::check_schema_version`]) and refuses to
1049    /// start on mismatch, so by the time `prepare()` is callable
1050    /// there is nothing further to do.
1051    async fn prepare(
1052        &self,
1053    ) -> Result<ff_core::backend::PrepareOutcome, EngineError> {
1054        Ok(ff_core::backend::PrepareOutcome::NoOp)
1055    }
1056
1057    /// RFC-017 Wave 8 Stage E3 (§4 row 9, §7): forward the claim to the
1058    /// Postgres-native admission pipeline. Returns `NoWork` when no
1059    /// eligible execution is admissible this scan cycle. Budget
1060    /// breaches surface as `NoWork` (leaving the row eligible for a
1061    /// retry by another worker); validation-class rejections
1062    /// (malformed partition, unknown kid) surface as typed
1063    /// [`EngineError`] variants mapped to the Server's 400/503 arms.
1064    #[cfg(feature = "core")]
1065    #[tracing::instrument(name = "pg.claim_for_worker", skip_all)]
1066    async fn claim_for_worker(
1067        &self,
1068        args: ff_core::contracts::ClaimForWorkerArgs,
1069    ) -> Result<ff_core::contracts::ClaimForWorkerOutcome, EngineError> {
1070        let sched = scheduler::PostgresScheduler::new(self.pool.clone());
1071        let grant_opt = sched
1072            .claim_for_worker(
1073                &args.lane_id,
1074                &args.worker_id,
1075                &args.worker_instance_id,
1076                &args.worker_capabilities,
1077                args.grant_ttl_ms,
1078            )
1079            .await?;
1080        Ok(match grant_opt {
1081            Some(g) => ff_core::contracts::ClaimForWorkerOutcome::granted(g),
1082            None => ff_core::contracts::ClaimForWorkerOutcome::no_work(),
1083        })
1084    }
1085
1086    async fn ping(&self) -> Result<(), EngineError> {
1087        // Postgres analogue to Valkey PING — single-round-trip pool
1088        // liveness. Errors propagate as transport-class EngineError via
1089        // the existing sqlx→EngineError map.
1090        let _ = sqlx::query_scalar::<_, i32>("SELECT 1")
1091            .fetch_one(&self.pool)
1092            .await
1093            .map_err(error::map_sqlx_error)?;
1094        Ok(())
1095    }
1096
1097    /// RFC-017 Wave 8 Stage E3: drain the scanner supervisor's
1098    /// reconciler tasks up to `grace`, then close the sqlx pool.
1099    /// Matches the Valkey backend's shutdown_prepare contract —
1100    /// bounded best-effort drain, never returns an error.
1101    async fn shutdown_prepare(&self, grace: Duration) -> Result<(), EngineError> {
1102        #[cfg(feature = "core")]
1103        if let Some(handle) = self.scanner_handle.as_ref() {
1104            let timed_out = handle.shutdown(grace).await;
1105            if timed_out > 0 {
1106                tracing::warn!(
1107                    timed_out,
1108                    ?grace,
1109                    "postgres scanner supervisor exceeded grace on shutdown"
1110                );
1111            }
1112        }
1113        Ok(())
1114    }
1115
1116    #[tracing::instrument(name = "pg.cancel_flow", skip_all)]
1117    async fn cancel_flow(
1118        &self,
1119        id: &FlowId,
1120        policy: CancelFlowPolicy,
1121        wait: CancelFlowWait,
1122    ) -> Result<CancelFlowResult, EngineError> {
1123        let result = flow::cancel_flow(&self.pool, &self.partition_config, id, policy).await?;
1124        if let Some(deadline) = ff_core::engine_backend::cancel_flow_wait_deadline(wait) {
1125            ff_core::engine_backend::wait_for_flow_cancellation(self, id, deadline).await?;
1126        }
1127        Ok(result)
1128    }
1129
1130    #[cfg(feature = "core")]
1131    #[tracing::instrument(name = "pg.set_edge_group_policy", skip_all)]
1132    async fn set_edge_group_policy(
1133        &self,
1134        flow_id: &FlowId,
1135        downstream_execution_id: &ExecutionId,
1136        policy: EdgeDependencyPolicy,
1137    ) -> Result<SetEdgeGroupPolicyResult, EngineError> {
1138        flow::set_edge_group_policy(
1139            &self.pool,
1140            &self.partition_config,
1141            flow_id,
1142            downstream_execution_id,
1143            policy,
1144        )
1145        .await
1146    }
1147
1148    // ── Budget ──
1149
1150    #[tracing::instrument(name = "pg.report_usage", skip_all)]
1151    async fn report_usage(
1152        &self,
1153        _handle: &Handle,
1154        budget: &BudgetId,
1155        dimensions: UsageDimensions,
1156    ) -> Result<ReportUsageResult, EngineError> {
1157        budget::report_usage_impl(&self.pool, &self.partition_config, budget, dimensions).await
1158    }
1159
1160    // ── RFC-020 Wave 9 Standalone-1 — budget/quota admin (§4.4) ─────
1161    //
1162    // Five methods landing behind capability flags that stay `false`
1163    // until the Wave 9 release PR flips them atomically (RFC §6.3).
1164    // Schema + trait impls land now; capability-surface flip is one
1165    // PR later.
1166
1167    #[cfg(feature = "core")]
1168    #[tracing::instrument(name = "pg.create_budget", skip_all)]
1169    async fn create_budget(
1170        &self,
1171        args: CreateBudgetArgs,
1172    ) -> Result<CreateBudgetResult, EngineError> {
1173        budget::create_budget_impl(&self.pool, &self.partition_config, args).await
1174    }
1175
1176    #[cfg(feature = "core")]
1177    #[tracing::instrument(name = "pg.reset_budget", skip_all)]
1178    async fn reset_budget(
1179        &self,
1180        args: ResetBudgetArgs,
1181    ) -> Result<ResetBudgetResult, EngineError> {
1182        budget::reset_budget_impl(&self.pool, &self.partition_config, args).await
1183    }
1184
1185    #[cfg(feature = "core")]
1186    #[tracing::instrument(name = "pg.create_quota_policy", skip_all)]
1187    async fn create_quota_policy(
1188        &self,
1189        args: CreateQuotaPolicyArgs,
1190    ) -> Result<CreateQuotaPolicyResult, EngineError> {
1191        budget::create_quota_policy_impl(&self.pool, &self.partition_config, args).await
1192    }
1193
1194    #[cfg(feature = "core")]
1195    #[tracing::instrument(name = "pg.get_budget_status", skip_all)]
1196    async fn get_budget_status(
1197        &self,
1198        id: &BudgetId,
1199    ) -> Result<BudgetStatus, EngineError> {
1200        budget::get_budget_status_impl(&self.pool, &self.partition_config, id).await
1201    }
1202
1203    #[cfg(feature = "core")]
1204    #[tracing::instrument(name = "pg.report_usage_admin", skip_all)]
1205    async fn report_usage_admin(
1206        &self,
1207        budget_id: &BudgetId,
1208        args: ReportUsageAdminArgs,
1209    ) -> Result<ReportUsageResult, EngineError> {
1210        budget::report_usage_admin_impl(&self.pool, &self.partition_config, budget_id, args).await
1211    }
1212
1213    // ── cairn #454 Phase 4a — per-execution ledger (option A) ────────
1214
1215    #[cfg(feature = "core")]
1216    #[tracing::instrument(name = "pg.record_spend", skip_all)]
1217    async fn record_spend(
1218        &self,
1219        args: ff_core::contracts::RecordSpendArgs,
1220    ) -> Result<ReportUsageResult, EngineError> {
1221        budget::record_spend_impl(&self.pool, &self.partition_config, args).await
1222    }
1223
1224    #[cfg(feature = "core")]
1225    #[tracing::instrument(name = "pg.release_budget", skip_all)]
1226    async fn release_budget(
1227        &self,
1228        args: ff_core::contracts::ReleaseBudgetArgs,
1229    ) -> Result<(), EngineError> {
1230        budget::release_budget_impl(&self.pool, &self.partition_config, args).await
1231    }
1232
1233    // ── HMAC secret rotation (v0.7 migration-master Q4) ──
1234    //
    // Implemented in Wave 4 as a single-global-row write against
    // `ff_waitpoint_hmac(kid, secret, rotated_at)`. Waves 0/1 shipped
    // this as an `Unavailable` stub so a running Postgres backend
    // surfaced the unimplemented op loudly rather than silently no-op'ing.
1239    #[tracing::instrument(name = "pg.rotate_waitpoint_hmac_secret_all", skip_all)]
1240    async fn rotate_waitpoint_hmac_secret_all(
1241        &self,
1242        args: RotateWaitpointHmacSecretAllArgs,
1243    ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
1244        // Wave 4 Agent D: Q4 single-global-row write against
1245        // `ff_waitpoint_hmac`. `now_ms` is captured here (not
1246        // inside the impl) so tests can inject a deterministic
1247        // clock via the pool layer in the future.
1248        let now_ms = std::time::SystemTime::now()
1249            .duration_since(std::time::UNIX_EPOCH)
1250            .map(|d| d.as_millis() as i64)
1251            .unwrap_or(0);
1252        signal::rotate_waitpoint_hmac_secret_all_impl(&self.pool, args, now_ms).await
1253    }
1254
1255    #[tracing::instrument(name = "pg.seed_waitpoint_hmac_secret", skip_all)]
1256    async fn seed_waitpoint_hmac_secret(
1257        &self,
1258        args: SeedWaitpointHmacSecretArgs,
1259    ) -> Result<SeedOutcome, EngineError> {
1260        // Issue #280: install-only boot-time seed against the global
1261        // `ff_waitpoint_hmac` table. Idempotent — cairn calls this on
1262        // every boot and the backend decides whether to INSERT.
1263        let now_ms = std::time::SystemTime::now()
1264            .duration_since(std::time::UNIX_EPOCH)
1265            .map(|d| d.as_millis() as i64)
1266            .unwrap_or(0);
1267        signal::seed_waitpoint_hmac_secret_impl(&self.pool, args, now_ms).await
1268    }
1269
1270    // ── Stream reads (streaming feature) ──
1271
1272    #[cfg(feature = "streaming")]
1273    #[tracing::instrument(name = "pg.read_stream", skip_all)]
1274    async fn read_stream(
1275        &self,
1276        execution_id: &ExecutionId,
1277        attempt_index: AttemptIndex,
1278        from: StreamCursor,
1279        to: StreamCursor,
1280        count_limit: u64,
1281    ) -> Result<StreamFrames, EngineError> {
1282        stream::read_stream(&self.pool, execution_id, attempt_index, from, to, count_limit).await
1283    }
1284
1285    #[cfg(feature = "streaming")]
1286    #[tracing::instrument(name = "pg.tail_stream", skip_all)]
1287    async fn tail_stream(
1288        &self,
1289        execution_id: &ExecutionId,
1290        attempt_index: AttemptIndex,
1291        after: StreamCursor,
1292        block_ms: u64,
1293        count_limit: u64,
1294        visibility: TailVisibility,
1295    ) -> Result<StreamFrames, EngineError> {
1296        let notifier = self
1297            .stream_notifier
1298            .as_ref()
1299            .ok_or(EngineError::Unavailable {
1300                op: "pg.tail_stream (notifier not initialised)",
1301            })?;
1302        stream::tail_stream(
1303            &self.pool,
1304            notifier,
1305            execution_id,
1306            attempt_index,
1307            after,
1308            block_ms,
1309            count_limit,
1310            visibility,
1311        )
1312        .await
1313    }
1314
1315    #[cfg(feature = "streaming")]
1316    #[tracing::instrument(name = "pg.read_summary", skip_all)]
1317    async fn read_summary(
1318        &self,
1319        execution_id: &ExecutionId,
1320        attempt_index: AttemptIndex,
1321    ) -> Result<Option<SummaryDocument>, EngineError> {
1322        stream::read_summary(&self.pool, execution_id, attempt_index).await
1323    }
1324
1325    // ── RFC-019 Stage A — `subscribe_completion` ──────────────────
1326    //
1327    // Postgres real impl. Wraps the existing `ff_completion_event`
1328    // outbox + `LISTEN ff_completion` machinery
1329    // (see `completion::subscribe`) and adapts each completion
1330    // payload into a `stream_subscribe::StreamEvent`.
1331    //
1332    // Cursor encoding: `POSTGRES_CURSOR_PREFIX (0x02)` + `event_id`
1333    // (i64 BE). Stage A resume-from-cursor is not plumbed through the
1334    // adapter (the existing subscriber tails from `max(event_id)`);
1335    // Stage B threads the cursor into the replay path. The surface is
1336    // correct today for consumers that subscribe from tail and
1337    // persist cursors for future resume.
1338    #[tracing::instrument(name = "pg.subscribe_completion", skip_all)]
1339    async fn subscribe_completion(
1340        &self,
1341        _cursor: ff_core::stream_subscribe::StreamCursor,
1342        filter: &ff_core::backend::ScannerFilter,
1343    ) -> Result<ff_core::stream_events::CompletionSubscription, EngineError> {
1344        use ff_core::stream_events::{CompletionEvent, CompletionOutcome};
1345        use ff_core::stream_subscribe::encode_postgres_event_cursor;
1346        use futures_core::Stream;
1347        use std::pin::Pin;
1348        use std::task::{Context, Poll};
1349
1350        // Delegate to the existing CompletionBackend implementation so
1351        // the LISTEN/replay machinery is shared. When a non-noop
1352        // `ScannerFilter` (#282) is supplied, route through the
1353        // `_filtered` variant so the outbox-inline SQL filter applies.
1354        // Resume-from-cursor is still unwired (Stage A surface tails
1355        // from tail).
1356        let inner = if filter.is_noop() {
1357            ff_core::completion_backend::CompletionBackend::subscribe_completions(self).await?
1358        } else {
1359            ff_core::completion_backend::CompletionBackend::subscribe_completions_filtered(
1360                self, filter,
1361            )
1362            .await?
1363        };
1364
1365        struct Adapter {
1366            inner: ff_core::completion_backend::CompletionStream,
1367        }
1368
1369        impl Stream for Adapter {
1370            type Item = Result<CompletionEvent, EngineError>;
1371            fn poll_next(
1372                mut self: Pin<&mut Self>,
1373                cx: &mut Context<'_>,
1374            ) -> Poll<Option<Self::Item>> {
1375                match Pin::new(&mut self.inner).poll_next(cx) {
1376                    Poll::Pending => Poll::Pending,
1377                    Poll::Ready(None) => Poll::Ready(None),
1378                    Poll::Ready(Some(payload)) => {
1379                        // Placeholder cursor (0-event_id) because
1380                        // `CompletionPayload` does not surface
1381                        // `event_id` today. Family prefix stays stable
1382                        // so persistence is forward-compatible.
1383                        let cursor = encode_postgres_event_cursor(0);
1384                        let event = CompletionEvent::new(
1385                            cursor,
1386                            payload.execution_id.clone(),
1387                            CompletionOutcome::from_wire(&payload.outcome),
1388                            payload.produced_at_ms,
1389                        );
1390                        Poll::Ready(Some(Ok(event)))
1391                    }
1392                }
1393            }
1394        }
1395
1396        Ok(Box::pin(Adapter { inner }))
1397    }
1398
1399    // ── RFC-019 Stage B — `subscribe_lease_history` ──────────────
1400    //
1401    // Real Postgres impl. Tails the `ff_lease_event` outbox (written
1402    // by producer sites in `attempt.rs`, `flow.rs`, `suspend_ops.rs`,
1403    // and the `attempt_timeout` / `lease_expiry` reconcilers) via
1404    // `LISTEN ff_lease_event` + catch-up SELECT. Cursor encoding
1405    // matches `subscribe_completion`: `0x02 ++ event_id(BE8)`.
1406    //
1407    // Partition scope: hardcoded to partition 0 — mirrors the Valkey
1408    // Stage A impl, which tails partition 0's aggregate stream key.
1409    // Cross-partition consumers instantiate one backend per
1410    // partition + merge streams consumer-side (RFC-019 §Backend
1411    // Semantics).
1412    #[tracing::instrument(name = "pg.subscribe_lease_history", skip_all)]
1413    async fn subscribe_lease_history(
1414        &self,
1415        cursor: ff_core::stream_subscribe::StreamCursor,
1416        filter: &ff_core::backend::ScannerFilter,
1417    ) -> Result<ff_core::stream_events::LeaseHistorySubscription, EngineError> {
1418        lease_event_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
1419    }
1420
1421    // ── RFC-019 Stage B — `subscribe_signal_delivery` (#310) ─────
1422    //
1423    // Tails the `ff_signal_event` outbox (written by the producer
1424    // INSERT in `suspend_ops::deliver_signal_impl`) via
1425    // `LISTEN ff_signal_event` + catch-up SELECT. Cursor encoding
1426    // matches `subscribe_lease_history`: `0x02 ++ event_id(BE8)`.
1427    //
1428    // Partition scope: hardcoded to partition 0 — mirrors the Valkey
1429    // Stage B impl which tails partition 0's aggregate stream key.
1430    #[tracing::instrument(name = "pg.subscribe_signal_delivery", skip_all)]
1431    async fn subscribe_signal_delivery(
1432        &self,
1433        cursor: ff_core::stream_subscribe::StreamCursor,
1434        filter: &ff_core::backend::ScannerFilter,
1435    ) -> Result<ff_core::stream_events::SignalDeliverySubscription, EngineError> {
1436        signal_delivery_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
1437    }
1438
1439    // ── PR-7b Cluster 1 — Foundation scanner operations ─────────
1440    //
1441    // Per-execution wrappers around the reconcilers in
1442    // `crate::reconcilers::*`. Each reconciler exposes a
1443    // `*_for_execution` / `*_for_*` helper mirroring the batch
1444    // `scan_tick` path's per-row tx logic so the engine-side scanner
1445    // trait-dispatch and the batch reconciler share one SQL code
1446    // path.
1447    //
1448    // All 5 scanner-op methods run real SQL on Postgres; both phases
1449    // of `expire_execution` (`AttemptTimeout` + `ExecutionDeadline`)
1450    // are covered. The Wave-9-minimal delivery (this commit) adds
1451    // `delayed_promoter`, `pending_wp_expiry`, and
1452    // `execution_deadline` reconcilers on top of the Wave 6c
1453    // reconcilers shipped with PR-7b/1.
1454    //
1455    // Gated on `core` because `reconcilers` (and its `dispatch` dep)
1456    // require `core`. Without `core` the trait defaults return
1457    // `EngineError::Unavailable`, preserving behavioural parity with
1458    // other feature-stripped callsites.
1459
1460    #[cfg(feature = "core")]
1461    async fn mark_lease_expired_if_due(
1462        &self,
1463        partition: Partition,
1464        execution_id: &ExecutionId,
1465    ) -> Result<(), EngineError> {
1466        let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1467        let partition_key = partition_index_to_i16(partition)?;
1468        reconcilers::lease_expiry::release_for_execution(&self.pool, partition_key, exec_uuid)
1469            .await
1470    }
1471
1472    #[cfg(feature = "core")]
1473    async fn promote_delayed(
1474        &self,
1475        partition: Partition,
1476        _lane: &LaneId,
1477        execution_id: &ExecutionId,
1478        now_ms: TimestampMs,
1479    ) -> Result<(), EngineError> {
1480        let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1481        let partition_key = partition_index_to_i16(partition)?;
1482        // `lane` is ignored: lane is authoritative on `ff_exec_core`
1483        // already (it was used only to locate the Valkey ZSET). The
1484        // candidate selection in `promote_for_execution` re-checks
1485        // the (lifecycle_phase, eligibility_state, deadline_at_ms)
1486        // tuple that `attempt::delay()` writes.
1487        reconcilers::delayed_promoter::promote_for_execution(
1488            &self.pool,
1489            partition_key,
1490            exec_uuid,
1491            now_ms.0,
1492        )
1493        .await
1494    }
1495
1496    #[cfg(feature = "core")]
1497    async fn close_waitpoint(
1498        &self,
1499        partition: Partition,
1500        _execution_id: &ExecutionId,
1501        waitpoint_id: &str,
1502        now_ms: TimestampMs,
1503    ) -> Result<(), EngineError> {
1504        // The scanner resolves (waitpoint_id → owning execution_id)
1505        // separately for filter application; the close action itself
1506        // only needs the waitpoint row (which carries `execution_id`
1507        // + `waitpoint_key`). Partition is authoritative from the
1508        // caller.
1509        let partition_key = partition_index_to_i16(partition)?;
1510        let waitpoint_uuid = uuid::Uuid::parse_str(waitpoint_id).map_err(|e| {
1511            EngineError::Validation {
1512                kind: ff_core::engine_error::ValidationKind::InvalidInput,
1513                detail: format!("waitpoint_id not a UUID: {e}"),
1514            }
1515        })?;
1516        reconcilers::pending_wp_expiry::close_for_execution(
1517            &self.pool,
1518            partition_key,
1519            waitpoint_uuid,
1520            now_ms.0,
1521        )
1522        .await
1523    }
1524
1525    #[cfg(feature = "core")]
1526    async fn expire_execution(
1527        &self,
1528        partition: Partition,
1529        execution_id: &ExecutionId,
1530        phase: ExpirePhase,
1531        now_ms: TimestampMs,
1532    ) -> Result<(), EngineError> {
1533        let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1534        let partition_key = partition_index_to_i16(partition)?;
1535        match phase {
1536            ExpirePhase::AttemptTimeout => {
1537                reconcilers::attempt_timeout::expire_for_execution(
1538                    &self.pool,
1539                    partition_key,
1540                    exec_uuid,
1541                )
1542                .await
1543            }
1544            ExpirePhase::ExecutionDeadline => {
1545                reconcilers::execution_deadline::expire_for_execution(
1546                    &self.pool,
1547                    partition_key,
1548                    exec_uuid,
1549                    now_ms.0,
1550                )
1551                .await
1552            }
1553        }
1554    }
1555
1556    #[cfg(feature = "core")]
1557    async fn expire_suspension(
1558        &self,
1559        partition: Partition,
1560        execution_id: &ExecutionId,
1561        _now_ms: TimestampMs,
1562    ) -> Result<(), EngineError> {
1563        let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1564        let partition_key = partition_index_to_i16(partition)?;
1565        reconcilers::suspension_timeout::expire_for_execution(
1566            &self.pool,
1567            partition_key,
1568            exec_uuid,
1569        )
1570        .await
1571    }
1572
1573    // PR-7b Cluster 2b-B: flow summary projection on Postgres.
1574    // Aggregates member public_state from ff_exec_core and UPSERTs the
1575    // derived summary into ff_flow_summary (migration 0019). One SQL
1576    // round-trip for the aggregation + one for the upsert.
1577    #[cfg(feature = "core")]
1578    async fn project_flow_summary(
1579        &self,
1580        partition: Partition,
1581        flow_id: &FlowId,
1582        now_ms: TimestampMs,
1583    ) -> Result<bool, EngineError> {
1584        let partition_key = partition_index_to_i16(partition)?;
1585        let flow_uuid: sqlx::types::Uuid = flow_id.0;
1586        flow::project_flow_summary_impl(
1587            &self.pool,
1588            partition_key,
1589            flow_uuid,
1590            now_ms.0,
1591        )
1592        .await
1593    }
1594
1595    // PR-7b Cluster 2b-B: retention trim on Postgres.
1596    // SELECTs a batch of terminal executions past the cutoff, then
1597    // per-execution DELETEs across every sibling table (no FK CASCADE
1598    // in the schema, so this is explicit). One transaction per batch.
1599    #[cfg(feature = "core")]
1600    async fn trim_retention(
1601        &self,
1602        partition: Partition,
1603        lane_id: &LaneId,
1604        retention_ms: u64,
1605        now_ms: TimestampMs,
1606        batch_size: u32,
1607        filter: &ff_core::backend::ScannerFilter,
1608    ) -> Result<u32, EngineError> {
1609        let partition_key = partition_index_to_i16(partition)?;
1610        exec_core::trim_retention_impl(
1611            &self.pool,
1612            partition_key,
1613            lane_id.as_str(),
1614            retention_ms,
1615            now_ms.0,
1616            batch_size,
1617            filter,
1618        )
1619        .await
1620    }
1621
1622    // ── PR-7b / #453: typed-FCALL bodies ──
1623
1624    #[cfg(feature = "core")]
1625    async fn renew_lease(
1626        &self,
1627        args: ff_core::contracts::RenewLeaseArgs,
1628    ) -> Result<ff_core::contracts::RenewLeaseResult, EngineError> {
1629        crate::typed_ops::renew_lease(self.pool(), args).await
1630    }
1631
1632    #[cfg(feature = "core")]
1633    async fn complete_execution(
1634        &self,
1635        args: ff_core::contracts::CompleteExecutionArgs,
1636    ) -> Result<ff_core::contracts::CompleteExecutionResult, EngineError> {
1637        crate::typed_ops::complete_execution(self.pool(), args).await
1638    }
1639
1640    #[cfg(feature = "core")]
1641    async fn fail_execution(
1642        &self,
1643        args: ff_core::contracts::FailExecutionArgs,
1644    ) -> Result<ff_core::contracts::FailExecutionResult, EngineError> {
1645        crate::typed_ops::fail_execution(self.pool(), args).await
1646    }
1647
1648    #[cfg(feature = "core")]
1649    async fn resume_execution(
1650        &self,
1651        args: ff_core::contracts::ResumeExecutionArgs,
1652    ) -> Result<ff_core::contracts::ResumeExecutionResult, EngineError> {
1653        crate::typed_ops::resume_execution(self.pool(), args).await
1654    }
1655
1656    #[cfg(feature = "core")]
1657    async fn check_admission(
1658        &self,
1659        quota_policy_id: &ff_core::types::QuotaPolicyId,
1660        _dimension: &str,
1661        args: ff_core::contracts::CheckAdmissionArgs,
1662    ) -> Result<ff_core::contracts::CheckAdmissionResult, EngineError> {
1663        crate::typed_ops::check_admission(
1664            self.pool(),
1665            &self.partition_config,
1666            quota_policy_id,
1667            args,
1668        )
1669        .await
1670    }
1671
1672    #[cfg(feature = "core")]
1673    async fn evaluate_flow_eligibility(
1674        &self,
1675        args: ff_core::contracts::EvaluateFlowEligibilityArgs,
1676    ) -> Result<ff_core::contracts::EvaluateFlowEligibilityResult, EngineError> {
1677        crate::typed_ops::evaluate_flow_eligibility(self.pool(), args).await
1678    }
1679
1680    #[cfg(feature = "core")]
1681    async fn claim_execution(
1682        &self,
1683        args: ff_core::contracts::ClaimExecutionArgs,
1684    ) -> Result<ff_core::contracts::ClaimExecutionResult, EngineError> {
1685        crate::typed_ops::claim_execution(self.pool(), &self.partition_config, args).await
1686    }
1687
1688    // Cairn #454 Phase 4c — backend-atomic `issue_claim_grant` +
1689    // `claim_execution` composition. sqlx tx ensures a mid-op crash
1690    // cannot leak a dangling grant (auto-rollback on drop).
1691    #[cfg(feature = "core")]
1692    #[tracing::instrument(name = "pg.issue_grant_and_claim", skip_all)]
1693    async fn issue_grant_and_claim(
1694        &self,
1695        args: ff_core::contracts::IssueGrantAndClaimArgs,
1696    ) -> Result<ff_core::contracts::ClaimGrantOutcome, EngineError> {
1697        crate::typed_ops::issue_grant_and_claim(self.pool(), &self.partition_config, args).await
1698    }
1699
1700    // ── PR-7b Wave 0a: exec_core field read ──
1701
1702    async fn read_exec_core_fields(
1703        &self,
1704        partition: ff_core::partition::Partition,
1705        execution_id: &ff_core::types::ExecutionId,
1706        fields: &[&str],
1707    ) -> Result<std::collections::HashMap<String, Option<String>>, EngineError> {
1708        if fields.is_empty() {
1709            return Ok(std::collections::HashMap::new());
1710        }
1711        // Cross-check: `partition` and `execution_id.partition()` must
1712        // agree. A mismatch would silently read the wrong row (or miss)
1713        // on Valkey via the `{p:N}` key tag, so surface it explicitly
1714        // as a validation error here.
1715        let derived: u16 = execution_id.partition();
1716        if partition.index != derived {
1717            return Err(EngineError::Validation {
1718                kind: ff_core::engine_error::ValidationKind::InvalidInput,
1719                detail: format!(
1720                    "read_exec_core_fields: partition mismatch (arg={}, eid={})",
1721                    partition.index, derived
1722                ),
1723            });
1724        }
1725        let partition_key: i16 = partition.index as i16;
1726        let exec_uuid = crate::exec_core::eid_uuid(execution_id);
1727
1728        // Build a single SELECT that projects each requested field to
1729        // a text value. Fields are classified:
1730        //  - canonical columns (lane_id, lifecycle_phase,
1731        //    ownership_state, eligibility_state, public_state,
1732        //    attempt_state, blocking_reason, cancellation_reason,
1733        //    cancelled_by, attempt_index, flow_id, priority,
1734        //    created_at_ms, terminal_at_ms, deadline_at_ms): CAST the
1735        //    column to text. Scanner-facing aliases (current_attempt_index
1736        //    → attempt_index, completed_at → terminal_at_ms,
1737        //    cancel_reason → cancellation_reason) project the canonical
1738        //    column.
1739        //  - `required_capabilities` is `text[]` on PG; projected as CSV
1740        //    via `array_to_string(..., ',')` to match Valkey's HMGET
1741        //    string shape.
1742        //  - `raw_fields` JSONB-resident names (current_waitpoint_id,
1743        //    current_worker_instance_id, budget_ids, quota_policy_id)
1744        //    project via `raw_fields ->> '<field>'`.
1745        //  - Unknown names project NULL (absent-field parity with
1746        //    Valkey HMGET).
1747        let mut projections: Vec<String> = Vec::with_capacity(fields.len());
1748        for field in fields {
1749            let expr = match *field {
1750                // Canonical exec_core columns.
1751                "lane_id" | "lifecycle_phase" | "ownership_state" | "eligibility_state"
1752                | "public_state" | "attempt_state" | "blocking_reason" | "cancellation_reason"
1753                | "cancelled_by" => format!("{f}::text", f = field),
1754                "attempt_index" => "attempt_index::text".to_string(),
1755                "flow_id" => "flow_id::text".to_string(),
1756                "priority" => "priority::text".to_string(),
1757                "created_at_ms" => "created_at_ms::text".to_string(),
1758                "terminal_at_ms" => "terminal_at_ms::text".to_string(),
1759                "deadline_at_ms" => "deadline_at_ms::text".to_string(),
1760                // Scanner-facing aliases that scan the same data from
1761                // different angles.
1762                "current_attempt_index" => "attempt_index::text".to_string(),
1763                "completed_at" => "terminal_at_ms::text".to_string(),
1764                "cancel_reason" => "cancellation_reason::text".to_string(),
1765                // required_capabilities is `text[]` on PG; scanner
1766                // callers expect a CSV string on Valkey. Convert.
1767                "required_capabilities" => {
1768                    "array_to_string(required_capabilities, ',')".to_string()
1769                }
1770                // Everything else lives under raw_fields JSONB.
1771                other => {
1772                    // Strict allowlist of raw_fields names the scanner
1773                    // code reads. Any other name returns NULL.
1774                    match other {
1775                        "current_waitpoint_id"
1776                        | "current_worker_instance_id"
1777                        | "budget_ids"
1778                        | "quota_policy_id" => format!("raw_fields ->> '{other}'"),
1779                        _ => "NULL".to_string(),
1780                    }
1781                }
1782            };
1783            projections.push(expr);
1784        }
1785        let projection_sql = projections.join(", ");
1786        let query = format!(
1787            "SELECT {projection_sql} FROM ff_exec_core \
1788             WHERE partition_key = $1 AND execution_id = $2"
1789        );
1790        let row_opt = sqlx::query(&query)
1791            .bind(partition_key)
1792            .bind(exec_uuid)
1793            .fetch_optional(self.pool())
1794            .await
1795            .map_err(|e| EngineError::Transport {
1796                backend: "postgres",
1797                source: format!("read_exec_core_fields: {e}").into(),
1798            })?;
1799
1800        let mut out = std::collections::HashMap::with_capacity(fields.len());
1801        if let Some(row) = row_opt {
1802            use sqlx::Row;
1803            for (idx, field) in fields.iter().enumerate() {
1804                let val: Option<String> =
1805                    row.try_get(idx).map_err(|e| EngineError::Transport {
1806                        backend: "postgres",
1807                        source: format!("read_exec_core_fields[{field}]: {e}").into(),
1808                    })?;
1809                out.insert((*field).to_string(), val);
1810            }
1811        } else {
1812            for field in fields {
1813                out.insert((*field).to_string(), None);
1814            }
1815        }
1816        Ok(out)
1817    }
1818
1819    // ── PR-7b Wave 0a: clock primitive ──
1820
1821    async fn server_time_ms(&self) -> Result<u64, EngineError> {
1822        // `clock_timestamp()` — `now()` returns the transaction start
1823        // timestamp and would be stale under any long-running tx.
1824        // Scanners use this to compute "due" windows, so the wall-
1825        // clock read must be fresh. Matches the `flow.rs` convention.
1826        let ms: i64 = sqlx::query_scalar(
1827            "SELECT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint",
1828        )
1829            .fetch_one(self.pool())
1830            .await
1831            .map_err(|e| EngineError::Transport {
1832                backend: "postgres",
1833                source: format!("server_time_ms: {e}").into(),
1834            })?;
1835        if ms < 0 {
1836            return Err(EngineError::Transport {
1837                backend: "postgres",
1838                source: "server_time_ms: negative epoch".into(),
1839            });
1840        }
1841        Ok(ms as u64)
1842    }
1843}
1844
1845/// Resolve a `CompletionPayload` to the matching
1846/// `ff_completion_event.event_id` for PR-7b cascade dispatch.
1847///
1848/// Mirrors the cursor-walk previously inlined in
1849/// `ff-engine::completion_listener::run_completion_listener_postgres` —
1850/// keyed on `(partition_key, execution_id_uuid, occurred_at_ms)`. The
1851/// `partition_key` scoping is what makes this hit the
1852/// `ff_completion_event_lookup_idx` composite instead of devolving into
1853/// a cross-partition seq-scan; it's recoverable from the textual
1854/// `ExecutionId` (`"<partition>:<uuid>"`).
1855///
1856/// Returns `None` if the outbox row isn't visible yet (race with the
1857/// producing tx), the payload's execution id can't be parsed, or the
1858/// partition prefix isn't an `i16` — all recoverable via the
1859/// dependency_reconciler backstop. Transient sqlx errors are logged at
1860/// `warn` with the query inputs and also fall back to `None`; the
1861/// listener retries on the next payload and the reconciler covers the
1862/// worst case.
1863async fn resolve_event_id(
1864    pool: &PgPool,
1865    payload: &ff_core::backend::CompletionPayload,
1866) -> Option<i64> {
1867    let eid_str = payload.execution_id.as_str();
1868    // ExecutionId is `{fp:N}:<uuid>`; split on the rightmost `:` so the
1869    // `{fp:N}` hash-tag prefix stays intact.
1870    let uuid_str = eid_str.rsplit_once(':').map(|(_, u)| u)?;
1871    let uuid = uuid::Uuid::parse_str(uuid_str).ok()?;
1872    // The payload's ExecutionId is already validated (construction
1873    // enforces the `{fp:N}:<uuid>` shape), so `.partition()` is
1874    // infallible. Narrow to `i16` for the partition_key column.
1875    let partition_key = i16::try_from(payload.execution_id.partition()).ok()?;
1876    let occurred_at_ms = payload.produced_at_ms.0;
1877
1878    match sqlx::query_scalar::<_, i64>(
1879        "SELECT event_id FROM ff_completion_event \
1880         WHERE partition_key = $1 AND execution_id = $2 AND occurred_at_ms = $3 \
1881         ORDER BY event_id ASC LIMIT 1",
1882    )
1883    .bind(partition_key)
1884    .bind(uuid)
1885    .bind(occurred_at_ms)
1886    .fetch_optional(pool)
1887    .await
1888    {
1889        Ok(row) => row,
1890        Err(err) => {
1891            tracing::warn!(
1892                partition_key,
1893                execution_id = %uuid,
1894                occurred_at_ms,
1895                error = %err,
1896                "resolve_event_id: ff_completion_event lookup failed; falling back to \
1897                 dependency_reconciler backstop"
1898            );
1899            None
1900        }
1901    }
1902}
1903
1904/// Narrow a `Partition`'s `u16` index into the `i16` the Postgres
1905/// schema uses as its partition-key column. FlowFabric partitions
1906/// max out well below `i16::MAX` in practice (production deployments
1907/// run a few hundred partitions per family), but the conversion is
1908/// still fallible at the type level. Surface overflow as
1909/// `ValidationKind::InvalidInput` rather than silently substituting a
1910/// fallback — a silently mis-routed reconciler would
1911/// corrupt-by-omission without diagnostics.
1912fn partition_index_to_i16(partition: Partition) -> Result<i16, EngineError> {
1913    i16::try_from(partition.index).map_err(|_| EngineError::Validation {
1914        kind: ff_core::engine_error::ValidationKind::InvalidInput,
1915        detail: format!(
1916            "partition index {} exceeds i16 range (max {})",
1917            partition.index,
1918            i16::MAX
1919        ),
1920    })
1921}
1922
/// Recommended floor for the Postgres `max_locks_per_transaction`
/// setting.
///
/// With 256 hash partitions per logical table, the Postgres default
/// of `64` locks per tx can be exhausted under modest concurrent
/// bench load: the Wave 7c bench hit `out of shared memory` at
/// 16 workers × 10k tasks on the default and was unblocked at `512`.
///
/// A value below this floor only produces a boot-time warning, never
/// a hard failure, because operators may legitimately run a tuned
/// value that is above 64 yet still below this threshold.
const MIN_MAX_LOCKS_PER_TRANSACTION: i64 = 256;
1932
1933/// Probe `max_locks_per_transaction` at connect time + log a warning
1934/// when the current value is below the production-safe threshold.
1935/// Never fails the connect — probe errors are logged at debug and
1936/// swallowed (pg_show may be restricted on exotic deploys).
1937async fn warn_if_max_locks_low(pool: &PgPool) {
1938    let row: Result<(String,), sqlx::Error> =
1939        sqlx::query_as("SHOW max_locks_per_transaction")
1940            .fetch_one(pool)
1941            .await;
1942    match row {
1943        Ok((raw,)) => emit_max_locks_decision(&raw),
1944        Err(e) => {
1945            tracing::debug!("failed to probe max_locks_per_transaction: {e}");
1946        }
1947    }
1948}
1949
1950/// Pure decision surface for the max-locks probe — extracted for
1951/// unit-testability (the live probe is gated by a running Postgres).
1952/// Returns the integer value when a warning SHOULD fire, `None`
1953/// otherwise (either the raw is valid + at/above threshold, or the
1954/// raw is unparseable — the latter is debug-only).
1955fn max_locks_warn_value(raw: &str) -> Option<i64> {
1956    match raw.parse::<i64>() {
1957        Ok(v) if v < MIN_MAX_LOCKS_PER_TRANSACTION => Some(v),
1958        Ok(_) => None,
1959        Err(e) => {
1960            tracing::debug!(raw, "failed to parse max_locks_per_transaction: {e}");
1961            None
1962        }
1963    }
1964}
1965
1966fn emit_max_locks_decision(raw: &str) {
1967    if let Some(v) = max_locks_warn_value(raw) {
1968        tracing::warn!(
1969            current = v,
1970            recommended = MIN_MAX_LOCKS_PER_TRANSACTION,
1971            "postgres max_locks_per_transaction={v} is below the recommended \
1972             minimum ({MIN_MAX_LOCKS_PER_TRANSACTION}); partition-heavy workloads \
1973             may hit 'out of shared memory' under concurrent load. \
1974             See docs/operator-guide-postgres.md."
1975        );
1976    }
1977}
1978
/// Unit tests for the pure max-locks decision function.
#[cfg(test)]
mod max_locks_tests {
    use super::{max_locks_warn_value, MIN_MAX_LOCKS_PER_TRANSACTION};

    /// Values strictly below the threshold are echoed back.
    #[test]
    fn warns_when_below_threshold() {
        assert_eq!(max_locks_warn_value("64"), Some(64));

        let just_below = MIN_MAX_LOCKS_PER_TRANSACTION - 1;
        let raw = just_below.to_string();
        assert_eq!(max_locks_warn_value(&raw), Some(just_below));
    }

    /// The threshold itself and anything above it stay silent.
    #[test]
    fn silent_at_or_above_threshold() {
        let at_threshold = MIN_MAX_LOCKS_PER_TRANSACTION.to_string();
        assert_eq!(max_locks_warn_value(&at_threshold), None);
        assert_eq!(max_locks_warn_value("1024"), None);
    }

    /// Non-numeric input never triggers a warning.
    #[test]
    fn silent_for_unparseable_raw() {
        assert_eq!(max_locks_warn_value("not-a-number"), None);
    }
}
2006
/// Unit tests for the `u16` → `i16` partition-index narrowing.
#[cfg(test)]
mod partition_index_tests {
    use super::partition_index_to_i16;
    use ff_core::engine_error::{EngineError, ValidationKind};
    use ff_core::partition::{Partition, PartitionFamily};

    /// Build a partition and run it through the narrowing helper.
    fn narrow(family: PartitionFamily, index: u16) -> Result<i16, EngineError> {
        partition_index_to_i16(Partition { family, index })
    }

    /// Zero, a typical value, and the exact upper bound all convert.
    #[test]
    fn accepts_values_within_i16_range() {
        assert_eq!(narrow(PartitionFamily::Flow, 0).unwrap(), 0);
        assert_eq!(narrow(PartitionFamily::Flow, 255).unwrap(), 255);
        assert_eq!(
            narrow(PartitionFamily::Budget, i16::MAX as u16).unwrap(),
            i16::MAX
        );
    }

    /// One past `i16::MAX` is rejected with a descriptive detail.
    #[test]
    fn rejects_overflow_above_i16_max() {
        let first_invalid = (i16::MAX as u16) + 1;
        match narrow(PartitionFamily::Flow, first_invalid).unwrap_err() {
            EngineError::Validation { kind, detail } => {
                assert_eq!(kind, ValidationKind::InvalidInput);
                assert!(detail.contains("exceeds i16 range"), "unexpected detail: {detail}");
            }
            other => panic!("expected Validation error, got {other:?}"),
        }
    }

    /// The largest representable `u16` is rejected as invalid input.
    #[test]
    fn rejects_u16_max() {
        let outcome = narrow(PartitionFamily::Quota, u16::MAX);
        assert!(matches!(
            outcome,
            Err(EngineError::Validation { kind: ValidationKind::InvalidInput, .. })
        ));
    }
}