Skip to main content

ff_backend_postgres/
lib.rs

1//! `EngineBackend` implementation backed by Postgres.
2//!
3//! **RFC-v0.7 Wave 0 — scaffold.** This crate lands the
4//! [`PostgresBackend`] struct + the `impl EngineBackend` block with
5//! every method stubbed to return `EngineError::Unavailable`.
6//! Subsequent waves fill in bodies:
7//!
8//! * Wave 1: cross-cutting error/helpers.
9//! * Wave 2: describe / list / resolve (read surface).
10//! * Wave 3: schema migrations (replaces the Wave 0 placeholder).
11//! * Wave 4: LISTEN/NOTIFY wiring + stream reads/tails.
12//! * Wave 5-7: hot-path write methods.
13//! * Wave 8: ff-server wire-up + dual-backend running.
14//!
15//! The trait stays object-safe; consumers can hold
16//! `Arc<dyn EngineBackend>`. No ferriskey dep — this crate's
17//! transport is `sqlx`.
18
19#![allow(clippy::result_large_err)]
20
21use std::sync::Arc;
22use std::time::Duration;
23
24use async_trait::async_trait;
25
26use ff_core::backend::{
27    AppendFrameOutcome, BackendConfig, CancelFlowPolicy, CancelFlowWait, CapabilitySet,
28    ClaimPolicy, FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal,
29    PendingWaitpoint, ResumeToken, ResumeSignal, SummaryDocument, TailVisibility,
30    UsageDimensions,
31};
32#[cfg(feature = "core")]
33use ff_core::contracts::{
34    AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
35    ApplyDependencyToChildResult, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
36    CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
37    DeliverApprovalSignalArgs, DeliverSignalArgs, DeliverSignalResult, EdgeDependencyPolicy,
38    EdgeDirection, EdgeSnapshot,
39    ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
40    ListPendingWaitpointsResult, ListSuspendedPage, SetEdgeGroupPolicyResult,
41    StageDependencyEdgeArgs, StageDependencyEdgeResult,
42};
43#[cfg(feature = "core")]
44use ff_core::state::PublicState;
45use ff_core::contracts::{
46    CancelFlowResult, ExecutionContext, ExecutionSnapshot, FlowSnapshot, IssueReclaimGrantArgs,
47    IssueReclaimGrantOutcome, ReclaimExecutionArgs, ReclaimExecutionOutcome, ReportUsageResult,
48    RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
49    SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
50};
51#[cfg(feature = "core")]
52use ff_core::contracts::ExecutionInfo;
53// RFC-020 Wave 9 Spine-A pt.1 — operator-control mutating surfaces.
54#[cfg(feature = "core")]
55use ff_core::contracts::{
56    CancelExecutionArgs, CancelExecutionResult, RevokeLeaseArgs, RevokeLeaseResult,
57};
58// RFC-020 Wave 9 Spine-A pt.2 — operator-control + flow-cancel mutating surfaces.
59#[cfg(feature = "core")]
60use ff_core::contracts::{
61    CancelFlowArgs, CancelFlowHeader, ChangePriorityArgs, ChangePriorityResult,
62    ReplayExecutionArgs, ReplayExecutionResult,
63};
64// RFC-020 Wave 9 Standalone-1 — budget/quota admin surfaces.
65#[cfg(feature = "core")]
66use ff_core::contracts::{
67    BudgetStatus, CreateBudgetArgs, CreateBudgetResult, CreateQuotaPolicyArgs,
68    CreateQuotaPolicyResult, ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult,
69};
70#[cfg(feature = "streaming")]
71use ff_core::contracts::{StreamCursor, StreamFrames};
72use ff_core::engine_backend::{EngineBackend, ExpirePhase};
73use ff_core::engine_error::EngineError;
74#[cfg(feature = "core")]
75use ff_core::partition::PartitionKey;
76use ff_core::partition::{Partition, PartitionConfig};
77#[cfg(feature = "streaming")]
78use ff_core::types::AttemptIndex;
79#[cfg(feature = "core")]
80use ff_core::types::EdgeId;
81use ff_core::types::{BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
82// Wave 5a — re-export `PgPool` so crates that depend on
83// `ff-backend-postgres` (and not `sqlx` directly) can name the pool
84// type in their own APIs (e.g. `ff-engine::dispatch_via_postgres`).
85pub use sqlx::PgPool;
86
87#[cfg(feature = "core")]
88mod admin;
89pub mod attempt;
90pub mod budget;
91pub mod claim_grant;
92pub mod completion;
93#[cfg(feature = "core")]
94pub mod dispatch;
95pub mod error;
96pub mod exec_core;
97pub mod flow;
98#[cfg(feature = "core")]
99pub mod flow_staging;
100pub mod handle_codec;
101mod lease_event;
102mod lease_event_subscribe;
103pub mod listener;
104pub mod migrate;
105#[cfg(feature = "core")]
106pub mod operator;
107#[cfg(feature = "core")]
108mod operator_event;
109pub mod pool;
110#[cfg(feature = "core")]
111pub mod reconcilers;
112#[cfg(feature = "core")]
113pub mod scanner_supervisor;
114#[cfg(feature = "core")]
115pub mod scheduler;
116pub mod signal;
117mod signal_delivery_subscribe;
118mod signal_event;
119#[cfg(feature = "streaming")]
120pub mod stream;
121pub mod suspend;
122pub mod suspend_ops;
123#[cfg(feature = "core")]
124pub(crate) mod typed_ops;
125pub mod version;
126#[cfg(feature = "core")]
127pub mod worker_registry;
128
129pub use completion::{PostgresCompletionStream, COMPLETION_CHANNEL};
130pub use error::{map_sqlx_error, PostgresTransportError};
131pub use listener::StreamNotifier;
132pub use migrate::{apply_migrations, MigrationError};
133#[cfg(feature = "core")]
134pub use scanner_supervisor::{PostgresScannerConfig, PostgresScannerHandle};
135pub use version::check_schema_version;
136
137// Re-export the new `PostgresConnection` shape so consumers can name
138// it from this crate directly without dipping into `ff_core::backend`.
139// `BackendConfig` is already imported above and is part of the
140// `connect()` signature, so it re-exports transparently via
141// rustdoc — no explicit `pub use` needed.
142pub use ff_core::backend::PostgresConnection;
143
144/// Postgres-backed `EngineBackend`.
145///
146/// Wave 0 shape: holds a `sqlx::PgPool`, the deployment's
147/// [`PartitionConfig`] (Q5 — partition column survives on Postgres
148/// with hash partitioning across the same 256 slots Valkey uses),
149/// and an optional `ff_observability::Metrics` handle mirroring
150/// [`ff_backend_valkey::ValkeyBackend`]. Future waves add the
151/// [`StreamNotifier`] handle once Wave 4 wires up LISTEN/NOTIFY.
152/// RFC-018 Stage A: build a [`ff_core::capability::Supports`]
153/// snapshot for the Postgres backend at v0.9. `true` fields correspond
154/// to trait methods `PostgresBackend` overrides with a real body
155/// (ingress, scheduler, seed + rotate HMAC, flow cancel bulk path,
156/// stream reads, RFC-019 subscriptions, cross-cutting). `false` fields
157/// correspond to trait methods that still return
158/// `EngineError::Unavailable` on Postgres today — Wave 9 follow-up
159/// scope. See `docs/POSTGRES_PARITY_MATRIX.md` for the authoritative
160/// per-row status.
161///
162/// `prepare` is `true` on Postgres even though `prepare()` returns
163/// `PrepareOutcome::NoOp` (schema migrations are applied out-of-band).
164/// `Supports.prepare` means "can the consumer call `backend.prepare()`
165/// without getting `EngineError::Unavailable`?" — for Postgres the
166/// answer is yes; NoOp is a successful well-defined outcome. Gating
167/// the call off in consumer UIs based on a `false` bool would hide
168/// a callable + correct method.
169///
170/// `Supports` is `#[non_exhaustive]` so struct-literal construction
171/// from this crate is forbidden; we start from
172/// [`ff_core::capability::Supports::none`] and mutate named fields.
173fn postgres_supports_base() -> ff_core::capability::Supports {
174    let mut s = ff_core::capability::Supports::none();
175
176    // ── Flow bulk cancel (impl) ──
177    s.cancel_flow_wait_timeout = true;
178    s.cancel_flow_wait_indefinite = true;
179
180    // ── Admin seed + rotate HMAC (impl) ──
181    s.rotate_waitpoint_hmac_secret_all = true;
182    s.seed_waitpoint_hmac_secret = true;
183
184    // ── Scheduler ──
185    s.claim_for_worker = true;
186
187    // ── RFC-019 subscriptions ──
188    s.subscribe_lease_history = true;
189    s.subscribe_completion = true;
190    s.subscribe_signal_delivery = true;
191    s.subscribe_instance_tags = false;
192
193    // ── Streaming (RFC-015) ──
194    s.stream_durable_summary = true;
195    s.stream_best_effort_live = true;
196
197    // ── Boot (Postgres returns NoOp but call is callable + correct) ──
198    s.prepare = true;
199
200    // ── Wave 9 (v0.11) — operator control + read model + budget/quota
201    //    admin + list_pending_waitpoints + cancel_flow_header +
202    //    ack_cancel_member all ship concretely on Postgres via
203    //    RFC-020 Rev 7. subscribe_instance_tags remains `false` per
204    //    #311 (speculative demand, served by list_executions +
205    //    ScannerFilter::with_instance_tag today).
206    s.cancel_execution = true;
207    s.change_priority = true;
208    s.replay_execution = true;
209    s.revoke_lease = true;
210    s.read_execution_state = true;
211    s.read_execution_info = true;
212    s.get_execution_result = true;
213    s.budget_admin = true;
214    s.quota_admin = true;
215    s.list_pending_waitpoints = true;
216    s.cancel_flow_header = true;
217    s.ack_cancel_member = true;
218
219    // ── RFC-025 Phase 3 — worker registry ──
220    s.register_worker = true;
221    s.heartbeat_worker = true;
222    s.mark_worker_dead = true;
223    s.list_expired_leases = true;
224    s.list_workers = true;
225
226    // ── FF #511 Phase 2a — scheduler-admission primitives ──
227    // block_execution_for_admission + read_budget_usage_and_limits
228    // stay `false` on PG (scheduler is Valkey-only per RFC-023 §4.1).
229    s.release_admission = true;
230    s.read_quota_policy_limits = true;
231
232    s
233}
234
235pub struct PostgresBackend {
236    #[allow(dead_code)] // filled in across waves 2-7
237    pool: PgPool,
238    #[allow(dead_code)]
239    partition_config: PartitionConfig,
240    #[allow(dead_code)]
241    metrics: Option<Arc<ff_observability::Metrics>>,
242    /// Wave 4: shared LISTEN notifier. Present on `connect()`-built
243    /// backends; `None` on bare `from_pool` constructions that skip
244    /// LISTEN wiring (tests that only exercise the write path).
245    #[allow(dead_code)]
246    stream_notifier: Option<Arc<StreamNotifier>>,
247    /// RFC-017 Wave 8 Stage E3: scanner supervisor handle. Spawned
248    /// during [`Self::connect_with_metrics`] when the caller opts in
249    /// via [`Self::spawn_scanners_during_connect`]; drained on
250    /// [`EngineBackend::shutdown_prepare`]. `None` on `from_pool` /
251    /// test builds that don't want background reconcilers.
252    #[cfg(feature = "core")]
253    scanner_handle: Option<Arc<scanner_supervisor::PostgresScannerHandle>>,
254}
255
256impl PostgresBackend {
257    /// Dial Postgres with [`BackendConfig`] and return the backend as
258    /// `Arc<dyn EngineBackend>`. Modeled on
259    /// [`ff_backend_valkey::ValkeyBackend::connect`] so ff-server /
260    /// SDK call sites can swap backends without changing the
261    /// constructor shape.
262    ///
263    /// **Wave 0:** builds the pool and constructs the backend. Does
264    /// NOT run migrations (Q12 — operator out-of-band). Does NOT run
265    /// the schema-version check (Wave 3 adds the version const and
266    /// wires [`check_schema_version`] in). Does NOT start the LISTEN
267    /// task (Wave 4).
268    ///
269    /// Returns `EngineError::Unavailable` when the config's
270    /// connection arm is not Postgres.
271    pub async fn connect(config: BackendConfig) -> Result<Arc<dyn EngineBackend>, EngineError> {
272        let pool = pool::build_pool(&config).await?;
273        warn_if_max_locks_low(&pool).await;
274        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
275        let backend = Self {
276            pool,
277            partition_config: PartitionConfig::default(),
278            metrics: None,
279            stream_notifier,
280            #[cfg(feature = "core")]
281            scanner_handle: None,
282        };
283        Ok(Arc::new(backend))
284    }
285
286    /// Test / advanced constructor: build a `PostgresBackend` from an
287    /// already-constructed `PgPool` + explicit partition config. No
288    /// network I/O. Useful for integration tests against a shared
289    /// pool and for a future migration CLI that wants to reuse a pool
290    /// across migrate-run + smoke-check.
291    pub fn from_pool(pool: PgPool, partition_config: PartitionConfig) -> Arc<Self> {
292        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
293        Arc::new(Self {
294            pool,
295            partition_config,
296            metrics: None,
297            stream_notifier,
298            #[cfg(feature = "core")]
299            scanner_handle: None,
300        })
301    }
302
303    /// RFC-017 Wave 8 Stage E1: dial Postgres with an explicit
304    /// [`PartitionConfig`] + shared [`ff_observability::Metrics`].
305    /// Mirrors [`ff_backend_valkey::ValkeyBackend::connect_with_metrics`]
306    /// so `ff-server::Server::start_with_metrics` can wire the Postgres
307    /// branch without reaching into the pool builder directly.
308    ///
309    /// Returns a concrete `Arc<Self>` rather than `Arc<dyn EngineBackend>`
310    /// so the caller can cast to the trait object after any additional
311    /// field installs (parallel to the Valkey path which calls
312    /// `with_scheduler` / `with_stream_semaphore_permits` before the
313    /// cast). Stage E1 does NOT run `apply_migrations` — schema
314    /// provisioning is an operator concern (matches the Wave 0 contract
315    /// on [`Self::connect`]).
316    pub async fn connect_with_metrics(
317        config: BackendConfig,
318        partition_config: PartitionConfig,
319        metrics: Arc<ff_observability::Metrics>,
320    ) -> Result<Arc<Self>, EngineError> {
321        let pool = pool::build_pool(&config).await?;
322        warn_if_max_locks_low(&pool).await;
323        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
324        Ok(Arc::new(Self {
325            pool,
326            partition_config,
327            metrics: Some(metrics),
328            stream_notifier,
329            #[cfg(feature = "core")]
330            scanner_handle: None,
331        }))
332    }
333
334    /// RFC-017 Wave 8 Stage E3: spawn the six Postgres reconcilers as
335    /// background tick loops. Returns `true` if the scanner handle
336    /// was installed; `false` if the `Arc<Self>` has outstanding
337    /// clones (mirrors the Valkey `with_*` pattern). Callers must
338    /// invoke this before publishing the `Arc<dyn EngineBackend>` so
339    /// the underlying `Arc::get_mut` succeeds.
340    #[cfg(feature = "core")]
341    pub fn with_scanners(
342        self: &mut Arc<Self>,
343        cfg: scanner_supervisor::PostgresScannerConfig,
344    ) -> bool {
345        let Some(inner) = Arc::get_mut(self) else {
346            return false;
347        };
348        let handle = scanner_supervisor::spawn_scanners(inner.pool.clone(), cfg);
349        inner.scanner_handle = Some(Arc::new(handle));
350        true
351    }
352
353    /// Accessor for the underlying `PgPool`. Stage E1 uses this so
354    /// `ff-server::Server::start_with_metrics` can run
355    /// [`apply_migrations`] on the same pool before handing the backend
356    /// out as `Arc<dyn EngineBackend>`.
357    pub fn pool(&self) -> &PgPool {
358        &self.pool
359    }
360
361    /// Create one execution row (+ seed the lane registry if new).
362    ///
363    /// **RFC-017 Stage A:** this inherent method is retained as a
364    /// thin wrapper around the module-level impl so existing in-tree
365    /// callers (ff-server request handlers, integration tests) keep
366    /// compiling. The trait-lifted entry point is
367    /// [`EngineBackend::create_execution`] below, which calls the
368    /// same impl. Return shape differs — inherent returns
369    /// `ExecutionId`, trait returns
370    /// [`CreateExecutionResult`] per RFC-017 §5 — so we cannot simply
371    /// replace the inherent method. A follow-up PR may deprecate
372    /// this inherent alongside the broader ingress shape alignment.
373    #[cfg(feature = "core")]
374    #[tracing::instrument(name = "pg.create_execution", skip_all)]
375    pub async fn create_execution(
376        &self,
377        args: CreateExecutionArgs,
378    ) -> Result<ExecutionId, EngineError> {
379        exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await
380    }
381
382    // ── RFC-017 Stage A: inherent ingress methods retained for
383    // back-compat with in-tree test harnesses + ff-server direct
384    // calls. The trait-lifted peers (`EngineBackend::create_flow`
385    // etc.) delegate to the SAME module-level impls under the hood.
386    // Follow-up PR may sunset these inherents once all in-tree
387    // consumers route through `Arc<dyn EngineBackend>`.
388
389    #[cfg(feature = "core")]
390    #[tracing::instrument(name = "pg.create_flow.inherent", skip_all)]
391    pub async fn create_flow(
392        &self,
393        args: &CreateFlowArgs,
394    ) -> Result<CreateFlowResult, EngineError> {
395        flow_staging::create_flow(&self.pool, &self.partition_config, args).await
396    }
397
398    #[cfg(feature = "core")]
399    #[tracing::instrument(name = "pg.add_execution_to_flow.inherent", skip_all)]
400    pub async fn add_execution_to_flow(
401        &self,
402        args: &AddExecutionToFlowArgs,
403    ) -> Result<AddExecutionToFlowResult, EngineError> {
404        flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, args).await
405    }
406
407    #[cfg(feature = "core")]
408    #[tracing::instrument(name = "pg.stage_dependency_edge.inherent", skip_all)]
409    pub async fn stage_dependency_edge(
410        &self,
411        args: &StageDependencyEdgeArgs,
412    ) -> Result<StageDependencyEdgeResult, EngineError> {
413        flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, args).await
414    }
415
416    #[cfg(feature = "core")]
417    #[tracing::instrument(name = "pg.apply_dependency_to_child.inherent", skip_all)]
418    pub async fn apply_dependency_to_child(
419        &self,
420        args: &ApplyDependencyToChildArgs,
421    ) -> Result<ApplyDependencyToChildResult, EngineError> {
422        flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, args).await
423    }
424}
425
426/// Short helper: every stub method returns this. Kept as a function
427/// (rather than a macro) so rust-analyzer / IDE jumps show a single
428/// definition site for the Wave 0 stub pattern.
429#[inline]
430#[cfg_attr(feature = "streaming", allow(dead_code))]
431fn unavailable<T>(op: &'static str) -> Result<T, EngineError> {
432    Err(EngineError::Unavailable { op })
433}
434
435#[async_trait]
436impl EngineBackend for PostgresBackend {
437    // ── Claim + lifecycle ──
438
439    #[tracing::instrument(name = "pg.claim", skip_all)]
440    async fn claim(
441        &self,
442        lane: &LaneId,
443        capabilities: &CapabilitySet,
444        policy: ClaimPolicy,
445    ) -> Result<Option<Handle>, EngineError> {
446        attempt::claim(&self.pool, lane, capabilities, &policy).await
447    }
448
449    #[tracing::instrument(name = "pg.renew", skip_all)]
450    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError> {
451        attempt::renew(&self.pool, handle).await
452    }
453
454    #[tracing::instrument(name = "pg.progress", skip_all)]
455    async fn progress(
456        &self,
457        handle: &Handle,
458        percent: Option<u8>,
459        message: Option<String>,
460    ) -> Result<(), EngineError> {
461        attempt::progress(&self.pool, handle, percent, message).await
462    }
463
464    #[tracing::instrument(name = "pg.append_frame", skip_all)]
465    async fn append_frame(
466        &self,
467        handle: &Handle,
468        frame: Frame,
469    ) -> Result<AppendFrameOutcome, EngineError> {
470        #[cfg(feature = "streaming")]
471        {
472            stream::append_frame(&self.pool, &self.partition_config, handle, frame).await
473        }
474        #[cfg(not(feature = "streaming"))]
475        {
476            let _ = (handle, frame);
477            unavailable("pg.append_frame")
478        }
479    }
480
481    #[tracing::instrument(name = "pg.complete", skip_all)]
482    async fn complete(
483        &self,
484        handle: &Handle,
485        payload: Option<Vec<u8>>,
486    ) -> Result<(), EngineError> {
487        attempt::complete(&self.pool, handle, payload).await
488    }
489
490    #[tracing::instrument(name = "pg.fail", skip_all)]
491    async fn fail(
492        &self,
493        handle: &Handle,
494        reason: FailureReason,
495        classification: FailureClass,
496    ) -> Result<FailOutcome, EngineError> {
497        attempt::fail(&self.pool, handle, reason, classification).await
498    }
499
500    #[tracing::instrument(name = "pg.cancel", skip_all)]
501    async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError> {
502        let payload = handle_codec::decode_handle(handle)?;
503        exec_core::cancel_impl(
504            &self.pool,
505            &self.partition_config,
506            &payload.execution_id,
507            reason,
508        )
509        .await
510    }
511
512    #[tracing::instrument(name = "pg.suspend", skip_all)]
513    async fn suspend(
514        &self,
515        handle: &Handle,
516        args: SuspendArgs,
517    ) -> Result<SuspendOutcome, EngineError> {
518        suspend_ops::suspend_impl(&self.pool, &self.partition_config, handle, args).await
519    }
520
521    #[tracing::instrument(name = "pg.suspend_by_triple", skip_all)]
522    async fn suspend_by_triple(
523        &self,
524        exec_id: ExecutionId,
525        triple: LeaseFence,
526        args: SuspendArgs,
527    ) -> Result<SuspendOutcome, EngineError> {
528        suspend_ops::suspend_by_triple_impl(
529            &self.pool,
530            &self.partition_config,
531            exec_id,
532            triple,
533            args,
534        )
535        .await
536    }
537
538    #[tracing::instrument(name = "pg.create_waitpoint", skip_all)]
539    async fn create_waitpoint(
540        &self,
541        handle: &Handle,
542        waitpoint_key: &str,
543        expires_in: Duration,
544    ) -> Result<PendingWaitpoint, EngineError> {
545        suspend_ops::create_waitpoint_impl(&self.pool, handle, waitpoint_key, expires_in).await
546    }
547
548    #[cfg(feature = "core")]
549    #[tracing::instrument(name = "pg.read_waitpoint_token", skip_all)]
550    async fn read_waitpoint_token(
551        &self,
552        partition: PartitionKey,
553        waitpoint_id: &ff_core::types::WaitpointId,
554    ) -> Result<Option<String>, EngineError> {
555        suspend_ops::read_waitpoint_token_impl(&self.pool, &partition, waitpoint_id).await
556    }
557
558    #[tracing::instrument(name = "pg.observe_signals", skip_all)]
559    async fn observe_signals(
560        &self,
561        handle: &Handle,
562    ) -> Result<Vec<ResumeSignal>, EngineError> {
563        suspend_ops::observe_signals_impl(&self.pool, handle).await
564    }
565
566    #[tracing::instrument(name = "pg.claim_from_resume_grant", skip_all)]
567    async fn claim_from_resume_grant(
568        &self,
569        token: ResumeToken,
570    ) -> Result<Option<Handle>, EngineError> {
571        attempt::claim_from_resume_grant(&self.pool, token).await
572    }
573
574    // RFC-024 PR-D — lease-reclaim grant issuance + consumption.
575
576    #[tracing::instrument(name = "pg.issue_reclaim_grant", skip_all)]
577    async fn issue_reclaim_grant(
578        &self,
579        args: IssueReclaimGrantArgs,
580    ) -> Result<IssueReclaimGrantOutcome, EngineError> {
581        claim_grant::issue_reclaim_grant_impl(&self.pool, args).await
582    }
583
584    #[tracing::instrument(name = "pg.reclaim_execution", skip_all)]
585    async fn reclaim_execution(
586        &self,
587        args: ReclaimExecutionArgs,
588    ) -> Result<ReclaimExecutionOutcome, EngineError> {
589        claim_grant::reclaim_execution_impl(&self.pool, args).await
590    }
591
592    #[tracing::instrument(name = "pg.delay", skip_all)]
593    async fn delay(
594        &self,
595        handle: &Handle,
596        delay_until: TimestampMs,
597    ) -> Result<(), EngineError> {
598        attempt::delay(&self.pool, handle, delay_until).await
599    }
600
601    #[tracing::instrument(name = "pg.wait_children", skip_all)]
602    async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError> {
603        attempt::wait_children(&self.pool, handle).await
604    }
605
606    // ── Read / admin ──
607
608    #[tracing::instrument(name = "pg.describe_execution", skip_all)]
609    async fn describe_execution(
610        &self,
611        id: &ExecutionId,
612    ) -> Result<Option<ExecutionSnapshot>, EngineError> {
613        exec_core::describe_execution_impl(&self.pool, &self.partition_config, id).await
614    }
615
616    #[tracing::instrument(name = "pg.read_execution_context", skip_all)]
617    async fn read_execution_context(
618        &self,
619        execution_id: &ExecutionId,
620    ) -> Result<ExecutionContext, EngineError> {
621        exec_core::read_execution_context_impl(&self.pool, &self.partition_config, execution_id)
622            .await
623    }
624
625    #[tracing::instrument(name = "pg.read_current_attempt_index", skip_all)]
626    async fn read_current_attempt_index(
627        &self,
628        execution_id: &ExecutionId,
629    ) -> Result<ff_core::types::AttemptIndex, EngineError> {
630        exec_core::read_current_attempt_index_impl(
631            &self.pool,
632            &self.partition_config,
633            execution_id,
634        )
635        .await
636    }
637
638    #[tracing::instrument(name = "pg.read_total_attempt_count", skip_all)]
639    async fn read_total_attempt_count(
640        &self,
641        execution_id: &ExecutionId,
642    ) -> Result<ff_core::types::AttemptIndex, EngineError> {
643        exec_core::read_total_attempt_count_impl(
644            &self.pool,
645            &self.partition_config,
646            execution_id,
647        )
648        .await
649    }
650
651    #[tracing::instrument(name = "pg.describe_flow", skip_all)]
652    async fn describe_flow(
653        &self,
654        id: &FlowId,
655    ) -> Result<Option<FlowSnapshot>, EngineError> {
656        flow::describe_flow(&self.pool, &self.partition_config, id).await
657    }
658
659    #[tracing::instrument(name = "pg.set_execution_tag", skip_all)]
660    async fn set_execution_tag(
661        &self,
662        execution_id: &ExecutionId,
663        key: &str,
664        value: &str,
665    ) -> Result<(), EngineError> {
666        ff_core::engine_backend::validate_tag_key(key)?;
667        exec_core::set_execution_tag_impl(&self.pool, execution_id, key, value).await
668    }
669
670    #[tracing::instrument(name = "pg.set_flow_tag", skip_all)]
671    async fn set_flow_tag(
672        &self,
673        flow_id: &FlowId,
674        key: &str,
675        value: &str,
676    ) -> Result<(), EngineError> {
677        ff_core::engine_backend::validate_tag_key(key)?;
678        flow::set_flow_tag_impl(&self.pool, &self.partition_config, flow_id, key, value).await
679    }
680
681    #[tracing::instrument(name = "pg.get_execution_tag", skip_all)]
682    async fn get_execution_tag(
683        &self,
684        execution_id: &ExecutionId,
685        key: &str,
686    ) -> Result<Option<String>, EngineError> {
687        ff_core::engine_backend::validate_tag_key(key)?;
688        exec_core::get_execution_tag_impl(&self.pool, execution_id, key).await
689    }
690
691    #[tracing::instrument(name = "pg.get_flow_tag", skip_all)]
692    async fn get_flow_tag(
693        &self,
694        flow_id: &FlowId,
695        key: &str,
696    ) -> Result<Option<String>, EngineError> {
697        ff_core::engine_backend::validate_tag_key(key)?;
698        flow::get_flow_tag_impl(&self.pool, &self.partition_config, flow_id, key).await
699    }
700
701    #[tracing::instrument(name = "pg.get_execution_namespace", skip_all)]
702    async fn get_execution_namespace(
703        &self,
704        execution_id: &ExecutionId,
705    ) -> Result<Option<String>, EngineError> {
706        exec_core::get_execution_namespace_impl(&self.pool, execution_id).await
707    }
708
709    #[cfg(feature = "core")]
710    #[tracing::instrument(name = "pg.list_edges", skip_all)]
711    async fn list_edges(
712        &self,
713        flow_id: &FlowId,
714        direction: EdgeDirection,
715    ) -> Result<Vec<EdgeSnapshot>, EngineError> {
716        flow::list_edges(&self.pool, &self.partition_config, flow_id, direction).await
717    }
718
719    #[cfg(feature = "core")]
720    #[tracing::instrument(name = "pg.describe_edge", skip_all)]
721    async fn describe_edge(
722        &self,
723        flow_id: &FlowId,
724        edge_id: &EdgeId,
725    ) -> Result<Option<EdgeSnapshot>, EngineError> {
726        flow::describe_edge(&self.pool, &self.partition_config, flow_id, edge_id).await
727    }
728
729    #[cfg(feature = "core")]
730    #[tracing::instrument(name = "pg.resolve_execution_flow_id", skip_all)]
731    async fn resolve_execution_flow_id(
732        &self,
733        eid: &ExecutionId,
734    ) -> Result<Option<FlowId>, EngineError> {
735        exec_core::resolve_execution_flow_id_impl(&self.pool, &self.partition_config, eid).await
736    }
737
738    #[cfg(feature = "core")]
739    #[tracing::instrument(name = "pg.list_flows", skip_all)]
740    async fn list_flows(
741        &self,
742        partition: PartitionKey,
743        cursor: Option<FlowId>,
744        limit: usize,
745    ) -> Result<ListFlowsPage, EngineError> {
746        flow::list_flows(&self.pool, partition, cursor, limit).await
747    }
748
749    #[cfg(feature = "core")]
750    #[tracing::instrument(name = "pg.list_lanes", skip_all)]
751    async fn list_lanes(
752        &self,
753        cursor: Option<LaneId>,
754        limit: usize,
755    ) -> Result<ListLanesPage, EngineError> {
756        admin::list_lanes_impl(&self.pool, cursor, limit).await
757    }
758
759    #[cfg(feature = "core")]
760    #[tracing::instrument(name = "pg.list_suspended", skip_all)]
761    async fn list_suspended(
762        &self,
763        partition: PartitionKey,
764        cursor: Option<ExecutionId>,
765        limit: usize,
766    ) -> Result<ListSuspendedPage, EngineError> {
767        admin::list_suspended_impl(&self.pool, partition, cursor, limit).await
768    }
769
770    #[cfg(feature = "core")]
771    #[tracing::instrument(name = "pg.list_executions", skip_all)]
772    async fn list_executions(
773        &self,
774        partition: PartitionKey,
775        cursor: Option<ExecutionId>,
776        limit: usize,
777    ) -> Result<ListExecutionsPage, EngineError> {
778        exec_core::list_executions_impl(
779            &self.pool,
780            &self.partition_config,
781            partition,
782            cursor,
783            limit,
784        )
785        .await
786    }
787
788    // ── Trigger ops (issue #150) ──
789
790    #[cfg(feature = "core")]
791    #[tracing::instrument(name = "pg.deliver_signal", skip_all)]
792    async fn deliver_signal(
793        &self,
794        args: DeliverSignalArgs,
795    ) -> Result<DeliverSignalResult, EngineError> {
796        suspend_ops::deliver_signal_impl(&self.pool, &self.partition_config, args).await
797    }
798
799    #[cfg(feature = "core")]
800    #[tracing::instrument(name = "pg.deliver_approval_signal", skip_all)]
801    async fn deliver_approval_signal(
802        &self,
803        args: DeliverApprovalSignalArgs,
804    ) -> Result<DeliverSignalResult, EngineError> {
805        suspend_ops::deliver_approval_signal_impl(&self.pool, &self.partition_config, args).await
806    }
807
808    #[cfg(feature = "core")]
809    #[tracing::instrument(name = "pg.claim_resumed_execution", skip_all)]
810    async fn claim_resumed_execution(
811        &self,
812        args: ClaimResumedExecutionArgs,
813    ) -> Result<ClaimResumedExecutionResult, EngineError> {
814        suspend_ops::claim_resumed_execution_impl(&self.pool, &self.partition_config, args).await
815    }
816
817    // ── RFC-020 Wave 9 Spine-B — read model (3 methods, §4.1) ────────
818    //
819    // Partition-local single-row reads against `ff_exec_core` (+ LATERAL
820    // join on `ff_attempt` for `read_execution_info`). READ COMMITTED
821    // (no CAS; all three are read-only). `get_execution_result` returns
822    // current-attempt semantics per §7.8 (matches Valkey's
823    // `GET ctx.result()` primitive). Capability flips land at the Wave 9
824    // release PR per RFC §6.3.
825
826    #[cfg(feature = "core")]
827    #[tracing::instrument(name = "pg.read_execution_state", skip_all)]
828    async fn read_execution_state(
829        &self,
830        id: &ExecutionId,
831    ) -> Result<Option<PublicState>, EngineError> {
832        exec_core::read_execution_state_impl(&self.pool, &self.partition_config, id).await
833    }
834
835    #[cfg(feature = "core")]
836    #[tracing::instrument(name = "pg.read_execution_info", skip_all)]
837    async fn read_execution_info(
838        &self,
839        id: &ExecutionId,
840    ) -> Result<Option<ExecutionInfo>, EngineError> {
841        exec_core::read_execution_info_impl(&self.pool, &self.partition_config, id).await
842    }
843
844    #[tracing::instrument(name = "pg.get_execution_result", skip_all)]
845    async fn get_execution_result(
846        &self,
847        id: &ExecutionId,
848    ) -> Result<Option<Vec<u8>>, EngineError> {
849        exec_core::get_execution_result_impl(&self.pool, &self.partition_config, id).await
850    }
851
852    // ── RFC-020 Wave 9 Standalone-2 — list_pending_waitpoints (§4.5) ─
853    //
854    // Read-only projection of `ff_waitpoint_pending` serving the 10-
855    // field `PendingWaitpointInfo` contract. Producer-side writes of
856    // the 3 new 0011 columns (`state`, `required_signal_names`,
857    // `activated_at_ms`) land alongside this method in the same PR —
858    // see `suspend_ops::suspend_core` INSERT site. Capability flip
859    // deferred to Wave 9 release PR per RFC §6.3.
860    #[cfg(feature = "core")]
861    #[tracing::instrument(name = "pg.list_pending_waitpoints", skip_all)]
862    async fn list_pending_waitpoints(
863        &self,
864        args: ListPendingWaitpointsArgs,
865    ) -> Result<ListPendingWaitpointsResult, EngineError> {
866        suspend_ops::list_pending_waitpoints_impl(&self.pool, args).await
867    }
868
869    // ── RFC-020 Wave 9 Spine-A pt.1 — operator-control mutations (§4.2) ─
870    //
871    // Two methods landing behind `Supports.cancel_execution` +
872    // `Supports.revoke_lease` (both stay `false` until the Wave 9
873    // release PR flips them atomically, RFC §6.3). SERIALIZABLE + CAS +
874    // `ff_lease_event` outbox emit on the same tx (§4.2.6 + §4.2.7).
875
876    #[cfg(feature = "core")]
877    #[tracing::instrument(name = "pg.cancel_execution", skip_all)]
878    async fn cancel_execution(
879        &self,
880        args: CancelExecutionArgs,
881    ) -> Result<CancelExecutionResult, EngineError> {
882        operator::cancel_execution_impl(&self.pool, args).await
883    }
884
885    #[cfg(feature = "core")]
886    #[tracing::instrument(name = "pg.revoke_lease", skip_all)]
887    async fn revoke_lease(
888        &self,
889        args: RevokeLeaseArgs,
890    ) -> Result<RevokeLeaseResult, EngineError> {
891        operator::revoke_lease_impl(&self.pool, args).await
892    }
893
894    // ── RFC-020 Wave 9 Spine-A pt.2 — operator control + flow cancel (§4.2.3 + §4.2.4 + §4.2.5) ─
895    //
896    // Four methods landing behind `Supports.change_priority` +
897    // `Supports.replay_execution` + `Supports.cancel_flow_header` +
898    // `Supports.ack_cancel_member` (all stay `false` until the Wave 9
899    // release PR flips them atomically, RFC §6.3). SERIALIZABLE + CAS +
900    // `ff_operator_event` outbox emit on the same tx (§4.2.6 + §4.2.7).
901    // `ack_cancel_member` is silent on the outbox (Valkey-parity).
902
903    #[cfg(feature = "core")]
904    #[tracing::instrument(name = "pg.change_priority", skip_all)]
905    async fn change_priority(
906        &self,
907        args: ChangePriorityArgs,
908    ) -> Result<ChangePriorityResult, EngineError> {
909        operator::change_priority_impl(&self.pool, args).await
910    }
911
912    #[cfg(feature = "core")]
913    #[tracing::instrument(name = "pg.replay_execution", skip_all)]
914    async fn replay_execution(
915        &self,
916        args: ReplayExecutionArgs,
917    ) -> Result<ReplayExecutionResult, EngineError> {
918        operator::replay_execution_impl(&self.pool, args).await
919    }
920
921    #[cfg(feature = "core")]
922    #[tracing::instrument(name = "pg.cancel_flow_header", skip_all)]
923    async fn cancel_flow_header(
924        &self,
925        args: CancelFlowArgs,
926    ) -> Result<CancelFlowHeader, EngineError> {
927        operator::cancel_flow_header_impl(&self.pool, &self.partition_config, args).await
928    }
929
930    #[cfg(feature = "core")]
931    #[tracing::instrument(name = "pg.ack_cancel_member", skip_all)]
932    async fn ack_cancel_member(
933        &self,
934        flow_id: &FlowId,
935        execution_id: &ExecutionId,
936    ) -> Result<(), EngineError> {
937        operator::ack_cancel_member_impl(
938            &self.pool,
939            &self.partition_config,
940            flow_id.clone(),
941            execution_id.clone(),
942        )
943        .await
944    }
945
946    // ── RFC-017 Stage A — ingress (promoted from inherent) ────
947
948    /// RFC-017 Wave 8 Stage E1: lift the inherent
949    /// [`PostgresBackend::create_execution`] onto the trait so
950    /// ff-server's migrated HTTP handler can dispatch to Postgres.
951    /// Post-insert the row is idempotent; the Postgres impl does not
952    /// distinguish `Created` from `Duplicate` at the helper level
953    /// (both paths commit and return the execution id), so we always
954    /// surface `Created { public_state: Waiting }` here. A follow-up
955    /// may lift the distinction if a consumer relies on it.
956    #[cfg(feature = "core")]
957    #[tracing::instrument(name = "pg.create_execution.trait", skip_all)]
958    async fn create_execution(
959        &self,
960        args: CreateExecutionArgs,
961    ) -> Result<CreateExecutionResult, EngineError> {
962        let eid = args.execution_id.clone();
963        exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await?;
964        Ok(CreateExecutionResult::Created {
965            execution_id: eid,
966            public_state: PublicState::Waiting,
967        })
968    }
969
970    #[cfg(feature = "core")]
971    #[tracing::instrument(name = "pg.create_flow", skip_all)]
972    async fn create_flow(
973        &self,
974        args: CreateFlowArgs,
975    ) -> Result<CreateFlowResult, EngineError> {
976        flow_staging::create_flow(&self.pool, &self.partition_config, &args).await
977    }
978
979    #[cfg(feature = "core")]
980    #[tracing::instrument(name = "pg.add_execution_to_flow", skip_all)]
981    async fn add_execution_to_flow(
982        &self,
983        args: AddExecutionToFlowArgs,
984    ) -> Result<AddExecutionToFlowResult, EngineError> {
985        flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, &args).await
986    }
987
988    #[cfg(feature = "core")]
989    #[tracing::instrument(name = "pg.stage_dependency_edge", skip_all)]
990    async fn stage_dependency_edge(
991        &self,
992        args: StageDependencyEdgeArgs,
993    ) -> Result<StageDependencyEdgeResult, EngineError> {
994        flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, &args).await
995    }
996
997    #[cfg(feature = "core")]
998    #[tracing::instrument(name = "pg.apply_dependency_to_child", skip_all)]
999    async fn apply_dependency_to_child(
1000        &self,
1001        args: ApplyDependencyToChildArgs,
1002    ) -> Result<ApplyDependencyToChildResult, EngineError> {
1003        flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, &args).await
1004    }
1005
1006    // PR-7b Cluster 4. Async-via-outbox cascade — see trait rustdoc
1007    // "Timing semantics" for the Valkey-sync vs PG-outbox divergence.
1008    // This resolves the payload to its `ff_completion_event.event_id`
1009    // and invokes the existing Wave-5a `dispatch_completion`; further
1010    // hops ride their own outbox events emitted by the per-hop tx.
1011    #[cfg(feature = "core")]
1012    async fn cascade_completion(
1013        &self,
1014        payload: &ff_core::backend::CompletionPayload,
1015    ) -> Result<ff_core::contracts::CascadeOutcome, EngineError> {
1016        let event_id = match resolve_event_id(&self.pool, payload).await {
1017            Some(id) => id,
1018            None => {
1019                // Event not materialised (outbox race or pre-subscribe
1020                // payload) — reconciler is the backstop.
1021                tracing::warn!(
1022                    execution_id = %payload.execution_id,
1023                    produced_at_ms = payload.produced_at_ms.0,
1024                    "pg.cascade_completion: could not resolve event_id; reconciler will claim"
1025                );
1026                return Ok(ff_core::contracts::CascadeOutcome::async_dispatched(0));
1027            }
1028        };
1029        let outcome = crate::dispatch::dispatch_completion(&self.pool, event_id).await?;
1030        let advanced = match outcome {
1031            crate::dispatch::DispatchOutcome::NoOp => 0,
1032            crate::dispatch::DispatchOutcome::Advanced(n) => n,
1033        };
1034        Ok(ff_core::contracts::CascadeOutcome::async_dispatched(advanced))
1035    }
1036
1037    fn backend_label(&self) -> &'static str {
1038        "postgres"
1039    }
1040
1041    /// RFC-018 Stage A: populate the `Capabilities` snapshot from the
1042    /// static [`postgres_supports_base`] shape. The Postgres backend
1043    /// landed through RFC-017 Stage E4 at v0.8.0; fields still `false`
1044    /// correspond to Wave-9 follow-up work (`cancel_flow_header`,
1045    /// `ack_cancel_member`, read-model, operator control, budget /
1046    /// quota, `list_pending_waitpoints`). See
1047    /// `docs/POSTGRES_PARITY_MATRIX.md` for the per-row breakdown.
1048    fn capabilities(&self) -> ff_core::capability::Capabilities {
1049        ff_core::capability::Capabilities::new(
1050            ff_core::capability::BackendIdentity::new(
1051                "postgres",
1052                ff_core::capability::Version::new(0, 11, 0),
1053                "E-shipped",
1054            ),
1055            postgres_supports_base(),
1056        )
1057    }
1058
1059    /// Issue #281: no-op. Schema migrations are applied out-of-band
1060    /// per `rfcs/drafts/v0.7-migration-master.md §Q12` (operator runs
1061    /// `sqlx migrate run` or the future `ff-migrate` CLI). Boot runs a
1062    /// schema-version check at connect time
1063    /// ([`crate::version::check_schema_version`]) and refuses to
1064    /// start on mismatch, so by the time `prepare()` is callable
1065    /// there is nothing further to do.
1066    async fn prepare(
1067        &self,
1068    ) -> Result<ff_core::backend::PrepareOutcome, EngineError> {
1069        Ok(ff_core::backend::PrepareOutcome::NoOp)
1070    }
1071
1072    /// RFC-017 Wave 8 Stage E3 (§4 row 9, §7): forward the claim to the
1073    /// Postgres-native admission pipeline. Returns `NoWork` when no
1074    /// eligible execution is admissible this scan cycle. Budget
1075    /// breaches surface as `NoWork` (leaving the row eligible for a
1076    /// retry by another worker); validation-class rejections
1077    /// (malformed partition, unknown kid) surface as typed
1078    /// [`EngineError`] variants mapped to the Server's 400/503 arms.
1079    #[cfg(feature = "core")]
1080    #[tracing::instrument(name = "pg.claim_for_worker", skip_all)]
1081    async fn claim_for_worker(
1082        &self,
1083        args: ff_core::contracts::ClaimForWorkerArgs,
1084    ) -> Result<ff_core::contracts::ClaimForWorkerOutcome, EngineError> {
1085        let sched = scheduler::PostgresScheduler::new(self.pool.clone());
1086        let grant_opt = sched
1087            .claim_for_worker(
1088                &args.lane_id,
1089                &args.worker_id,
1090                &args.worker_instance_id,
1091                &args.worker_capabilities,
1092                args.grant_ttl_ms,
1093            )
1094            .await?;
1095        Ok(match grant_opt {
1096            Some(g) => ff_core::contracts::ClaimForWorkerOutcome::granted(g),
1097            None => ff_core::contracts::ClaimForWorkerOutcome::no_work(),
1098        })
1099    }
1100
1101    async fn ping(&self) -> Result<(), EngineError> {
1102        // Postgres analogue to Valkey PING — single-round-trip pool
1103        // liveness. Errors propagate as transport-class EngineError via
1104        // the existing sqlx→EngineError map.
1105        let _ = sqlx::query_scalar::<_, i32>("SELECT 1")
1106            .fetch_one(&self.pool)
1107            .await
1108            .map_err(error::map_sqlx_error)?;
1109        Ok(())
1110    }
1111
1112    /// RFC-017 Wave 8 Stage E3: drain the scanner supervisor's
1113    /// reconciler tasks up to `grace`, then close the sqlx pool.
1114    /// Matches the Valkey backend's shutdown_prepare contract —
1115    /// bounded best-effort drain, never returns an error.
1116    async fn shutdown_prepare(&self, grace: Duration) -> Result<(), EngineError> {
1117        #[cfg(feature = "core")]
1118        if let Some(handle) = self.scanner_handle.as_ref() {
1119            let timed_out = handle.shutdown(grace).await;
1120            if timed_out > 0 {
1121                tracing::warn!(
1122                    timed_out,
1123                    ?grace,
1124                    "postgres scanner supervisor exceeded grace on shutdown"
1125                );
1126            }
1127        }
1128        Ok(())
1129    }
1130
1131    #[tracing::instrument(name = "pg.cancel_flow", skip_all)]
1132    async fn cancel_flow(
1133        &self,
1134        id: &FlowId,
1135        policy: CancelFlowPolicy,
1136        wait: CancelFlowWait,
1137    ) -> Result<CancelFlowResult, EngineError> {
1138        let result = flow::cancel_flow(&self.pool, &self.partition_config, id, policy).await?;
1139        if let Some(deadline) = ff_core::engine_backend::cancel_flow_wait_deadline(wait) {
1140            ff_core::engine_backend::wait_for_flow_cancellation(self, id, deadline).await?;
1141        }
1142        Ok(result)
1143    }
1144
1145    #[cfg(feature = "core")]
1146    #[tracing::instrument(name = "pg.set_edge_group_policy", skip_all)]
1147    async fn set_edge_group_policy(
1148        &self,
1149        flow_id: &FlowId,
1150        downstream_execution_id: &ExecutionId,
1151        policy: EdgeDependencyPolicy,
1152    ) -> Result<SetEdgeGroupPolicyResult, EngineError> {
1153        flow::set_edge_group_policy(
1154            &self.pool,
1155            &self.partition_config,
1156            flow_id,
1157            downstream_execution_id,
1158            policy,
1159        )
1160        .await
1161    }
1162
1163    // ── Budget ──
1164
1165    #[tracing::instrument(name = "pg.report_usage", skip_all)]
1166    async fn report_usage(
1167        &self,
1168        _handle: &Handle,
1169        budget: &BudgetId,
1170        dimensions: UsageDimensions,
1171    ) -> Result<ReportUsageResult, EngineError> {
1172        budget::report_usage_impl(&self.pool, &self.partition_config, budget, dimensions).await
1173    }
1174
1175    // ── RFC-020 Wave 9 Standalone-1 — budget/quota admin (§4.4) ─────
1176    //
1177    // Five methods landing behind capability flags that stay `false`
1178    // until the Wave 9 release PR flips them atomically (RFC §6.3).
1179    // Schema + trait impls land now; capability-surface flip is one
1180    // PR later.
1181
1182    #[cfg(feature = "core")]
1183    #[tracing::instrument(name = "pg.create_budget", skip_all)]
1184    async fn create_budget(
1185        &self,
1186        args: CreateBudgetArgs,
1187    ) -> Result<CreateBudgetResult, EngineError> {
1188        budget::create_budget_impl(&self.pool, &self.partition_config, args).await
1189    }
1190
1191    #[cfg(feature = "core")]
1192    #[tracing::instrument(name = "pg.reset_budget", skip_all)]
1193    async fn reset_budget(
1194        &self,
1195        args: ResetBudgetArgs,
1196    ) -> Result<ResetBudgetResult, EngineError> {
1197        budget::reset_budget_impl(&self.pool, &self.partition_config, args).await
1198    }
1199
1200    #[cfg(feature = "core")]
1201    #[tracing::instrument(name = "pg.create_quota_policy", skip_all)]
1202    async fn create_quota_policy(
1203        &self,
1204        args: CreateQuotaPolicyArgs,
1205    ) -> Result<CreateQuotaPolicyResult, EngineError> {
1206        budget::create_quota_policy_impl(&self.pool, &self.partition_config, args).await
1207    }
1208
1209    #[cfg(feature = "core")]
1210    #[tracing::instrument(name = "pg.get_budget_status", skip_all)]
1211    async fn get_budget_status(
1212        &self,
1213        id: &BudgetId,
1214    ) -> Result<BudgetStatus, EngineError> {
1215        budget::get_budget_status_impl(&self.pool, &self.partition_config, id).await
1216    }
1217
1218    #[cfg(feature = "core")]
1219    #[tracing::instrument(name = "pg.report_usage_admin", skip_all)]
1220    async fn report_usage_admin(
1221        &self,
1222        budget_id: &BudgetId,
1223        args: ReportUsageAdminArgs,
1224    ) -> Result<ReportUsageResult, EngineError> {
1225        budget::report_usage_admin_impl(&self.pool, &self.partition_config, budget_id, args).await
1226    }
1227
1228    // ── cairn #454 Phase 4a — per-execution ledger (option A) ────────
1229
1230    #[cfg(feature = "core")]
1231    #[tracing::instrument(name = "pg.record_spend", skip_all)]
1232    async fn record_spend(
1233        &self,
1234        args: ff_core::contracts::RecordSpendArgs,
1235    ) -> Result<ReportUsageResult, EngineError> {
1236        budget::record_spend_impl(&self.pool, &self.partition_config, args).await
1237    }
1238
1239    #[cfg(feature = "core")]
1240    #[tracing::instrument(name = "pg.release_budget", skip_all)]
1241    async fn release_budget(
1242        &self,
1243        args: ff_core::contracts::ReleaseBudgetArgs,
1244    ) -> Result<(), EngineError> {
1245        budget::release_budget_impl(&self.pool, &self.partition_config, args).await
1246    }
1247
1248    #[cfg(feature = "core")]
1249    #[tracing::instrument(name = "pg.release_admission", skip_all)]
1250    async fn release_admission(
1251        &self,
1252        args: ff_core::contracts::ReleaseAdmissionArgs,
1253    ) -> Result<ff_core::contracts::ReleaseAdmissionResult, EngineError> {
1254        crate::typed_ops::release_admission(&self.pool, &self.partition_config, args).await
1255    }
1256
1257    #[cfg(feature = "core")]
1258    #[tracing::instrument(name = "pg.read_quota_policy_limits", skip_all)]
1259    async fn read_quota_policy_limits(
1260        &self,
1261        quota_policy_id: &ff_core::types::QuotaPolicyId,
1262    ) -> Result<Option<ff_core::contracts::QuotaPolicyLimits>, EngineError> {
1263        crate::typed_ops::read_quota_policy_limits(
1264            &self.pool,
1265            &self.partition_config,
1266            quota_policy_id,
1267        )
1268        .await
1269    }
1270
1271    // ── HMAC secret rotation (v0.7 migration-master Q4) ──
1272    //
    // Implemented as a write against `ff_waitpoint_hmac` via
    // `signal::rotate_waitpoint_hmac_secret_all_impl`. (An earlier
    // revision of this comment described a Wave 0/1 stub that
    // returned `Unavailable`; the method below no longer does so.)
1277    #[tracing::instrument(name = "pg.rotate_waitpoint_hmac_secret_all", skip_all)]
1278    async fn rotate_waitpoint_hmac_secret_all(
1279        &self,
1280        args: RotateWaitpointHmacSecretAllArgs,
1281    ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
1282        // Wave 4 Agent D: Q4 single-global-row write against
1283        // `ff_waitpoint_hmac`. `now_ms` is captured here (not
1284        // inside the impl) so tests can inject a deterministic
1285        // clock via the pool layer in the future.
1286        let now_ms = std::time::SystemTime::now()
1287            .duration_since(std::time::UNIX_EPOCH)
1288            .map(|d| d.as_millis() as i64)
1289            .unwrap_or(0);
1290        signal::rotate_waitpoint_hmac_secret_all_impl(&self.pool, args, now_ms).await
1291    }
1292
1293    #[tracing::instrument(name = "pg.seed_waitpoint_hmac_secret", skip_all)]
1294    async fn seed_waitpoint_hmac_secret(
1295        &self,
1296        args: SeedWaitpointHmacSecretArgs,
1297    ) -> Result<SeedOutcome, EngineError> {
1298        // Issue #280: install-only boot-time seed against the global
1299        // `ff_waitpoint_hmac` table. Idempotent — cairn calls this on
1300        // every boot and the backend decides whether to INSERT.
1301        let now_ms = std::time::SystemTime::now()
1302            .duration_since(std::time::UNIX_EPOCH)
1303            .map(|d| d.as_millis() as i64)
1304            .unwrap_or(0);
1305        signal::seed_waitpoint_hmac_secret_impl(&self.pool, args, now_ms).await
1306    }
1307
1308    // ── Stream reads (streaming feature) ──
1309
1310    #[cfg(feature = "streaming")]
1311    #[tracing::instrument(name = "pg.read_stream", skip_all)]
1312    async fn read_stream(
1313        &self,
1314        execution_id: &ExecutionId,
1315        attempt_index: AttemptIndex,
1316        from: StreamCursor,
1317        to: StreamCursor,
1318        count_limit: u64,
1319    ) -> Result<StreamFrames, EngineError> {
1320        stream::read_stream(&self.pool, execution_id, attempt_index, from, to, count_limit).await
1321    }
1322
1323    #[cfg(feature = "streaming")]
1324    #[tracing::instrument(name = "pg.tail_stream", skip_all)]
1325    async fn tail_stream(
1326        &self,
1327        execution_id: &ExecutionId,
1328        attempt_index: AttemptIndex,
1329        after: StreamCursor,
1330        block_ms: u64,
1331        count_limit: u64,
1332        visibility: TailVisibility,
1333    ) -> Result<StreamFrames, EngineError> {
1334        let notifier = self
1335            .stream_notifier
1336            .as_ref()
1337            .ok_or(EngineError::Unavailable {
1338                op: "pg.tail_stream (notifier not initialised)",
1339            })?;
1340        stream::tail_stream(
1341            &self.pool,
1342            notifier,
1343            execution_id,
1344            attempt_index,
1345            after,
1346            block_ms,
1347            count_limit,
1348            visibility,
1349        )
1350        .await
1351    }
1352
1353    #[cfg(feature = "streaming")]
1354    #[tracing::instrument(name = "pg.read_summary", skip_all)]
1355    async fn read_summary(
1356        &self,
1357        execution_id: &ExecutionId,
1358        attempt_index: AttemptIndex,
1359    ) -> Result<Option<SummaryDocument>, EngineError> {
1360        stream::read_summary(&self.pool, execution_id, attempt_index).await
1361    }
1362
1363    // ── RFC-019 Stage A — `subscribe_completion` ──────────────────
1364    //
1365    // Postgres real impl. Wraps the existing `ff_completion_event`
1366    // outbox + `LISTEN ff_completion` machinery
1367    // (see `completion::subscribe`) and adapts each completion
1368    // payload into a `stream_subscribe::StreamEvent`.
1369    //
1370    // Cursor encoding: `POSTGRES_CURSOR_PREFIX (0x02)` + `event_id`
1371    // (i64 BE). Stage A resume-from-cursor is not plumbed through the
1372    // adapter (the existing subscriber tails from `max(event_id)`);
1373    // Stage B threads the cursor into the replay path. The surface is
1374    // correct today for consumers that subscribe from tail and
1375    // persist cursors for future resume.
1376    #[tracing::instrument(name = "pg.subscribe_completion", skip_all)]
1377    async fn subscribe_completion(
1378        &self,
1379        _cursor: ff_core::stream_subscribe::StreamCursor,
1380        filter: &ff_core::backend::ScannerFilter,
1381    ) -> Result<ff_core::stream_events::CompletionSubscription, EngineError> {
1382        use ff_core::stream_events::{CompletionEvent, CompletionOutcome};
1383        use ff_core::stream_subscribe::encode_postgres_event_cursor;
1384        use futures_core::Stream;
1385        use std::pin::Pin;
1386        use std::task::{Context, Poll};
1387
1388        // Delegate to the existing CompletionBackend implementation so
1389        // the LISTEN/replay machinery is shared. When a non-noop
1390        // `ScannerFilter` (#282) is supplied, route through the
1391        // `_filtered` variant so the outbox-inline SQL filter applies.
1392        // Resume-from-cursor is still unwired (Stage A surface tails
1393        // from tail).
1394        let inner = if filter.is_noop() {
1395            ff_core::completion_backend::CompletionBackend::subscribe_completions(self).await?
1396        } else {
1397            ff_core::completion_backend::CompletionBackend::subscribe_completions_filtered(
1398                self, filter,
1399            )
1400            .await?
1401        };
1402
1403        struct Adapter {
1404            inner: ff_core::completion_backend::CompletionStream,
1405        }
1406
1407        impl Stream for Adapter {
1408            type Item = Result<CompletionEvent, EngineError>;
1409            fn poll_next(
1410                mut self: Pin<&mut Self>,
1411                cx: &mut Context<'_>,
1412            ) -> Poll<Option<Self::Item>> {
1413                match Pin::new(&mut self.inner).poll_next(cx) {
1414                    Poll::Pending => Poll::Pending,
1415                    Poll::Ready(None) => Poll::Ready(None),
1416                    Poll::Ready(Some(payload)) => {
1417                        // Placeholder cursor (0-event_id) because
1418                        // `CompletionPayload` does not surface
1419                        // `event_id` today. Family prefix stays stable
1420                        // so persistence is forward-compatible.
1421                        let cursor = encode_postgres_event_cursor(0);
1422                        let event = CompletionEvent::new(
1423                            cursor,
1424                            payload.execution_id.clone(),
1425                            CompletionOutcome::from_wire(&payload.outcome),
1426                            payload.produced_at_ms,
1427                        );
1428                        Poll::Ready(Some(Ok(event)))
1429                    }
1430                }
1431            }
1432        }
1433
1434        Ok(Box::pin(Adapter { inner }))
1435    }
1436
1437    // ── RFC-019 Stage B — `subscribe_lease_history` ──────────────
1438    //
1439    // Real Postgres impl. Tails the `ff_lease_event` outbox (written
1440    // by producer sites in `attempt.rs`, `flow.rs`, `suspend_ops.rs`,
1441    // and the `attempt_timeout` / `lease_expiry` reconcilers) via
1442    // `LISTEN ff_lease_event` + catch-up SELECT. Cursor encoding
1443    // matches `subscribe_completion`: `0x02 ++ event_id(BE8)`.
1444    //
1445    // Partition scope: hardcoded to partition 0 — mirrors the Valkey
1446    // Stage A impl, which tails partition 0's aggregate stream key.
1447    // Cross-partition consumers instantiate one backend per
1448    // partition + merge streams consumer-side (RFC-019 §Backend
1449    // Semantics).
1450    #[tracing::instrument(name = "pg.subscribe_lease_history", skip_all)]
1451    async fn subscribe_lease_history(
1452        &self,
1453        cursor: ff_core::stream_subscribe::StreamCursor,
1454        filter: &ff_core::backend::ScannerFilter,
1455    ) -> Result<ff_core::stream_events::LeaseHistorySubscription, EngineError> {
1456        lease_event_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
1457    }
1458
1459    // ── RFC-019 Stage B — `subscribe_signal_delivery` (#310) ─────
1460    //
1461    // Tails the `ff_signal_event` outbox (written by the producer
1462    // INSERT in `suspend_ops::deliver_signal_impl`) via
1463    // `LISTEN ff_signal_event` + catch-up SELECT. Cursor encoding
1464    // matches `subscribe_lease_history`: `0x02 ++ event_id(BE8)`.
1465    //
1466    // Partition scope: hardcoded to partition 0 — mirrors the Valkey
1467    // Stage B impl which tails partition 0's aggregate stream key.
1468    #[tracing::instrument(name = "pg.subscribe_signal_delivery", skip_all)]
1469    async fn subscribe_signal_delivery(
1470        &self,
1471        cursor: ff_core::stream_subscribe::StreamCursor,
1472        filter: &ff_core::backend::ScannerFilter,
1473    ) -> Result<ff_core::stream_events::SignalDeliverySubscription, EngineError> {
1474        signal_delivery_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
1475    }
1476
1477    // ── PR-7b Cluster 1 — Foundation scanner operations ─────────
1478    //
1479    // Per-execution wrappers around the reconcilers in
1480    // `crate::reconcilers::*`. Each reconciler exposes a
1481    // `*_for_execution` / `*_for_*` helper mirroring the batch
1482    // `scan_tick` path's per-row tx logic so the engine-side scanner
1483    // trait-dispatch and the batch reconciler share one SQL code
1484    // path.
1485    //
1486    // All 5 scanner-op methods run real SQL on Postgres; both phases
1487    // of `expire_execution` (`AttemptTimeout` + `ExecutionDeadline`)
1488    // are covered. The Wave-9-minimal delivery (this commit) adds
1489    // `delayed_promoter`, `pending_wp_expiry`, and
1490    // `execution_deadline` reconcilers on top of the Wave 6c
1491    // reconcilers shipped with PR-7b/1.
1492    //
1493    // Gated on `core` because `reconcilers` (and its `dispatch` dep)
1494    // require `core`. Without `core` the trait defaults return
1495    // `EngineError::Unavailable`, preserving behavioural parity with
1496    // other feature-stripped callsites.
1497
1498    #[cfg(feature = "core")]
1499    async fn mark_lease_expired_if_due(
1500        &self,
1501        partition: Partition,
1502        execution_id: &ExecutionId,
1503    ) -> Result<(), EngineError> {
1504        let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1505        let partition_key = partition_index_to_i16(partition)?;
1506        reconcilers::lease_expiry::release_for_execution(&self.pool, partition_key, exec_uuid)
1507            .await
1508    }
1509
1510    #[cfg(feature = "core")]
1511    async fn promote_delayed(
1512        &self,
1513        partition: Partition,
1514        _lane: &LaneId,
1515        execution_id: &ExecutionId,
1516        now_ms: TimestampMs,
1517    ) -> Result<(), EngineError> {
1518        let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1519        let partition_key = partition_index_to_i16(partition)?;
1520        // `lane` is ignored: lane is authoritative on `ff_exec_core`
1521        // already (it was used only to locate the Valkey ZSET). The
1522        // candidate selection in `promote_for_execution` re-checks
1523        // the (lifecycle_phase, eligibility_state, deadline_at_ms)
1524        // tuple that `attempt::delay()` writes.
1525        reconcilers::delayed_promoter::promote_for_execution(
1526            &self.pool,
1527            partition_key,
1528            exec_uuid,
1529            now_ms.0,
1530        )
1531        .await
1532    }
1533
1534    #[cfg(feature = "core")]
1535    async fn close_waitpoint(
1536        &self,
1537        partition: Partition,
1538        _execution_id: &ExecutionId,
1539        waitpoint_id: &str,
1540        now_ms: TimestampMs,
1541    ) -> Result<(), EngineError> {
1542        // The scanner resolves (waitpoint_id → owning execution_id)
1543        // separately for filter application; the close action itself
1544        // only needs the waitpoint row (which carries `execution_id`
1545        // + `waitpoint_key`). Partition is authoritative from the
1546        // caller.
1547        let partition_key = partition_index_to_i16(partition)?;
1548        let waitpoint_uuid = uuid::Uuid::parse_str(waitpoint_id).map_err(|e| {
1549            EngineError::Validation {
1550                kind: ff_core::engine_error::ValidationKind::InvalidInput,
1551                detail: format!("waitpoint_id not a UUID: {e}"),
1552            }
1553        })?;
1554        reconcilers::pending_wp_expiry::close_for_execution(
1555            &self.pool,
1556            partition_key,
1557            waitpoint_uuid,
1558            now_ms.0,
1559        )
1560        .await
1561    }
1562
1563    #[cfg(feature = "core")]
1564    async fn expire_execution(
1565        &self,
1566        partition: Partition,
1567        execution_id: &ExecutionId,
1568        phase: ExpirePhase,
1569        now_ms: TimestampMs,
1570    ) -> Result<(), EngineError> {
1571        let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1572        let partition_key = partition_index_to_i16(partition)?;
1573        match phase {
1574            ExpirePhase::AttemptTimeout => {
1575                reconcilers::attempt_timeout::expire_for_execution(
1576                    &self.pool,
1577                    partition_key,
1578                    exec_uuid,
1579                )
1580                .await
1581            }
1582            ExpirePhase::ExecutionDeadline => {
1583                reconcilers::execution_deadline::expire_for_execution(
1584                    &self.pool,
1585                    partition_key,
1586                    exec_uuid,
1587                    now_ms.0,
1588                )
1589                .await
1590            }
1591        }
1592    }
1593
1594    #[cfg(feature = "core")]
1595    async fn expire_suspension(
1596        &self,
1597        partition: Partition,
1598        execution_id: &ExecutionId,
1599        _now_ms: TimestampMs,
1600    ) -> Result<(), EngineError> {
1601        let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1602        let partition_key = partition_index_to_i16(partition)?;
1603        reconcilers::suspension_timeout::expire_for_execution(
1604            &self.pool,
1605            partition_key,
1606            exec_uuid,
1607        )
1608        .await
1609    }
1610
1611    // PR-7b Cluster 2b-B: flow summary projection on Postgres.
1612    // Aggregates member public_state from ff_exec_core and UPSERTs the
1613    // derived summary into ff_flow_summary (migration 0019). One SQL
1614    // round-trip for the aggregation + one for the upsert.
1615    #[cfg(feature = "core")]
1616    async fn project_flow_summary(
1617        &self,
1618        partition: Partition,
1619        flow_id: &FlowId,
1620        now_ms: TimestampMs,
1621    ) -> Result<bool, EngineError> {
1622        let partition_key = partition_index_to_i16(partition)?;
1623        let flow_uuid: sqlx::types::Uuid = flow_id.0;
1624        flow::project_flow_summary_impl(
1625            &self.pool,
1626            partition_key,
1627            flow_uuid,
1628            now_ms.0,
1629        )
1630        .await
1631    }
1632
1633    // PR-7b Cluster 2b-B: retention trim on Postgres.
1634    // SELECTs a batch of terminal executions past the cutoff, then
1635    // per-execution DELETEs across every sibling table (no FK CASCADE
1636    // in the schema, so this is explicit). One transaction per batch.
1637    #[cfg(feature = "core")]
1638    async fn trim_retention(
1639        &self,
1640        partition: Partition,
1641        lane_id: &LaneId,
1642        retention_ms: u64,
1643        now_ms: TimestampMs,
1644        batch_size: u32,
1645        filter: &ff_core::backend::ScannerFilter,
1646    ) -> Result<u32, EngineError> {
1647        let partition_key = partition_index_to_i16(partition)?;
1648        exec_core::trim_retention_impl(
1649            &self.pool,
1650            partition_key,
1651            lane_id.as_str(),
1652            retention_ms,
1653            now_ms.0,
1654            batch_size,
1655            filter,
1656        )
1657        .await
1658    }
1659
1660    // ── PR-7b / #453: typed-FCALL bodies ──
1661
1662    #[cfg(feature = "core")]
1663    async fn renew_lease(
1664        &self,
1665        args: ff_core::contracts::RenewLeaseArgs,
1666    ) -> Result<ff_core::contracts::RenewLeaseResult, EngineError> {
1667        crate::typed_ops::renew_lease(self.pool(), args).await
1668    }
1669
1670    #[cfg(feature = "core")]
1671    async fn complete_execution(
1672        &self,
1673        args: ff_core::contracts::CompleteExecutionArgs,
1674    ) -> Result<ff_core::contracts::CompleteExecutionResult, EngineError> {
1675        crate::typed_ops::complete_execution(self.pool(), args).await
1676    }
1677
1678    #[cfg(feature = "core")]
1679    async fn fail_execution(
1680        &self,
1681        args: ff_core::contracts::FailExecutionArgs,
1682    ) -> Result<ff_core::contracts::FailExecutionResult, EngineError> {
1683        crate::typed_ops::fail_execution(self.pool(), args).await
1684    }
1685
1686    #[cfg(feature = "core")]
1687    async fn resume_execution(
1688        &self,
1689        args: ff_core::contracts::ResumeExecutionArgs,
1690    ) -> Result<ff_core::contracts::ResumeExecutionResult, EngineError> {
1691        crate::typed_ops::resume_execution(self.pool(), args).await
1692    }
1693
1694    #[cfg(feature = "core")]
1695    async fn check_admission(
1696        &self,
1697        quota_policy_id: &ff_core::types::QuotaPolicyId,
1698        _dimension: &str,
1699        args: ff_core::contracts::CheckAdmissionArgs,
1700    ) -> Result<ff_core::contracts::CheckAdmissionResult, EngineError> {
1701        crate::typed_ops::check_admission(
1702            self.pool(),
1703            &self.partition_config,
1704            quota_policy_id,
1705            args,
1706        )
1707        .await
1708    }
1709
1710    #[cfg(feature = "core")]
1711    async fn evaluate_flow_eligibility(
1712        &self,
1713        args: ff_core::contracts::EvaluateFlowEligibilityArgs,
1714    ) -> Result<ff_core::contracts::EvaluateFlowEligibilityResult, EngineError> {
1715        crate::typed_ops::evaluate_flow_eligibility(self.pool(), args).await
1716    }
1717
1718    #[cfg(feature = "core")]
1719    async fn claim_execution(
1720        &self,
1721        args: ff_core::contracts::ClaimExecutionArgs,
1722    ) -> Result<ff_core::contracts::ClaimExecutionResult, EngineError> {
1723        crate::typed_ops::claim_execution(self.pool(), &self.partition_config, args).await
1724    }
1725
1726    // Cairn #454 Phase 4c — backend-atomic `issue_claim_grant` +
1727    // `claim_execution` composition. sqlx tx ensures a mid-op crash
1728    // cannot leak a dangling grant (auto-rollback on drop).
1729    #[cfg(feature = "core")]
1730    #[tracing::instrument(name = "pg.issue_grant_and_claim", skip_all)]
1731    async fn issue_grant_and_claim(
1732        &self,
1733        args: ff_core::contracts::IssueGrantAndClaimArgs,
1734    ) -> Result<ff_core::contracts::ClaimGrantOutcome, EngineError> {
1735        crate::typed_ops::issue_grant_and_claim(self.pool(), &self.partition_config, args).await
1736    }
1737
1738    // ── PR-7b Wave 0a: exec_core field read ──
1739
1740    async fn read_exec_core_fields(
1741        &self,
1742        partition: ff_core::partition::Partition,
1743        execution_id: &ff_core::types::ExecutionId,
1744        fields: &[&str],
1745    ) -> Result<std::collections::HashMap<String, Option<String>>, EngineError> {
1746        if fields.is_empty() {
1747            return Ok(std::collections::HashMap::new());
1748        }
1749        // Cross-check: `partition` and `execution_id.partition()` must
1750        // agree. A mismatch would silently read the wrong row (or miss)
1751        // on Valkey via the `{p:N}` key tag, so surface it explicitly
1752        // as a validation error here.
1753        let derived: u16 = execution_id.partition();
1754        if partition.index != derived {
1755            return Err(EngineError::Validation {
1756                kind: ff_core::engine_error::ValidationKind::InvalidInput,
1757                detail: format!(
1758                    "read_exec_core_fields: partition mismatch (arg={}, eid={})",
1759                    partition.index, derived
1760                ),
1761            });
1762        }
1763        let partition_key: i16 = partition.index as i16;
1764        let exec_uuid = crate::exec_core::eid_uuid(execution_id);
1765
1766        // Build a single SELECT that projects each requested field to
1767        // a text value. Fields are classified:
1768        //  - canonical columns (lane_id, lifecycle_phase,
1769        //    ownership_state, eligibility_state, public_state,
1770        //    attempt_state, blocking_reason, cancellation_reason,
1771        //    cancelled_by, attempt_index, flow_id, priority,
1772        //    created_at_ms, terminal_at_ms, deadline_at_ms): CAST the
1773        //    column to text. Scanner-facing aliases (current_attempt_index
1774        //    → attempt_index, completed_at → terminal_at_ms,
1775        //    cancel_reason → cancellation_reason) project the canonical
1776        //    column.
1777        //  - `required_capabilities` is `text[]` on PG; projected as CSV
1778        //    via `array_to_string(..., ',')` to match Valkey's HMGET
1779        //    string shape.
1780        //  - `raw_fields` JSONB-resident names (current_waitpoint_id,
1781        //    current_worker_instance_id, budget_ids, quota_policy_id)
1782        //    project via `raw_fields ->> '<field>'`.
1783        //  - Unknown names project NULL (absent-field parity with
1784        //    Valkey HMGET).
1785        let mut projections: Vec<String> = Vec::with_capacity(fields.len());
1786        for field in fields {
1787            let expr = match *field {
1788                // Canonical exec_core columns.
1789                "lane_id" | "lifecycle_phase" | "ownership_state" | "eligibility_state"
1790                | "public_state" | "attempt_state" | "blocking_reason" | "cancellation_reason"
1791                | "cancelled_by" => format!("{f}::text", f = field),
1792                "attempt_index" => "attempt_index::text".to_string(),
1793                "flow_id" => "flow_id::text".to_string(),
1794                "priority" => "priority::text".to_string(),
1795                "created_at_ms" => "created_at_ms::text".to_string(),
1796                "terminal_at_ms" => "terminal_at_ms::text".to_string(),
1797                "deadline_at_ms" => "deadline_at_ms::text".to_string(),
1798                // Scanner-facing aliases that scan the same data from
1799                // different angles.
1800                "current_attempt_index" => "attempt_index::text".to_string(),
1801                "completed_at" => "terminal_at_ms::text".to_string(),
1802                "cancel_reason" => "cancellation_reason::text".to_string(),
1803                // required_capabilities is `text[]` on PG; scanner
1804                // callers expect a CSV string on Valkey. Convert.
1805                "required_capabilities" => {
1806                    "array_to_string(required_capabilities, ',')".to_string()
1807                }
1808                // Everything else lives under raw_fields JSONB.
1809                other => {
1810                    // Strict allowlist of raw_fields names the scanner
1811                    // code reads. Any other name returns NULL.
1812                    match other {
1813                        "current_waitpoint_id"
1814                        | "current_worker_instance_id"
1815                        | "budget_ids"
1816                        | "quota_policy_id" => format!("raw_fields ->> '{other}'"),
1817                        _ => "NULL".to_string(),
1818                    }
1819                }
1820            };
1821            projections.push(expr);
1822        }
1823        let projection_sql = projections.join(", ");
1824        let query = format!(
1825            "SELECT {projection_sql} FROM ff_exec_core \
1826             WHERE partition_key = $1 AND execution_id = $2"
1827        );
1828        let row_opt = sqlx::query(&query)
1829            .bind(partition_key)
1830            .bind(exec_uuid)
1831            .fetch_optional(self.pool())
1832            .await
1833            .map_err(|e| EngineError::Transport {
1834                backend: "postgres",
1835                source: format!("read_exec_core_fields: {e}").into(),
1836            })?;
1837
1838        let mut out = std::collections::HashMap::with_capacity(fields.len());
1839        if let Some(row) = row_opt {
1840            use sqlx::Row;
1841            for (idx, field) in fields.iter().enumerate() {
1842                let val: Option<String> =
1843                    row.try_get(idx).map_err(|e| EngineError::Transport {
1844                        backend: "postgres",
1845                        source: format!("read_exec_core_fields[{field}]: {e}").into(),
1846                    })?;
1847                out.insert((*field).to_string(), val);
1848            }
1849        } else {
1850            for field in fields {
1851                out.insert((*field).to_string(), None);
1852            }
1853        }
1854        Ok(out)
1855    }
1856
1857    // ── PR-7b Wave 0a: clock primitive ──
1858
1859    async fn server_time_ms(&self) -> Result<u64, EngineError> {
1860        // `clock_timestamp()` — `now()` returns the transaction start
1861        // timestamp and would be stale under any long-running tx.
1862        // Scanners use this to compute "due" windows, so the wall-
1863        // clock read must be fresh. Matches the `flow.rs` convention.
1864        let ms: i64 = sqlx::query_scalar(
1865            "SELECT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint",
1866        )
1867            .fetch_one(self.pool())
1868            .await
1869            .map_err(|e| EngineError::Transport {
1870                backend: "postgres",
1871                source: format!("server_time_ms: {e}").into(),
1872            })?;
1873        if ms < 0 {
1874            return Err(EngineError::Transport {
1875                backend: "postgres",
1876                source: "server_time_ms: negative epoch".into(),
1877            });
1878        }
1879        Ok(ms as u64)
1880    }
1881
1882    // ── RFC-025 Phase 3 — worker registry ───────────────────────
1883    //
1884    // Bodies live in `crate::worker_registry`; the overrides here
1885    // just forward to those free functions so the trait
1886    // `#[cfg(feature = ..)]` gates match the module-level gate.
1887
1888    #[cfg(feature = "core")]
1889    #[tracing::instrument(name = "pg.register_worker", skip_all)]
1890    async fn register_worker(
1891        &self,
1892        args: ff_core::contracts::RegisterWorkerArgs,
1893    ) -> Result<ff_core::contracts::RegisterWorkerOutcome, EngineError> {
1894        worker_registry::register_worker(&self.pool, args).await
1895    }
1896
1897    #[cfg(feature = "core")]
1898    #[tracing::instrument(name = "pg.heartbeat_worker", skip_all)]
1899    async fn heartbeat_worker(
1900        &self,
1901        args: ff_core::contracts::HeartbeatWorkerArgs,
1902    ) -> Result<ff_core::contracts::HeartbeatWorkerOutcome, EngineError> {
1903        worker_registry::heartbeat_worker(&self.pool, args).await
1904    }
1905
1906    #[cfg(feature = "core")]
1907    #[tracing::instrument(name = "pg.mark_worker_dead", skip_all)]
1908    async fn mark_worker_dead(
1909        &self,
1910        args: ff_core::contracts::MarkWorkerDeadArgs,
1911    ) -> Result<ff_core::contracts::MarkWorkerDeadOutcome, EngineError> {
1912        worker_registry::mark_worker_dead(&self.pool, args).await
1913    }
1914
1915    // List-expired-leases joins ff_attempt+ff_exec_core, which live in
1916    // the `worker_registry` module (gated on `core`). Require both
1917    // features to keep the body's dep chain intact; a standalone
1918    // suspension-only build has no lease-bearing tables to scan.
1919    #[cfg(all(feature = "core", feature = "suspension"))]
1920    #[tracing::instrument(name = "pg.list_expired_leases", skip_all)]
1921    async fn list_expired_leases(
1922        &self,
1923        args: ff_core::contracts::ListExpiredLeasesArgs,
1924    ) -> Result<ff_core::contracts::ListExpiredLeasesResult, EngineError> {
1925        worker_registry::list_expired_leases(&self.pool, args).await
1926    }
1927
1928    #[cfg(feature = "core")]
1929    #[tracing::instrument(name = "pg.list_workers", skip_all)]
1930    async fn list_workers(
1931        &self,
1932        args: ff_core::contracts::ListWorkersArgs,
1933    ) -> Result<ff_core::contracts::ListWorkersResult, EngineError> {
1934        worker_registry::list_workers(&self.pool, args).await
1935    }
1936}
1937
1938/// Resolve a `CompletionPayload` to the matching
1939/// `ff_completion_event.event_id` for PR-7b cascade dispatch.
1940///
1941/// Mirrors the cursor-walk previously inlined in
1942/// `ff-engine::completion_listener::run_completion_listener_postgres` —
1943/// keyed on `(partition_key, execution_id_uuid, occurred_at_ms)`. The
1944/// `partition_key` scoping is what makes this hit the
1945/// `ff_completion_event_lookup_idx` composite instead of devolving into
1946/// a cross-partition seq-scan; it's recoverable from the textual
1947/// `ExecutionId` (`"<partition>:<uuid>"`).
1948///
1949/// Returns `None` if the outbox row isn't visible yet (race with the
1950/// producing tx), the payload's execution id can't be parsed, or the
1951/// partition prefix isn't an `i16` — all recoverable via the
1952/// dependency_reconciler backstop. Transient sqlx errors are logged at
1953/// `warn` with the query inputs and also fall back to `None`; the
1954/// listener retries on the next payload and the reconciler covers the
1955/// worst case.
1956async fn resolve_event_id(
1957    pool: &PgPool,
1958    payload: &ff_core::backend::CompletionPayload,
1959) -> Option<i64> {
1960    let eid_str = payload.execution_id.as_str();
1961    // ExecutionId is `{fp:N}:<uuid>`; split on the rightmost `:` so the
1962    // `{fp:N}` hash-tag prefix stays intact.
1963    let uuid_str = eid_str.rsplit_once(':').map(|(_, u)| u)?;
1964    let uuid = uuid::Uuid::parse_str(uuid_str).ok()?;
1965    // The payload's ExecutionId is already validated (construction
1966    // enforces the `{fp:N}:<uuid>` shape), so `.partition()` is
1967    // infallible. Narrow to `i16` for the partition_key column.
1968    let partition_key = i16::try_from(payload.execution_id.partition()).ok()?;
1969    let occurred_at_ms = payload.produced_at_ms.0;
1970
1971    match sqlx::query_scalar::<_, i64>(
1972        "SELECT event_id FROM ff_completion_event \
1973         WHERE partition_key = $1 AND execution_id = $2 AND occurred_at_ms = $3 \
1974         ORDER BY event_id ASC LIMIT 1",
1975    )
1976    .bind(partition_key)
1977    .bind(uuid)
1978    .bind(occurred_at_ms)
1979    .fetch_optional(pool)
1980    .await
1981    {
1982        Ok(row) => row,
1983        Err(err) => {
1984            tracing::warn!(
1985                partition_key,
1986                execution_id = %uuid,
1987                occurred_at_ms,
1988                error = %err,
1989                "resolve_event_id: ff_completion_event lookup failed; falling back to \
1990                 dependency_reconciler backstop"
1991            );
1992            None
1993        }
1994    }
1995}
1996
1997/// Narrow a `Partition`'s `u16` index into the `i16` the Postgres
1998/// schema uses as its partition-key column. FlowFabric partitions
1999/// max out well below `i16::MAX` in practice (production deployments
2000/// run a few hundred partitions per family), but the conversion is
2001/// still fallible at the type level. Surface overflow as
2002/// `ValidationKind::InvalidInput` rather than silently substituting a
2003/// fallback — a silently mis-routed reconciler would
2004/// corrupt-by-omission without diagnostics.
2005fn partition_index_to_i16(partition: Partition) -> Result<i16, EngineError> {
2006    i16::try_from(partition.index).map_err(|_| EngineError::Validation {
2007        kind: ff_core::engine_error::ValidationKind::InvalidInput,
2008        detail: format!(
2009            "partition index {} exceeds i16 range (max {})",
2010            partition.index,
2011            i16::MAX
2012        ),
2013    })
2014}
2015
/// Minimum recommended `max_locks_per_transaction`; values strictly
/// below this trigger the boot-time warning.
///
/// Partition-heavy schemas (256 hash partitions per logical table) can
/// exceed the Postgres default of `64` per tx under modest concurrent
/// bench load — the Wave 7c bench hit `out of shared memory` at 16
/// workers × 10k tasks with the default and unblocked at `512`.
///
/// This is a warn-only threshold rather than a hard failure because
/// operators may legitimately run with a tuned value that still
/// exceeds 64 but sits below our threshold.
const MIN_MAX_LOCKS_PER_TRANSACTION: i64 = 256;
2025
2026/// Probe `max_locks_per_transaction` at connect time + log a warning
2027/// when the current value is below the production-safe threshold.
2028/// Never fails the connect — probe errors are logged at debug and
2029/// swallowed (pg_show may be restricted on exotic deploys).
2030async fn warn_if_max_locks_low(pool: &PgPool) {
2031    let row: Result<(String,), sqlx::Error> =
2032        sqlx::query_as("SHOW max_locks_per_transaction")
2033            .fetch_one(pool)
2034            .await;
2035    match row {
2036        Ok((raw,)) => emit_max_locks_decision(&raw),
2037        Err(e) => {
2038            tracing::debug!("failed to probe max_locks_per_transaction: {e}");
2039        }
2040    }
2041}
2042
2043/// Pure decision surface for the max-locks probe — extracted for
2044/// unit-testability (the live probe is gated by a running Postgres).
2045/// Returns the integer value when a warning SHOULD fire, `None`
2046/// otherwise (either the raw is valid + at/above threshold, or the
2047/// raw is unparseable — the latter is debug-only).
2048fn max_locks_warn_value(raw: &str) -> Option<i64> {
2049    match raw.parse::<i64>() {
2050        Ok(v) if v < MIN_MAX_LOCKS_PER_TRANSACTION => Some(v),
2051        Ok(_) => None,
2052        Err(e) => {
2053            tracing::debug!(raw, "failed to parse max_locks_per_transaction: {e}");
2054            None
2055        }
2056    }
2057}
2058
2059fn emit_max_locks_decision(raw: &str) {
2060    if let Some(v) = max_locks_warn_value(raw) {
2061        tracing::warn!(
2062            current = v,
2063            recommended = MIN_MAX_LOCKS_PER_TRANSACTION,
2064            "postgres max_locks_per_transaction={v} is below the recommended \
2065             minimum ({MIN_MAX_LOCKS_PER_TRANSACTION}); partition-heavy workloads \
2066             may hit 'out of shared memory' under concurrent load. \
2067             See docs/operator-guide-postgres.md."
2068        );
2069    }
2070}
2071
/// Unit tests for the pure max-locks decision function.
#[cfg(test)]
mod max_locks_tests {
    use super::{max_locks_warn_value, MIN_MAX_LOCKS_PER_TRANSACTION};

    /// Values under the threshold are returned so the caller can warn.
    #[test]
    fn warns_when_below_threshold() {
        let postgres_default = max_locks_warn_value("64");
        assert_eq!(postgres_default, Some(64));

        let just_below = MIN_MAX_LOCKS_PER_TRANSACTION - 1;
        let raw = just_below.to_string();
        assert_eq!(max_locks_warn_value(&raw), Some(just_below));
    }

    /// The threshold itself and anything above it produce no warning.
    #[test]
    fn silent_at_or_above_threshold() {
        let at_threshold = MIN_MAX_LOCKS_PER_TRANSACTION.to_string();
        assert_eq!(max_locks_warn_value(&at_threshold), None);

        let well_above = max_locks_warn_value("1024");
        assert_eq!(well_above, None);
    }

    /// Garbage input is treated as "no warning".
    #[test]
    fn silent_for_unparseable_raw() {
        let outcome = max_locks_warn_value("not-a-number");
        assert_eq!(outcome, None);
    }
}
2099
/// Unit tests for the checked `u16 -> i16` partition-index narrowing.
#[cfg(test)]
mod partition_index_tests {
    use super::partition_index_to_i16;
    use ff_core::engine_error::{EngineError, ValidationKind};
    use ff_core::partition::{Partition, PartitionFamily};

    /// Indexes from zero up to and including `i16::MAX` convert
    /// losslessly.
    #[test]
    fn accepts_values_within_i16_range() {
        let zero = Partition { family: PartitionFamily::Flow, index: 0 };
        assert_eq!(partition_index_to_i16(zero).unwrap(), 0);

        let typical = Partition { family: PartitionFamily::Flow, index: 255 };
        assert_eq!(partition_index_to_i16(typical).unwrap(), 255);

        let upper_bound = Partition { family: PartitionFamily::Budget, index: i16::MAX as u16 };
        assert_eq!(partition_index_to_i16(upper_bound).unwrap(), i16::MAX);
    }

    /// The first index past `i16::MAX` is rejected with a descriptive
    /// validation error.
    #[test]
    fn rejects_overflow_above_i16_max() {
        let first_overflow = (i16::MAX as u16) + 1;
        let p = Partition { family: PartitionFamily::Flow, index: first_overflow };
        match partition_index_to_i16(p).unwrap_err() {
            EngineError::Validation { kind, detail } => {
                assert_eq!(kind, ValidationKind::InvalidInput);
                assert!(detail.contains("exceeds i16 range"), "unexpected detail: {detail}");
            }
            other => panic!("expected Validation error, got {other:?}"),
        }
    }

    /// The largest possible `u16` index is rejected as well.
    #[test]
    fn rejects_u16_max() {
        let p = Partition { family: PartitionFamily::Quota, index: u16::MAX };
        let outcome = partition_index_to_i16(p);
        assert!(matches!(
            outcome,
            Err(EngineError::Validation { kind: ValidationKind::InvalidInput, .. })
        ));
    }
}