Skip to main content

ff_core/
engine_backend.rs

1//! The `EngineBackend` trait — abstracting FlowFabric's write surface.
2//!
3//! **RFC-012 Stage 1a:** this is the trait landing. The
4//! Valkey-backed impl lives in `ff-backend-valkey`; future backends
5//! (Postgres) add a sibling crate with their own impl. ff-sdk's
6//! `FlowFabricWorker` gains `connect_with(backend)` /
7//! `backend(&self)` accessors so consumers that want to bring their
8//! own backend (tests, future non-Valkey deployments) can hand one
9//! in. The hot-path migration of `ClaimedTask` / `FlowFabricWorker`
10//! to forward through the trait lands across Stages 1b-1d.
11//!
12//! # Object safety
13//!
14//! `EngineBackend` is object-safe: all methods are `async fn` behind
15//! `#[async_trait]` and take `&self`. Consumers can hold
//! `Arc<dyn EngineBackend>` for heterogeneous-backend deployments.
17//! The trait is `Send + Sync + 'static` per RFC-012 §4.1; every impl
18//! must honour that bound.
19//!
20//! # Error surface
21//!
22//! Every method returns [`Result<_, EngineError>`]. `EngineError`'s
23//! `Transport` variant carries a boxed `dyn Error + Send + Sync`;
24//! Valkey-backed transport faults box a
25//! `ff_script::error::ScriptError` (downcast via
26//! `ff_script::engine_error_ext::transport_script_ref`). Other
27//! backends box their native error type and set the `backend` tag
28//! accordingly.
29//!
30//! # Atomicity contract
31//!
32//! Per-op state transitions MUST be atomic (RFC-012 §3.4). On Valkey
33//! this is the single-FCALL-per-op property; on Postgres it is the
34//! per-transaction property. A backend that cannot honour atomicity
35//! for a given op either MUST NOT implement `EngineBackend` or MUST
36//! return `EngineError::Unavailable { op }` for the affected method.
37//!
38//! # Replay semantics
39//!
40//! `complete`, `fail`, `cancel`, `suspend`, `delay`, `wait_children`
41//! are idempotent under replay — calling twice with the same handle
42//! and args returns the same outcome (success on first call, typed
43//! `State` / `Contention` on subsequent calls where the fence triple
44//! no longer matches a live lease).
45
46use std::time::Duration;
47
48use async_trait::async_trait;
49
50use crate::backend::{
51    AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy,
52    FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal, PendingWaitpoint,
53    PrepareOutcome, ResumeSignal, ResumeToken,
54};
55// `SummaryDocument` and `TailVisibility` are referenced only inside
56// `#[cfg(feature = "streaming")]` trait methods below, so the imports
57// must be gated to avoid an unused-imports warning on the non-streaming
58// build.
59#[cfg(feature = "streaming")]
60use crate::backend::{SummaryDocument, TailVisibility};
61use crate::contracts::{
62    CancelFlowResult, ExecutionContext, ExecutionSnapshot, FlowSnapshot, IssueReclaimGrantArgs,
63    IssueReclaimGrantOutcome, ReclaimExecutionArgs, ReclaimExecutionOutcome, ReportUsageResult,
64    RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
65    SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
66};
67#[cfg(feature = "core")]
68use crate::contracts::{
69    AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
70    ApplyDependencyToChildResult, BudgetStatus, CancelExecutionArgs, CancelExecutionResult,
71    CancelFlowArgs, ChangePriorityArgs, ChangePriorityResult, ClaimExecutionArgs,
72    ClaimExecutionResult, ClaimForWorkerArgs, ClaimForWorkerOutcome, ClaimResumedExecutionArgs,
73    ClaimResumedExecutionResult,
74    BlockRouteArgs, BlockRouteOutcome, CreateBudgetArgs, CreateBudgetResult,
75    CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
76    CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
77    DeliverSignalArgs, DeliverSignalResult, EdgeDirection, EdgeSnapshot, ExecutionInfo,
78    IssueClaimGrantArgs, IssueClaimGrantOutcome, ScanEligibleArgs,
79    ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
80    ListPendingWaitpointsResult, ListSuspendedPage, ReplayExecutionArgs, ReplayExecutionResult,
81    ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult, RevokeLeaseArgs, RevokeLeaseResult,
82    StageDependencyEdgeArgs, StageDependencyEdgeResult,
83};
84#[cfg(feature = "core")]
85use crate::state::PublicState;
86#[cfg(feature = "core")]
87use crate::partition::PartitionKey;
88#[cfg(feature = "streaming")]
89use crate::contracts::{StreamCursor, StreamFrames};
90use crate::engine_error::EngineError;
91#[cfg(feature = "core")]
92use crate::types::EdgeId;
93use crate::types::{AttemptIndex, BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
94
95/// The engine write surface — a single trait a backend implementation
96/// honours to serve a `FlowFabricWorker`.
97///
/// See RFC-012 §3.1 for the inventory rationale and §3.3 for the
/// type-level shape. Round-7 added `create_waitpoint`, widened the
/// `append_frame` return, and replaced the `report_usage` return
/// (RFC-012 §R7); issue #150 added the two trigger-surface methods
/// (`deliver_signal` / `claim_resumed_execution`).
103///
104/// # Note on `complete` payload shape
105///
106/// The RFC §3.3 sketch uses `Option<Bytes>`; the Stage 1a trait uses
107/// `Option<Vec<u8>>` to match the existing
108/// `ff_sdk::ClaimedTask::complete` signature and avoid adding a
109/// `bytes` public-type dep for zero consumer benefit. Round-4 §7.17
110/// resolved the payload container debate to `Box<[u8]>` in the
111/// public type (see `HandleOpaque`); `Option<Vec<u8>>` is the
112/// zero-churn choice consistent with today's code. Consumers that
113/// need `&[u8]` can borrow via `.as_deref()` on the Option.
114#[async_trait]
115pub trait EngineBackend: Send + Sync + 'static {
116    // ── Claim + lifecycle ──
117
    /// Fresh-work claim. Returns `Ok(None)` when no work is currently
    /// available; `Err` only on transport or input-validation faults.
    ///
    /// See [`CapabilitySet`] / [`ClaimPolicy`] for how the claim is
    /// scoped. The returned [`Handle`] embeds the lease fencing triple
    /// (`lease_id`, `lease_epoch`, `attempt_id`) that all subsequent
    /// per-execution ops fence against.
    async fn claim(
        &self,
        lane: &LaneId,
        capabilities: &CapabilitySet,
        policy: ClaimPolicy,
    ) -> Result<Option<Handle>, EngineError>;
126
    /// Renew a held lease. Returns the updated expiry + epoch on
    /// success; typed `State::StaleLease` / `State::LeaseExpired`
    /// when the lease has been stolen or timed out.
    ///
    /// Takes the handle by shared reference; the caller retains it for
    /// subsequent heartbeats and the eventual terminal call.
    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError>;
131
    /// Numeric-progress heartbeat.
    ///
    /// Writes scalar `progress_percent` / `progress_message` fields on
    /// `exec_core`; each call overwrites the previous value. This does
    /// NOT append to the output stream — stream-frame producers must use
    /// [`append_frame`](Self::append_frame) instead.
    ///
    /// NOTE(review): `percent` is a raw `u8`; presumably 0–100, but no
    /// clamping or validation is visible at the trait level — confirm
    /// the range contract against the backend impls.
    async fn progress(
        &self,
        handle: &Handle,
        percent: Option<u8>,
        message: Option<String>,
    ) -> Result<(), EngineError>;
144
    /// Append one stream frame. Distinct from [`progress`](Self::progress)
    /// per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry
    /// id and post-append frame count (RFC-012 §R7.2.1).
    ///
    /// Stream-frame producers (arbitrary `frame_type` + payload, consumed
    /// via the read/tail surfaces) MUST use this method rather than
    /// [`progress`](Self::progress); the latter updates scalar fields on
    /// `exec_core` and is invisible to stream consumers.
    ///
    /// NOTE(review): this method is absent from the module-level
    /// replay-idempotent set (`complete` / `fail` / `cancel` / …), so a
    /// caller retrying after `EngineError::Transport` should assume a
    /// duplicate frame is possible — confirm per backend.
    async fn append_frame(
        &self,
        handle: &Handle,
        frame: Frame,
    ) -> Result<AppendFrameOutcome, EngineError>;
158
    /// Terminal success. Borrows `handle` (round-4 M-D2) so callers
    /// can retry under `EngineError::Transport` without losing the
    /// cookie. Payload is `Option<Vec<u8>>` per the note above.
    ///
    /// Idempotent under replay per the module-level *Replay semantics*
    /// contract.
    async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError>;
163
    /// Terminal failure with classification. Returns [`FailOutcome`]
    /// so the caller learns whether a retry was scheduled.
    ///
    /// Idempotent under replay per the module-level *Replay semantics*
    /// contract.
    async fn fail(
        &self,
        handle: &Handle,
        reason: FailureReason,
        classification: FailureClass,
    ) -> Result<FailOutcome, EngineError>;
172
    /// Cooperative cancel by the worker holding the lease.
    ///
    /// Idempotent under replay per the module-level *Replay semantics*
    /// contract.
    async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError>;
175
    /// Suspend the execution awaiting a typed resume condition
    /// (RFC-013 Stage 1d).
    ///
    /// Borrows `handle` (round-4 M-D2). Terminal-looking behaviour is
    /// expressed through [`SuspendOutcome`]:
    ///
    /// * [`SuspendOutcome::Suspended`] — the pre-suspend handle is
    ///   logically invalidated; the fresh `HandleKind::Suspended`
    ///   handle inside the variant supersedes it. Runtime enforcement
    ///   via the fence triple: subsequent ops against the stale handle
    ///   surface as `Contention(LeaseConflict)`.
    /// * [`SuspendOutcome::AlreadySatisfied`] — buffered signals on a
    ///   pending waitpoint already matched the resume condition at
    ///   suspension time. The lease is NOT released; the caller's
    ///   pre-suspend handle remains valid.
    ///
    /// Listed in the module-level replay-idempotent set: replaying with
    /// the same handle and args returns the same outcome.
    ///
    /// See RFC-013 §2 for the type shapes, §3 for the replay /
    /// idempotency contract, §4 for the error taxonomy.
    async fn suspend(
        &self,
        handle: &Handle,
        args: SuspendArgs,
    ) -> Result<SuspendOutcome, EngineError>;
199
200    /// Suspend by execution id + lease fence triple, for service-layer
201    /// callers that hold a run record / lease-claim descriptor but no
202    /// worker [`Handle`] (cairn issue #322).
203    ///
204    /// Semantics mirror [`Self::suspend`] exactly — the same
205    /// [`SuspendArgs`] validation, the same [`SuspendOutcome`]
206    /// lifecycle, the same RFC-013 §3 dedup / replay contract. The
207    /// only difference is the fencing source: instead of the
208    /// `(lease_id, lease_epoch, attempt_id)` fields embedded in a
209    /// `Handle`, the backend fences against the triple passed directly.
210    /// Attempt-index, lane, and worker-instance metadata that
211    /// [`Self::suspend`] reads from the handle payload are recovered
212    /// from the backend's authoritative execution record (Valkey:
213    /// `exec_core` HGETs; Postgres: `ff_attempt` row lookup).
214    ///
215    /// The default impl returns [`EngineError::Unavailable`] so
216    /// existing backend impls remain non-breaking. Production backends
217    /// (Valkey, Postgres) override.
218    async fn suspend_by_triple(
219        &self,
220        exec_id: ExecutionId,
221        triple: LeaseFence,
222        args: SuspendArgs,
223    ) -> Result<SuspendOutcome, EngineError> {
224        let _ = (exec_id, triple, args);
225        Err(EngineError::Unavailable {
226            op: "suspend_by_triple",
227        })
228    }
229
    /// Issue a pending waitpoint for future signal delivery.
    ///
    /// Waitpoints have two states in the Valkey wire contract:
    /// **pending** (token issued, not yet backing a suspension) and
    /// **active** (bound to a suspension). This method creates a
    /// waitpoint in the **pending** state. A later `suspend` call
    /// transitions a pending waitpoint to active (see Lua
    /// `use_pending_waitpoint` ARGV flag at
    /// `flowfabric.lua:3603,3641,3690`) — or, if buffered signals
    /// already satisfy its condition, the suspend call returns
    /// `SuspendOutcome::AlreadySatisfied` and the waitpoint activates
    /// without ever releasing the lease.
    ///
    /// `expires_in` is a relative TTL bounding the **pending** state;
    /// see the expiry note below for the terminal error it produces.
    ///
    /// Pending-waitpoint expiry is a first-class terminal error on
    /// the wire (`PendingWaitpointExpired` at
    /// `ff-script/src/error.rs:170,403-408`). The attempt retains its
    /// lease while the waitpoint is pending; signals delivered to
    /// this waitpoint are buffered server-side (RFC-012 §R7.2.2).
    async fn create_waitpoint(
        &self,
        handle: &Handle,
        waitpoint_key: &str,
        expires_in: Duration,
    ) -> Result<PendingWaitpoint, EngineError>;
254
    /// Non-mutating observation of signals that satisfied the handle's
    /// resume condition.
    ///
    /// Read-only: the call does not consume the signals or alter the
    /// lease, so repeated observation is safe.
    async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError>;
258
    /// Consume a resume grant (via [`ResumeToken`]) to mint a
    /// resumed-kind handle. Routes to `ff_claim_resumed_execution` on
    /// Valkey / the epoch-bump reconciler on PG/SQLite. Returns
    /// `Ok(None)` when the grant's target execution is no longer
    /// resumable (already reclaimed, terminal, etc.).
    ///
    /// The token is taken by value: it is consumed by the call.
    ///
    /// **Renamed from `claim_from_reclaim` (RFC-024 PR-B+C).** The
    /// pre-rename name advertised "reclaim" but the semantic has
    /// always been resume-after-suspend. The new lease-reclaim path
    /// lives on [`Self::reclaim_execution`].
    async fn claim_from_resume_grant(
        &self,
        token: ResumeToken,
    ) -> Result<Option<Handle>, EngineError>;
273
274    /// Issue a lease-reclaim grant (RFC-024 §3.2). Admits executions
275    /// in `lease_expired_reclaimable` or `lease_revoked` state to the
276    /// reclaim path; the returned [`IssueReclaimGrantOutcome::Granted`]
277    /// carries a [`crate::contracts::ReclaimGrant`] which is then fed
278    /// to [`Self::reclaim_execution`] to mint a fresh attempt.
279    ///
280    /// Default impl returns [`EngineError::Unavailable`] — PR-D (PG),
281    /// PR-E (SQLite), and PR-F (Valkey) override with real bodies.
282    async fn issue_reclaim_grant(
283        &self,
284        _args: IssueReclaimGrantArgs,
285    ) -> Result<IssueReclaimGrantOutcome, EngineError> {
286        Err(EngineError::Unavailable {
287            op: "issue_reclaim_grant",
288        })
289    }
290
291    /// Consume a [`crate::contracts::ReclaimGrant`] to mint a fresh
292    /// attempt for a previously lease-expired / lease-revoked
293    /// execution (RFC-024 §3.2). Creates a new attempt row, bumps the
294    /// execution's `lease_reclaim_count`, and mints a
295    /// [`crate::backend::HandleKind::Reclaimed`] handle.
296    ///
297    /// Default impl returns [`EngineError::Unavailable`] — PR-D (PG),
298    /// PR-E (SQLite), and PR-F (Valkey) override with real bodies.
299    async fn reclaim_execution(
300        &self,
301        _args: ReclaimExecutionArgs,
302    ) -> Result<ReclaimExecutionOutcome, EngineError> {
303        Err(EngineError::Unavailable {
304            op: "reclaim_execution",
305        })
306    }
307
308    // Round-5 amendment: lease-releasing peers of `suspend`.
309
    /// Park the execution until `delay_until`, releasing the lease.
    ///
    /// Idempotent under replay per the module-level *Replay semantics*
    /// contract.
    async fn delay(&self, handle: &Handle, delay_until: TimestampMs) -> Result<(), EngineError>;
312
    /// Mark the execution as waiting for its child flow to complete,
    /// releasing the lease.
    ///
    /// Idempotent under replay per the module-level *Replay semantics*
    /// contract.
    async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError>;
316
317    // ── Read / admin ──
318
    /// Snapshot an execution by id. `Ok(None)` ⇒ no such execution.
    ///
    /// Read-model projection; contrast [`Self::read_execution_context`],
    /// which returns the raw payload bytes this snapshot deliberately
    /// omits.
    async fn describe_execution(
        &self,
        id: &ExecutionId,
    ) -> Result<Option<ExecutionSnapshot>, EngineError>;
324
    /// Point-read of the execution-scoped `(input_payload,
    /// execution_kind, tags)` bundle used by the SDK worker when
    /// assembling a `ClaimedTask` (see `ff_sdk::ClaimedTask`) after a
    /// successful claim.
    ///
    /// NOTE: unlike [`Self::describe_execution`] / [`Self::describe_flow`],
    /// a missing execution here is an `Err`, never `Ok(None)` — see the
    /// error note at the bottom.
    ///
    /// No default impl — every `EngineBackend` must answer this
    /// explicitly. Distinct from [`Self::describe_execution`]
    /// (read-model projection) because the SDK needs the raw payload
    /// bytes + kind + tags immediately post-claim, and the snapshot
    /// projection deliberately omits the payload bytes.
    ///
    /// Per-backend shape:
    ///
    /// * **Valkey** — pipelined `GET :payload` + `HGETALL :core`
    ///   + `HGETALL :tags` on the execution's partition (same pattern
    ///   as [`Self::describe_execution`]).
    /// * **Postgres** — single `SELECT payload, raw_fields` on
    ///   `ff_exec_core` keyed by `(partition_key, execution_id)`;
    ///   `execution_kind` + `tags` live in `raw_fields` JSONB.
    /// * **SQLite** — identical shape to Postgres.
    ///
    /// Returns [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
    /// when the execution does not exist — the SDK worker only calls
    /// this after a successful claim, so a missing row is a loud
    /// storage-tier invariant violation rather than a routine `Ok(None)`.
    async fn read_execution_context(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ExecutionContext, EngineError>;
354
355    /// Point-read of the execution's current attempt-index **pointer**
356    /// — the index of the currently-leased attempt row.
357    ///
358    /// Distinct from [`Self::read_total_attempt_count`]: this method
359    /// names the attempt that *already exists* (pointer), whereas
360    /// `read_total_attempt_count` is the monotonic claim counter used
361    /// to compute the next fresh attempt index. See the sibling's
362    /// rustdoc for the retry-path scenario that motivates the split.
363    ///
364    /// Used on the SDK worker's `claim_from_resume_grant` path —
365    /// specifically the private `claim_resumed_execution` helper —
366    /// immediately before dispatching [`Self::claim_resumed_execution`].
367    /// The returned index is fed into
368    /// [`ClaimResumedExecutionArgs::current_attempt_index`](crate::contracts::ClaimResumedExecutionArgs)
369    /// so the backend's script / transaction targets the correct
370    /// existing attempt row (KEYS[6] on Valkey; `ff_attempt` PK tuple
371    /// on PG/SQLite).
372    ///
373    /// Per-backend shape:
374    ///
375    /// * **Valkey** — `HGET {exec}:core current_attempt_index` on the
376    ///   execution's partition. Single command. Both the
377    ///   **missing-field** case (`exec_core` present but
378    ///   `current_attempt_index` absent or empty-string, i.e. pre-claim
379    ///   state) **and** the **missing-row** case (no `exec_core` hash
380    ///   at all) read back as `AttemptIndex(0)`. This preserves the
381    ///   pre-PR-3 inline-`HGET` semantic and is safe because Valkey's
382    ///   happy path requires `exec_core` to exist before this method
383    ///   is reached — the SDK only calls `read_current_attempt_index`
384    ///   post-grant, and grant issuance is gated on `exec_core`
385    ///   presence. A genuinely absent row would surface as the proper
386    ///   business-logic error (`NotAResumedExecution` /
387    ///   `ExecutionNotLeaseable`) on the downstream FCALL.
388    /// * **Postgres** — `SELECT attempt_index FROM ff_exec_core
389    ///   WHERE partition_key = $1 AND execution_id = $2`. The column
390    ///   is `NOT NULL DEFAULT 0` so a pre-claim row reads back as `0`
391    ///   (matching Valkey's missing-field case). **Missing row**
392    ///   surfaces as [`EngineError::Validation { kind:
393    ///   ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
394    ///   — diverges from Valkey's missing-row `→ 0` mapping.
395    /// * **SQLite** — `SELECT attempt_index FROM ff_exec_core
396    ///   WHERE partition_key = ? AND execution_id = ?`; identical
397    ///   semantics to Postgres (missing-row → `InvalidInput`).
398    ///
399    /// **Cross-backend asymmetry on missing row is intentional.** The
400    /// SDK happy path never observes it (grant issuance on Valkey
401    /// requires `exec_core`, and PG/SQLite currently return
402    /// `Unavailable` from `claim_from_grant` per
403    /// `project_claim_from_grant_pg_sqlite_gap.md`). Consumers writing
404    /// backend-agnostic tooling against this method directly must
405    /// treat the missing-row case as backend-dependent — match on
406    /// `InvalidInput` for PG/SQLite, and treat an unexpected `0` as
407    /// the Valkey equivalent signal.
408    ///
409    /// The default impl returns [`EngineError::Unavailable`] so the
410    /// trait addition is non-breaking for out-of-tree backends (same
411    /// precedent as [`Self::read_execution_context`] landing in v0.12
412    /// PR-1).
413    async fn read_current_attempt_index(
414        &self,
415        _execution_id: &ExecutionId,
416    ) -> Result<AttemptIndex, EngineError> {
417        Err(EngineError::Unavailable {
418            op: "read_current_attempt_index",
419        })
420    }
421
422    /// Point-read of the execution's **total attempt counter** — the
423    /// monotonic count of claims that have ever fired against this
424    /// execution (including the in-flight one once claimed).
425    ///
426    /// Used on the SDK worker's `claim_from_grant` / `claim_execution`
427    /// path — the next attempt-index for a fresh claim is this
428    /// counter's current value (so `1` on the second retry after the
429    /// first attempt failed terminally). This is semantically distinct
430    /// from [`Self::read_current_attempt_index`], which is a *pointer*
431    /// at the currently-leased attempt row and is only meaningful on
432    /// the `claim_from_resume_grant` path (where a live attempt already
433    /// exists and we want to re-seat its lease rather than mint a new
434    /// attempt row).
435    ///
436    /// Reading the pointer on the `claim_from_grant` path was a live
437    /// bug: on the retry-of-a-retry scenario the pointer still named
438    /// the *previous* terminal-failed attempt, so the newly-minted
439    /// attempt collided with it (Valkey KEYS[6]) or mis-targeted the
440    /// PG/SQLite `ff_attempt` PK tuple. This method fixes that by
441    /// reading the counter that Lua 5920 / PG `ff_claim_execution` /
442    /// SQLite `claim_impl` all already consult when computing the
443    /// next attempt index.
444    ///
445    /// Per-backend shape:
446    ///
447    /// * **Valkey** — `HGET {exec}:core total_attempt_count` on the
448    ///   execution's partition. Single command; pre-claim read (field
449    ///   absent or empty) maps to `0`.
450    /// * **Postgres** — `SELECT raw_fields->>'total_attempt_count'
451    ///   FROM ff_exec_core WHERE (partition_key, execution_id) = ...`.
452    ///   The field lives in the JSONB `raw_fields` bag rather than a
453    ///   dedicated column (mirrors how `create_execution_impl` seeds
454    ///   it on row creation). Missing row → `InvalidInput`; missing
455    ///   field → `0`.
456    /// * **SQLite** — `SELECT CAST(json_extract(raw_fields,
457    ///   '$.total_attempt_count') AS INTEGER) FROM ff_exec_core
458    ///   WHERE ...`. Same JSON-in-`raw_fields` shape as PG; uses the
459    ///   same `json_extract` idiom already employed in
460    ///   `ff-backend-sqlite/src/queries/operator.rs` for replay_count.
461    ///
462    /// The default impl returns [`EngineError::Unavailable`] so the
463    /// trait addition is non-breaking for out-of-tree backends (same
464    /// precedent as [`Self::read_current_attempt_index`] landing in
465    /// v0.12 PR-3).
466    async fn read_total_attempt_count(
467        &self,
468        _execution_id: &ExecutionId,
469    ) -> Result<AttemptIndex, EngineError> {
470        Err(EngineError::Unavailable {
471            op: "read_total_attempt_count",
472        })
473    }
474
    /// Snapshot a flow by id. `Ok(None)` ⇒ no such flow.
    ///
    /// Read-only peer of [`Self::describe_execution`].
    async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError>;
477
478    /// List dependency edges adjacent to an execution. Read-only; the
479    /// backend resolves the subject execution's flow, reads the
480    /// direction-specific adjacency SET, and decodes each member's
481    /// flow-scoped `edge:<edge_id>` hash.
482    ///
483    /// Returns an empty `Vec` when the subject has no edges on the
484    /// requested side — including standalone executions (no owning
485    /// flow). Ordering is unspecified: the underlying adjacency SET
486    /// is an unordered SMEMBERS read. Callers that need deterministic
487    /// order should sort by [`EdgeSnapshot::edge_id`] /
488    /// [`EdgeSnapshot::created_at`] themselves.
489    ///
490    /// Parse failures on the edge hash surface as
491    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
492    /// — unknown fields, missing required fields, endpoint mismatches
493    /// against the adjacency SET all fail loud rather than silently
494    /// returning partial results.
495    ///
496    /// Gated on the `core` feature — edge reads are part of the
497    /// minimal engine surface a Postgres-style backend must honour.
498    ///
499    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
500    #[cfg(feature = "core")]
501    async fn list_edges(
502        &self,
503        _flow_id: &FlowId,
504        _direction: EdgeDirection,
505    ) -> Result<Vec<EdgeSnapshot>, EngineError> {
506        Err(EngineError::Unavailable { op: "list_edges" })
507    }
508
509    /// Snapshot a single dependency edge by its owning flow + edge id.
510    ///
511    /// `Ok(None)` when the edge hash is absent (never staged, or
512    /// staged under a different flow than `flow_id`). Parse failures
513    /// on a present edge hash surface as
514    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
515    /// — the stored `flow_id` field is cross-checked against the
516    /// caller's expected `flow_id` so a wrong-key read fails loud
517    /// rather than returning an unrelated edge.
518    ///
519    /// Gated on the `core` feature — single-edge reads are part of
520    /// the minimal snapshot surface an alternate backend must honour
521    /// alongside [`Self::describe_execution`] / [`Self::describe_flow`]
522    /// / [`Self::list_edges`].
523    ///
524    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
525    #[cfg(feature = "core")]
526    async fn describe_edge(
527        &self,
528        _flow_id: &FlowId,
529        _edge_id: &EdgeId,
530    ) -> Result<Option<EdgeSnapshot>, EngineError> {
531        Err(EngineError::Unavailable {
532            op: "describe_edge",
533        })
534    }
535
536    /// Resolve an execution's owning flow id, if any.
537    ///
538    /// `Ok(None)` when the execution's core record is absent or has
539    /// no associated flow (standalone execution). A present-but-
540    /// malformed `flow_id` field surfaces as
541    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`].
542    ///
543    /// Gated on the `core` feature. Used by ff-sdk's
544    /// `list_outgoing_edges` / `list_incoming_edges` to pivot from a
545    /// consumer-supplied `ExecutionId` to the `FlowId` required by
546    /// [`Self::list_edges`]. A Valkey backend serves this with a
547    /// single `HGET exec_core flow_id`; a Postgres backend serves it
548    /// with the equivalent single-column row lookup.
549    ///
550    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
551    #[cfg(feature = "core")]
552    async fn resolve_execution_flow_id(
553        &self,
554        _eid: &ExecutionId,
555    ) -> Result<Option<FlowId>, EngineError> {
556        Err(EngineError::Unavailable {
557            op: "resolve_execution_flow_id",
558        })
559    }
560
561    /// List flows on a partition with cursor-based pagination (issue
562    /// #185).
563    ///
564    /// Returns a [`ListFlowsPage`] of [`FlowSummary`](crate::contracts::FlowSummary)
565    /// rows ordered by `flow_id` (UUID byte-lexicographic). `cursor`
566    /// is `None` for the first page; callers forward the returned
567    /// `next_cursor` verbatim to continue iteration, and the listing
568    /// is exhausted when `next_cursor` is `None`. `limit` is the
569    /// maximum number of rows to return on this page — implementations
570    /// MAY return fewer (end of partition) but MUST NOT exceed it.
571    ///
572    /// Ordering rationale: flow ids are UUIDs, and both Valkey
573    /// (sort after-the-fact) and Postgres (`ORDER BY flow_id`) can
574    /// agree on byte-lexicographic order — the same order
575    /// `FlowId::to_string()` produces for canonical hyphenated UUIDs.
576    /// Mapping to `cursor > flow_id` keeps the contract backend-
577    /// independent.
578    ///
579    /// # Postgres implementation pattern
580    ///
581    /// A Postgres-backed implementation serves this directly with
582    ///
583    /// ```sql
584    /// SELECT flow_id, created_at_ms, public_flow_state
585    ///   FROM ff_flow
586    ///  WHERE partition_key = $1
587    ///    AND ($2::uuid IS NULL OR flow_id > $2)
588    ///  ORDER BY flow_id
589    ///  LIMIT $3 + 1;
590    /// ```
591    ///
592    /// — reading one extra row to decide whether `next_cursor` should
593    /// be set to the last row's `flow_id`. The Valkey implementation
594    /// maintains the `ff:idx:{fp:N}:flow_index` SET and performs the
595    /// sort + slice client-side (SMEMBERS then sort-by-UUID-bytes),
596    /// pipelining `HGETALL flow_core` for each row on the page.
597    ///
598    /// Gated on the `core` feature — flow listing is part of the
599    /// minimal engine surface a Postgres-style backend must honour.
600    #[cfg(feature = "core")]
601    async fn list_flows(
602        &self,
603        _partition: PartitionKey,
604        _cursor: Option<FlowId>,
605        _limit: usize,
606    ) -> Result<ListFlowsPage, EngineError> {
607        Err(EngineError::Unavailable { op: "list_flows" })
608    }
609
610    /// Enumerate registered lanes with cursor-based pagination.
611    ///
612    /// Lanes are global (not partition-scoped) — the backend serves
613    /// this from its lane registry and does NOT accept a
614    /// [`crate::partition::Partition`] argument. Results are sorted
615    /// by [`LaneId`] name so the ordering is stable across calls and
616    /// cursors address a deterministic position in the sort.
617    ///
618    /// * `cursor` — exclusive lower bound. `None` starts from the
619    ///   first lane. To continue a walk, pass the previous page's
620    ///   [`ListLanesPage::next_cursor`].
621    /// * `limit` — hard cap on the number of lanes returned in the
622    ///   page. Backends MAY round this down when the registry size
623    ///   is smaller; they MUST NOT return more than `limit`.
624    ///
625    /// [`ListLanesPage::next_cursor`] is `Some(last_lane_in_page)`
626    /// iff at least one more lane exists after the returned page,
627    /// and `None` on the final page. Callers loop until `next_cursor`
628    /// is `None` to read the full registry.
629    ///
630    /// Gated on the `core` feature — lane enumeration is part of the
631    /// minimal snapshot surface an alternate backend must honour
632    /// alongside [`Self::describe_flow`] / [`Self::list_edges`].
633    #[cfg(feature = "core")]
634    async fn list_lanes(
635        &self,
636        _cursor: Option<LaneId>,
637        _limit: usize,
638    ) -> Result<ListLanesPage, EngineError> {
639        Err(EngineError::Unavailable { op: "list_lanes" })
640    }
641
642    /// List suspended executions in one partition, cursor-paginated,
643    /// with each entry's suspension `reason_code` populated (issue
644    /// #183).
645    ///
646    /// Consumer-facing "what's blocked on what?" panels (ff-board's
647    /// suspended-executions view, operator CLIs) need the reason in
648    /// the list response so the UI does not round-trip per row to
649    /// `describe_execution` for a field it knows it needs. `reason`
650    /// on [`SuspendedExecutionEntry`] carries the free-form
651    /// `suspension:current.reason_code` field — see the type rustdoc
652    /// for the String-not-enum rationale.
653    ///
654    /// `cursor` is opaque to callers; pass `None` to start a fresh
655    /// scan and feed the returned [`ListSuspendedPage::next_cursor`]
656    /// back in on subsequent pages until it comes back `None`.
657    /// `limit` bounds the `entries` count; backends MAY return fewer
658    /// when the partition is exhausted.
659    ///
660    /// Ordering is by ascending `suspended_at_ms` (the per-lane
661    /// suspended ZSET score == `timeout_at` or the no-timeout
662    /// sentinel) with execution id as a lex tiebreak, so cursor
663    /// continuation is deterministic across calls.
664    ///
665    /// Gated on the `core` feature — suspended-list enumeration is
666    /// part of the minimal engine surface a Postgres-style backend
667    /// must honour.
668    #[cfg(feature = "core")]
669    async fn list_suspended(
670        &self,
671        _partition: PartitionKey,
672        _cursor: Option<ExecutionId>,
673        _limit: usize,
674    ) -> Result<ListSuspendedPage, EngineError> {
675        Err(EngineError::Unavailable {
676            op: "list_suspended",
677        })
678    }
679
680    /// Forward-only paginated listing of the executions indexed under
681    /// one partition.
682    ///
683    /// Reads the partition-wide `ff:idx:{p:N}:all_executions` set,
684    /// sorts lexicographically on `ExecutionId`, and returns the page
685    /// of ids strictly greater than `cursor` (or starting from the
686    /// smallest id when `cursor = None`). The returned
687    /// [`ListExecutionsPage::next_cursor`] is the last id on the page
688    /// iff at least one more id exists past it; `None` signals
689    /// end-of-stream.
690    ///
691    /// `limit` is the maximum number of ids returned on this page. A
692    /// `limit` of `0` returns an empty page with `next_cursor = None`.
693    /// Backends MAY cap `limit` internally (Valkey: 1000) and return
694    /// fewer ids than requested; callers continue paginating until
695    /// `next_cursor == None`.
696    ///
697    /// Ordering is stable under concurrent inserts for already-emitted
698    /// ids (an id less-than-or-equal-to the caller's cursor is never
699    /// re-emitted in later pages) but new inserts past the cursor WILL
700    /// appear in subsequent pages — consistent with forward-only
701    /// cursor semantics.
702    ///
703    /// Gated on the `core` feature — partition-scoped listing is part
704    /// of the minimal engine surface every backend must honour.
705    #[cfg(feature = "core")]
706    async fn list_executions(
707        &self,
708        _partition: PartitionKey,
709        _cursor: Option<ExecutionId>,
710        _limit: usize,
711    ) -> Result<ListExecutionsPage, EngineError> {
712        Err(EngineError::Unavailable {
713            op: "list_executions",
714        })
715    }
716
717    // ── Trigger ops (issue #150) ──
718
719    /// Deliver an external signal to a suspended execution's waitpoint.
720    ///
721    /// The backend atomically records the signal, evaluates the resume
722    /// condition, and — when satisfied — transitions the execution
723    /// from `suspended` to `runnable` (or buffers the signal when the
724    /// waitpoint is still `pending`). Duplicate delivery — same
725    /// `idempotency_key` + waitpoint — surfaces as
726    /// [`DeliverSignalResult::Duplicate`] with the pre-existing
727    /// `signal_id` rather than mutating state twice.
728    ///
729    /// Input validation (HMAC token presence, payload size limits,
730    /// signal-name shape) is the backend's responsibility; callers
731    /// pass a fully populated [`DeliverSignalArgs`] and receive typed
732    /// outcomes or typed errors (`ScriptError::invalid_token`,
733    /// `ScriptError::token_expired`, `ScriptError::ExecutionNotFound`
734    /// surfaced via [`EngineError::Transport`] on the Valkey backend).
735    ///
736    /// Gated on the `core` feature — signal delivery is part of the
737    /// minimal trigger surface every backend must honour so ff-server
738    /// / REST handlers can dispatch against `Arc<dyn EngineBackend>`
739    /// without knowing which backend is running underneath.
740    #[cfg(feature = "core")]
741    async fn deliver_signal(
742        &self,
743        _args: DeliverSignalArgs,
744    ) -> Result<DeliverSignalResult, EngineError> {
745        Err(EngineError::Unavailable {
746            op: "deliver_signal",
747        })
748    }
749
750    /// Claim a resumed execution — a previously-suspended attempt that
751    /// has cleared its resume condition (e.g. via
752    /// [`Self::deliver_signal`]) and now needs a worker to pick up the
753    /// same attempt index.
754    ///
755    /// Distinct from [`Self::claim`] (fresh work) and
756    /// [`Self::claim_from_resume_grant`] (grant-based ownership transfer
757    /// after a crash): the resumed-claim path re-binds an existing
758    /// attempt rather than minting a new one. The backend issues a
759    /// fresh `lease_id` + bumps the `lease_epoch`, preserving
760    /// `attempt_id` / `attempt_index` so stream frames and progress
761    /// updates continue on the same attempt.
762    ///
763    /// Typed failures surface via `ScriptError` → `EngineError`:
764    /// `NotAResumedExecution` when the attempt state is not
765    /// `attempt_interrupted`, `ExecutionNotLeaseable` when the
766    /// lifecycle phase is not `runnable`, and `InvalidClaimGrant`
767    /// when the grant key is missing or was already consumed.
768    ///
769    /// Gated on the `core` feature — resumed-claim is part of the
770    /// minimal trigger surface every backend must honour.
771    #[cfg(feature = "core")]
772    async fn claim_resumed_execution(
773        &self,
774        _args: ClaimResumedExecutionArgs,
775    ) -> Result<ClaimResumedExecutionResult, EngineError> {
776        Err(EngineError::Unavailable {
777            op: "claim_resumed_execution",
778        })
779    }
780
781    /// Scan a lane's eligible ZSET on one partition for
782    /// highest-priority executions awaiting a worker (v0.12 PR-5).
783    ///
784    /// Lifted from the SDK-side `ZRANGEBYSCORE` inline on
785    /// `FlowFabricWorker::claim_next` — the scheduler-bypass scanner
786    /// gated behind `direct-valkey-claim`. The trait method itself is
787    /// backend-agnostic; consumers that drive the scanner loop
788    /// (bench harnesses, single-tenant dev) compose it with
789    /// [`Self::issue_claim_grant`] + [`Self::claim_execution`] to
790    /// replicate the pre-PR-5 `claim_next` body.
791    ///
792    /// # Backend coverage
793    ///
794    /// * **Valkey** — `ZRANGEBYSCORE eligible_zset -inf +inf LIMIT 0 <limit>`
795    ///   on the lane's partition-scoped eligible key. Single
796    ///   command; no script round-trip. Wire shape is byte-for-byte
797    ///   identical to the pre-PR SDK inline call so bench traces
798    ///   match pre-PR without new `#[tracing::instrument]` span names.
799    /// * **Postgres / SQLite** — use the `Err(Unavailable)` default.
800    ///   PG/SQLite consumers drive work through the scheduler-routed
801    ///   [`Self::claim_for_worker`] path instead of the scanner
802    ///   primitives exposed here; lifting the scheduler itself onto
803    ///   the trait is RFC-024 follow-up scope. See
804    ///   `project_claim_from_grant_pg_sqlite_gap.md` for motivation.
805    ///
806    /// Default impl returns [`EngineError::Unavailable`] so the trait
807    /// addition is non-breaking for out-of-tree backends. Same
808    /// precedent as [`Self::claim_execution`] landing in v0.12 PR-4.
809    #[cfg(feature = "core")]
810    async fn scan_eligible_executions(
811        &self,
812        _args: ScanEligibleArgs,
813    ) -> Result<Vec<ExecutionId>, EngineError> {
814        Err(EngineError::Unavailable {
815            op: "scan_eligible_executions",
816        })
817    }
818
819    /// Issue a claim grant — the scheduler's admission write — for a
820    /// single execution on a single lane (v0.12 PR-5).
821    ///
822    /// Lifted from the SDK-side `ff_issue_claim_grant` inline helper
823    /// on `FlowFabricWorker::claim_next`. The backend atomically
824    /// writes the grant hash, appends to the per-worker grant index,
825    /// and removes the execution from the lane's eligible ZSET.
826    ///
827    /// Typed rejects surface via [`EngineError::Validation`]:
828    /// `CapabilityMismatch` when the worker's capabilities do not
829    /// cover the execution's `required_capabilities`, `InvalidInput`
830    /// for malformed args. Transport faults surface via
831    /// [`EngineError::Transport`].
832    ///
833    /// # Backend coverage
834    ///
835    /// * **Valkey** — one `ff_issue_claim_grant` FCALL. KEYS/ARGV
836    ///   shape is byte-for-byte identical to the pre-PR SDK inline
837    ///   call; bench traces match pre-PR.
838    /// * **Postgres / SQLite** — `Err(Unavailable)` default; use
839    ///   [`Self::claim_for_worker`] instead. See
840    ///   [`Self::scan_eligible_executions`] for the cross-link
841    ///   rationale.
842    #[cfg(feature = "core")]
843    async fn issue_claim_grant(
844        &self,
845        _args: IssueClaimGrantArgs,
846    ) -> Result<IssueClaimGrantOutcome, EngineError> {
847        Err(EngineError::Unavailable {
848            op: "issue_claim_grant",
849        })
850    }
851
852    /// Move an execution from a lane's eligible ZSET into its
853    /// blocked_route ZSET (v0.12 PR-5).
854    ///
855    /// Lifted from the SDK-side `ff_block_execution_for_admission`
856    /// inline helper on `FlowFabricWorker::claim_next`. Called after
857    /// a [`Self::issue_claim_grant`] `CapabilityMismatch` reject —
858    /// without a block step, the inline scanner would re-pick the
859    /// same top-of-ZSET every tick (parity with
860    /// `ff-scheduler::Scheduler::block_candidate`).
861    ///
862    /// The engine's unblock scanner periodically promotes
863    /// blocked_route back to eligible once a worker with matching
864    /// caps registers.
865    ///
866    /// # Backend coverage
867    ///
868    /// * **Valkey** — one `ff_block_execution_for_admission` FCALL.
869    /// * **Postgres / SQLite** — `Err(Unavailable)` default; the
870    ///   scheduler-routed [`Self::claim_for_worker`] path handles
871    ///   admission rejects server-side.
872    #[cfg(feature = "core")]
873    async fn block_route(
874        &self,
875        _args: BlockRouteArgs,
876    ) -> Result<BlockRouteOutcome, EngineError> {
877        Err(EngineError::Unavailable { op: "block_route" })
878    }
879
880    /// Consume a scheduler-issued claim grant to mint a fresh attempt.
881    ///
882    /// The SDK's grant-consumer path — paired with `FlowFabricWorker::claim_from_grant`
883    /// in `ff-sdk` — routes through this method. The scheduler has
884    /// already validated budget / quota / capabilities and written a
885    /// grant (Valkey `claim_grant` hash); this call atomically
886    /// consumes that grant and creates the attempt row, mints
887    /// `lease_id` + `lease_epoch`, and returns a
888    /// [`ClaimExecutionResult::Claimed`] carrying the minted lease
889    /// triple.
890    ///
891    /// Distinct from [`Self::claim`] (the scheduler-bypass scanner
892    /// used by the `direct-valkey-claim` feature) — this method
893    /// assumes the grant already exists and skips capability / ZSET
894    /// scanning. The Valkey impl fires exactly one `ff_claim_execution`
895    /// FCALL.
896    ///
897    /// Typed failures surface via `ScriptError` → `EngineError`:
898    /// `UseClaimResumedExecution` when the attempt is actually
899    /// `attempt_interrupted` (caller should retry via
900    /// [`Self::claim_resumed_execution`] — see `ContentionKind` at
901    /// `ff_core::engine_error`), `InvalidClaimGrant` when the grant is
902    /// missing / consumed / worker-mismatched, `CapabilityMismatch`
903    /// when the execution's `required_capabilities` drifted after
904    /// grant issuance.
905    ///
906    /// # Backend coverage
907    ///
908    /// * **Valkey** — implemented in `ff-backend-valkey` (one
909    ///   `ff_claim_execution` FCALL).
910    /// * **Postgres / SQLite** — use the `Err(Unavailable)` default in
911    ///   this PR. Grants on PG / SQLite today flow through
912    ///   `PostgresScheduler::claim_for_worker` (a sibling struct, not
913    ///   an `EngineBackend` method); wiring the default-over-trait
914    ///   behaviour into a PG / SQLite `claim_execution` impl lands
915    ///   with a future RFC-024 grant-consumer extension.
916    ///
917    /// The default impl returns [`EngineError::Unavailable`] so the
918    /// trait addition is non-breaking for out-of-tree backends. Same
919    /// precedent as [`Self::read_current_attempt_index`] landing in
920    /// v0.12 PR-3.
921    #[cfg(feature = "core")]
922    async fn claim_execution(
923        &self,
924        _args: ClaimExecutionArgs,
925    ) -> Result<ClaimExecutionResult, EngineError> {
926        Err(EngineError::Unavailable {
927            op: "claim_execution",
928        })
929    }
930
    /// Operator-initiated cancellation of a flow and (optionally) its
    /// member executions. See RFC-012 §3.1.1 for the policy / wait
    /// matrix.
    ///
    /// Required method — unlike the `Unavailable`-defaulted ops on
    /// this trait, every backend must supply an implementation.
    ///
    /// * `policy` — which member executions the cancel reaches
    ///   (RFC-012 §3.1.1).
    /// * `wait` — whether the call returns immediately or waits on
    ///   in-flight members (RFC-012 §3.1.1).
    async fn cancel_flow(
        &self,
        id: &FlowId,
        policy: CancelFlowPolicy,
        wait: CancelFlowWait,
    ) -> Result<CancelFlowResult, EngineError>;
940
941    /// RFC-016 Stage A: set the inbound-edge-group policy for a
942    /// downstream execution. Must be called before the first
943    /// `add_dependency(... -> downstream_execution_id)` — the backend
944    /// rejects with [`EngineError::Conflict`] if edges have already
945    /// been staged for this group.
946    ///
947    /// Stage A honours only
948    /// [`EdgeDependencyPolicy::AllOf`](crate::contracts::EdgeDependencyPolicy::AllOf);
949    /// the `AnyOf` / `Quorum` variants return
950    /// [`EngineError::Validation`] with
951    /// `detail = "stage A supports AllOf only; AnyOf/Quorum land in stage B"`
952    /// until Stage B's resolver lands.
953    #[cfg(feature = "core")]
954    async fn set_edge_group_policy(
955        &self,
956        _flow_id: &FlowId,
957        _downstream_execution_id: &ExecutionId,
958        _policy: crate::contracts::EdgeDependencyPolicy,
959    ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
960        Err(EngineError::Unavailable {
961            op: "set_edge_group_policy",
962        })
963    }
964
965    // ── HMAC secret rotation (v0.7 migration-master Q4) ──
966
967    /// Rotate the waitpoint HMAC signing kid **cluster-wide**.
968    ///
969    /// **v0.7 migration-master Q4 (adjudicated 2026-04-24).**
970    /// Additive trait surface so Valkey and Postgres backends can
971    /// both expose the "rotate everywhere" semantic under one name.
972    ///
973    /// * Valkey impl fans out an `ff_rotate_waitpoint_hmac_secret`
974    ///   FCALL per execution partition. `entries.len() == num_flow_partitions`
975    ///   and per-partition failures are surfaced as inner `Err`
976    ///   entries — the call as a whole does not fail when one
977    ///   partition's FCALL fails, matching
978    ///   [`ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions`]'s
979    ///   partial-success contract.
980    /// * Postgres impl (Wave 4) writes one row to
981    ///   `ff_waitpoint_hmac(kid, secret, rotated_at)` and returns a
982    ///   single-entry vec with `partition = 0`.
983    ///
984    /// The default impl returns
985    /// [`EngineError::Unavailable`] with
986    /// `op = "rotate_waitpoint_hmac_secret_all"` so backends that
987    /// haven't implemented the method surface the miss loudly rather
988    /// than silently no-op'ing. Both concrete backends override.
989    async fn rotate_waitpoint_hmac_secret_all(
990        &self,
991        _args: RotateWaitpointHmacSecretAllArgs,
992    ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
993        Err(EngineError::Unavailable {
994            op: "rotate_waitpoint_hmac_secret_all",
995        })
996    }
997
998    /// Seed the initial waitpoint HMAC secret for a fresh deployment
999    /// (issue #280).
1000    ///
1001    /// **Idempotent.** If a `current_kid` (Valkey per-partition) or
1002    /// an active kid row (Postgres) already exists with the given
1003    /// `kid`, the method returns
1004    /// [`SeedOutcome::AlreadySeeded`] without overwriting, reporting
1005    /// whether the stored secret matches the caller-supplied one via
1006    /// `same_secret`. Callers (cairn boot, operator tooling) invoke
1007    /// this on every boot and let the backend decide whether to
1008    /// install — removing the client-side "check then HSET" race that
1009    /// cairn's raw-HSET boot path silently tolerated.
1010    ///
1011    /// For rotation of an already-seeded secret, use
1012    /// [`Self::rotate_waitpoint_hmac_secret_all`] instead; seed is
1013    /// install-only.
1014    ///
1015    /// The default impl returns [`EngineError::Unavailable`] with
1016    /// `op = "seed_waitpoint_hmac_secret"` so backends that haven't
1017    /// implemented the method surface the miss loudly.
1018    async fn seed_waitpoint_hmac_secret(
1019        &self,
1020        _args: SeedWaitpointHmacSecretArgs,
1021    ) -> Result<SeedOutcome, EngineError> {
1022        Err(EngineError::Unavailable {
1023            op: "seed_waitpoint_hmac_secret",
1024        })
1025    }
1026
1027    // ── Budget ──
1028
    /// Report usage against a budget and check limits. Returns the
    /// typed [`ReportUsageResult`] variant; backends enforce
    /// idempotency via the caller-supplied
    /// [`UsageDimensions::dedup_key`] (RFC-012 §R7.2.3 — replaces
    /// the pre-Round-7 `AdmissionDecision` return).
    ///
    /// Required method (no `Unavailable` default) — every backend
    /// must implement it. This is the worker-lease path; for the
    /// admin path without lease context, see
    /// [`Self::report_usage_admin`].
    async fn report_usage(
        &self,
        handle: &Handle,
        budget: &BudgetId,
        dimensions: crate::backend::UsageDimensions,
    ) -> Result<ReportUsageResult, EngineError>;
1040
1041    // ── Stream reads (RFC-012 Stage 1c tranche-4; issue #87) ──
1042
1043    /// Read frames from a completed or in-flight attempt's stream.
1044    ///
1045    /// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start`
1046    /// / `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
1047    /// `StreamCursor::At("<id>")` reads from a concrete entry id.
1048    ///
1049    /// Input validation (count_limit bounds, cursor shape) is the
1050    /// caller's responsibility — SDK-side wrappers in
1051    /// [`ff-sdk`](https://docs.rs/ff-sdk) enforce bounds before
1052    /// forwarding. Backends MAY additionally reject out-of-range
1053    /// input via [`EngineError::Validation`].
1054    ///
1055    /// Gated on the `streaming` feature — stream reads are part of
1056    /// the stream-subset surface a backend without XREAD-like
1057    /// primitives may omit.
1058    #[cfg(feature = "streaming")]
1059    async fn read_stream(
1060        &self,
1061        _execution_id: &ExecutionId,
1062        _attempt_index: AttemptIndex,
1063        _from: StreamCursor,
1064        _to: StreamCursor,
1065        _count_limit: u64,
1066    ) -> Result<StreamFrames, EngineError> {
1067        Err(EngineError::Unavailable { op: "read_stream" })
1068    }
1069
1070    /// Tail a live attempt's stream.
1071    ///
1072    /// `after` is an exclusive [`StreamCursor`] — entries with id
1073    /// strictly greater than `after` are returned. `StreamCursor::Start`
1074    /// / `StreamCursor::End` are NOT accepted here; callers MUST pass
1075    /// a concrete id (or `StreamCursor::from_beginning()`). The SDK
1076    /// wrapper rejects the open markers before reaching the backend.
1077    ///
1078    /// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up
1079    /// to that many ms for a new entry.
1080    ///
1081    /// `visibility` (RFC-015 §6.1) filters the returned entries by
1082    /// their stored [`StreamMode`](crate::backend::StreamMode)
1083    /// `mode` field. Default
1084    /// [`TailVisibility::All`](crate::backend::TailVisibility::All)
1085    /// preserves v1 behaviour.
1086    ///
1087    /// Gated on the `streaming` feature — see [`read_stream`](Self::read_stream).
1088    #[cfg(feature = "streaming")]
1089    async fn tail_stream(
1090        &self,
1091        _execution_id: &ExecutionId,
1092        _attempt_index: AttemptIndex,
1093        _after: StreamCursor,
1094        _block_ms: u64,
1095        _count_limit: u64,
1096        _visibility: TailVisibility,
1097    ) -> Result<StreamFrames, EngineError> {
1098        Err(EngineError::Unavailable { op: "tail_stream" })
1099    }
1100
1101    /// Read the rolling summary document for an attempt (RFC-015 §6.3).
1102    ///
1103    /// Returns `Ok(None)` when no [`StreamMode::DurableSummary`](crate::backend::StreamMode::DurableSummary)
1104    /// frame has ever been appended for the attempt. Non-blocking Hash
1105    /// read; safe to call from any consumer without holding the lease.
1106    ///
1107    /// Gated on the `streaming` feature — summary reads are part of
1108    /// the stream-subset surface.
1109    #[cfg(feature = "streaming")]
1110    async fn read_summary(
1111        &self,
1112        _execution_id: &ExecutionId,
1113        _attempt_index: AttemptIndex,
1114    ) -> Result<Option<SummaryDocument>, EngineError> {
1115        Err(EngineError::Unavailable {
1116            op: "read_summary",
1117        })
1118    }
1119
1120    // ── RFC-017 Stage A — Ingress (5) ──────────────────────────
1121    //
1122    // Every method in this block has a default impl returning
1123    // `EngineError::Unavailable { op }` per RFC-017 §5.3. Concrete
1124    // backends override each method with a real body. A missing
1125    // override surfaces as a loud typed error at the call site rather
1126    // than a silent no-op.
1127
1128    /// Create an execution. Ingress row 6 (RFC-017 §4). Wraps
1129    /// `ff_create_execution` on Valkey; `INSERT INTO ff_execution ...`
1130    /// on Postgres. The `idempotency_key` + backend-side default
1131    /// `dedup_ttl_ms = 86400000` make duplicate submissions idempotent.
1132    #[cfg(feature = "core")]
1133    async fn create_execution(
1134        &self,
1135        _args: CreateExecutionArgs,
1136    ) -> Result<CreateExecutionResult, EngineError> {
1137        Err(EngineError::Unavailable {
1138            op: "create_execution",
1139        })
1140    }
1141
1142    /// Create a flow header. Ingress row 5.
1143    #[cfg(feature = "core")]
1144    async fn create_flow(
1145        &self,
1146        _args: CreateFlowArgs,
1147    ) -> Result<CreateFlowResult, EngineError> {
1148        Err(EngineError::Unavailable { op: "create_flow" })
1149    }
1150
1151    /// Atomically add an execution to a flow (single-FCALL co-located
1152    /// commit on Valkey; single-transaction UPSERT on Postgres).
1153    #[cfg(feature = "core")]
1154    async fn add_execution_to_flow(
1155        &self,
1156        _args: AddExecutionToFlowArgs,
1157    ) -> Result<AddExecutionToFlowResult, EngineError> {
1158        Err(EngineError::Unavailable {
1159            op: "add_execution_to_flow",
1160        })
1161    }
1162
1163    /// Stage a dependency edge between flow members. CAS-guarded on
1164    /// `graph_revision` — stale rev returns `Contention(StaleGraphRevision)`.
1165    #[cfg(feature = "core")]
1166    async fn stage_dependency_edge(
1167        &self,
1168        _args: StageDependencyEdgeArgs,
1169    ) -> Result<StageDependencyEdgeResult, EngineError> {
1170        Err(EngineError::Unavailable {
1171            op: "stage_dependency_edge",
1172        })
1173    }
1174
1175    /// Apply a staged dependency edge to its downstream child.
1176    #[cfg(feature = "core")]
1177    async fn apply_dependency_to_child(
1178        &self,
1179        _args: ApplyDependencyToChildArgs,
1180    ) -> Result<ApplyDependencyToChildResult, EngineError> {
1181        Err(EngineError::Unavailable {
1182            op: "apply_dependency_to_child",
1183        })
1184    }
1185
1186    // ── RFC-017 Stage A — Operator control (4) ─────────────────
1187
1188    /// Operator-initiated execution cancel (row 2).
1189    #[cfg(feature = "core")]
1190    async fn cancel_execution(
1191        &self,
1192        _args: CancelExecutionArgs,
1193    ) -> Result<CancelExecutionResult, EngineError> {
1194        Err(EngineError::Unavailable {
1195            op: "cancel_execution",
1196        })
1197    }
1198
1199    /// Re-score an execution's eligibility priority (row 17).
1200    #[cfg(feature = "core")]
1201    async fn change_priority(
1202        &self,
1203        _args: ChangePriorityArgs,
1204    ) -> Result<ChangePriorityResult, EngineError> {
1205        Err(EngineError::Unavailable {
1206            op: "change_priority",
1207        })
1208    }
1209
1210    /// Replay a terminal execution (row 22). Variadic KEYS handling
1211    /// (inbound-edge pre-read) is hidden inside the Valkey impl per
1212    /// RFC-017 §4 row 3.
1213    #[cfg(feature = "core")]
1214    async fn replay_execution(
1215        &self,
1216        _args: ReplayExecutionArgs,
1217    ) -> Result<ReplayExecutionResult, EngineError> {
1218        Err(EngineError::Unavailable {
1219            op: "replay_execution",
1220        })
1221    }
1222
1223    /// Operator-initiated lease revoke (row 19).
1224    #[cfg(feature = "core")]
1225    async fn revoke_lease(
1226        &self,
1227        _args: RevokeLeaseArgs,
1228    ) -> Result<RevokeLeaseResult, EngineError> {
1229        Err(EngineError::Unavailable { op: "revoke_lease" })
1230    }
1231
1232    // ── RFC-017 Stage A — Budget + quota admin (5) ─────────────
1233
1234    /// Create a budget definition (row 6).
1235    #[cfg(feature = "core")]
1236    async fn create_budget(
1237        &self,
1238        _args: CreateBudgetArgs,
1239    ) -> Result<CreateBudgetResult, EngineError> {
1240        Err(EngineError::Unavailable {
1241            op: "create_budget",
1242        })
1243    }
1244
1245    /// Reset a budget's usage counters (row 10).
1246    #[cfg(feature = "core")]
1247    async fn reset_budget(
1248        &self,
1249        _args: ResetBudgetArgs,
1250    ) -> Result<ResetBudgetResult, EngineError> {
1251        Err(EngineError::Unavailable { op: "reset_budget" })
1252    }
1253
1254    /// Create a quota policy (row 7).
1255    #[cfg(feature = "core")]
1256    async fn create_quota_policy(
1257        &self,
1258        _args: CreateQuotaPolicyArgs,
1259    ) -> Result<CreateQuotaPolicyResult, EngineError> {
1260        Err(EngineError::Unavailable {
1261            op: "create_quota_policy",
1262        })
1263    }
1264
1265    /// Read-only budget status for operator visibility (row 8).
1266    #[cfg(feature = "core")]
1267    async fn get_budget_status(
1268        &self,
1269        _id: &BudgetId,
1270    ) -> Result<BudgetStatus, EngineError> {
1271        Err(EngineError::Unavailable {
1272            op: "get_budget_status",
1273        })
1274    }
1275
1276    /// Admin-path `report_usage` (row 9 + RFC-017 §5 round-1 F4).
1277    /// Distinct from the existing [`Self::report_usage`] which takes
1278    /// a worker handle — the admin path has no lease context.
1279    #[cfg(feature = "core")]
1280    async fn report_usage_admin(
1281        &self,
1282        _budget: &BudgetId,
1283        _args: ReportUsageAdminArgs,
1284    ) -> Result<ReportUsageResult, EngineError> {
1285        Err(EngineError::Unavailable {
1286            op: "report_usage_admin",
1287        })
1288    }
1289
1290    // ── RFC-017 Stage A — Read + diagnostics (3) ───────────────
1291
1292    /// Fetch the stored result payload for a completed execution
1293    /// (row 4). Returns `Ok(None)` when the execution is missing, not
1294    /// yet complete, or its payload was trimmed by retention policy.
1295    async fn get_execution_result(
1296        &self,
1297        _id: &ExecutionId,
1298    ) -> Result<Option<Vec<u8>>, EngineError> {
1299        Err(EngineError::Unavailable {
1300            op: "get_execution_result",
1301        })
1302    }
1303
1304    /// List the pending-or-active waitpoints for an execution, cursor
1305    /// paginated (row 5 / §8). Stage A preserves the existing
1306    /// `PendingWaitpointInfo` shape; Stage D ships the §8 HMAC
1307    /// sanitisation + `(token_kid, token_fingerprint)` schema.
1308    #[cfg(feature = "core")]
1309    async fn list_pending_waitpoints(
1310        &self,
1311        _args: ListPendingWaitpointsArgs,
1312    ) -> Result<ListPendingWaitpointsResult, EngineError> {
1313        Err(EngineError::Unavailable {
1314            op: "list_pending_waitpoints",
1315        })
1316    }
1317
1318    /// Backend-level reachability probe (row 1). Valkey: `PING`;
1319    /// Postgres: `SELECT 1`.
1320    async fn ping(&self) -> Result<(), EngineError> {
1321        Err(EngineError::Unavailable { op: "ping" })
1322    }
1323
1324    // ── RFC-017 Stage A — Scheduling (1) ───────────────────────
1325
1326    /// Scheduler-routed claim entrypoint (row 18, RFC-017 §7). Valkey
1327    /// forwards to its `ff_scheduler::Scheduler` cursor; Postgres
1328    /// forwards to `PostgresScheduler`'s `FOR UPDATE SKIP LOCKED`
1329    /// path.
1330    ///
1331    /// Backends that carry an embedded scheduler (e.g. `ValkeyBackend`
1332    /// constructed via `with_embedded_scheduler`, or `PostgresBackend`
1333    /// with its `with_scanners` sibling) route the claim through it.
1334    /// Backends without a wired scheduler return
1335    /// [`EngineError::Unavailable`]. HTTP consumers use
1336    /// `FlowFabricWorker::claim_via_server` instead.
1337    #[cfg(feature = "core")]
1338    async fn claim_for_worker(
1339        &self,
1340        _args: ClaimForWorkerArgs,
1341    ) -> Result<ClaimForWorkerOutcome, EngineError> {
1342        Err(EngineError::Unavailable {
1343            op: "claim_for_worker",
1344        })
1345    }
1346
1347    // ── Cross-cutting (RFC-017 Stage B trait-lift) ──────────────
1348
1349    /// Static observability label identifying the backend family in
1350    /// logs + metrics (RFC-017 §5.4 + §9 Stage B). Default impl
1351    /// returns `"unknown"` so legacy `impl EngineBackend` blocks that
1352    /// have not upgraded keep compiling; every in-tree backend
1353    /// overrides — `ValkeyBackend` → `"valkey"`, `PostgresBackend` →
1354    /// `"postgres"`.
1355    fn backend_label(&self) -> &'static str {
1356        "unknown"
1357    }
1358
1359    /// Backend downcast escape hatch (v0.12 PR-7a transitional).
1360    ///
1361    /// Scanner supervisors in `ff-engine` still dispatch through a
1362    /// concrete `ferriskey::Client`; to keep the engine's public
1363    /// boundary backend-agnostic (`Arc<dyn EngineBackend>`) while the
1364    /// scanner internals remain Valkey-shaped, the engine downcasts
1365    /// via this method and reaches in for the embedded client. Every
1366    /// backend that wants to be consumed by `Engine::start_with_completions`
1367    /// overrides this to return `self` as `&dyn Any`; the default
1368    /// returns a placeholder so a stray `downcast_ref` fails cleanly
1369    /// rather than risking unsound behaviour.
1370    ///
1371    /// v0.13 (PR-7b) will trait-ify individual scanners onto
1372    /// `EngineBackend` and retire `ff-engine`'s dependence on this
1373    /// downcast path. The method itself will remain on the trait
1374    /// (likely deprecated) rather than be removed — removing a
1375    /// public trait method is a breaking change for external
1376    /// `impl EngineBackend` blocks.
1377    fn as_any(&self) -> &(dyn std::any::Any + 'static) {
1378        // Placeholder so the default does not expose `Self` for
1379        // downcast. Backends override to return `self`.
1380        &()
1381    }
1382
1383    /// RFC-018 Stage A: snapshot of this backend's identity + the
1384    /// flat `Supports` surface it can actually service. Consumers use
1385    /// this at startup to gate UI features / choose between alternative
1386    /// code paths before dispatching. See
1387    /// `rfcs/RFC-018-backend-capability-discovery.md` for the full
1388    /// discovery contract and the four owner-adjudicated open
1389    /// questions (granularity: coarse; version: struct; sync; no
1390    /// event stream).
1391    ///
1392    /// Default: returns a value tagged `family = "unknown"` with every
1393    /// `supports.*` bool `false`, so pre-RFC-018 out-of-tree backends
1394    /// keep compiling and consumers treat "all false" as "dispatch
1395    /// and catch [`EngineError::Unavailable`]" (pre-RFC-018 behaviour).
1396    /// Concrete in-tree backends (`ValkeyBackend`, `PostgresBackend`)
1397    /// override to populate a real value.
1398    ///
1399    /// Sync (no `.await`): backend-static info should not require a
1400    /// probe on every query. Dynamic probes happen once at
1401    /// `connect*` time and cache the result.
1402    fn capabilities(&self) -> crate::capability::Capabilities {
1403        crate::capability::Capabilities::new(
1404            crate::capability::BackendIdentity::new(
1405                "unknown",
1406                crate::capability::Version::new(0, 0, 0),
1407                "unknown",
1408            ),
1409            crate::capability::Supports::none(),
1410        )
1411    }
1412
    /// Issue #281: run one-time backend-specific boot preparation.
    ///
    /// Intended to run ONCE per deployment startup — NOT per request.
    /// Idempotent and safe for consumers to call on every application
    /// boot; backends that have nothing to do return
    /// [`PrepareOutcome::NoOp`] without side effects.
    ///
    /// Per-backend behaviour:
    ///
    /// * **Valkey** — issues `FUNCTION LOAD REPLACE` for the
    ///   `flowfabric` Lua library (with bounded retry on transient
    ///   transport faults; permanent compile errors surface as
    ///   [`EngineError::Transport`] without retry). Returns
    ///   [`PrepareOutcome::Applied`] carrying
    ///   `"FUNCTION LOAD (flowfabric lib v<N>)"`.
    /// * **Postgres** — returns [`PrepareOutcome::NoOp`]. Schema
    ///   migrations are applied out-of-band per
    ///   `rfcs/drafts/v0.7-migration-master.md §Q12`; the backend
    ///   runs a schema-version check at connect time and refuses to
    ///   start on mismatch, so no boot-side prepare work remains.
    /// * **Default impl** — returns [`PrepareOutcome::NoOp`] so
    ///   out-of-tree backends without preparation work compile
    ///   without boilerplate.
    ///
    /// # Relationship to the in-tree boot path
    ///
    /// `ValkeyBackend::initialize_deployment` (called from
    /// `Server::start_with_metrics`) already invokes
    /// [`ensure_library`](ff_script::loader::ensure_library) inline as
    /// its step 4; that path is unchanged. `prepare()` exists as a
    /// **trait-surface entry point** so consumers that construct an
    /// `Arc<dyn EngineBackend>` outside of `Server` (e.g.
    /// cairn-fabric's boot path at `cairn-fabric/src/boot.rs`) can
    /// run the same preparation without reaching into
    /// backend-specific modules. The overlap is intentional: calling
    /// both `prepare()` and `initialize_deployment` is safe because
    /// `FUNCTION LOAD REPLACE` is idempotent under the version
    /// check.
    ///
    /// # Layer forwarding
    ///
    /// Layer impls (`HookedBackend`, ff-sdk layers) do NOT forward
    /// `prepare` today — consistent with `backend_label` / `ping` /
    /// `shutdown_prepare`. Consumers that wrap a backend in layers
    /// MUST call `prepare()` on the raw backend before wrapping, or
    /// accept the default [`PrepareOutcome::NoOp`].
    async fn prepare(&self) -> Result<PrepareOutcome, EngineError> {
        // Default: nothing to prepare, no side effects.
        Ok(PrepareOutcome::NoOp)
    }
1462
1463    /// Drain-before-shutdown hook (RFC-017 §5.4). The server calls
1464    /// this before draining its own background tasks so backend-
1465    /// scoped primitives (Valkey stream semaphore, Postgres sqlx
1466    /// pool, …) can close their gates and await in-flight work up to
1467    /// `grace`.
1468    ///
1469    /// Default impl returns `Ok(())` — a no-op backend has nothing
1470    /// backend-scoped to drain. Concrete backends whose data plane
1471    /// owns resources (connection pools, semaphores, listeners)
1472    /// override with a real body.
1473    async fn shutdown_prepare(&self, _grace: Duration) -> Result<(), EngineError> {
1474        Ok(())
1475    }
1476
1477    // ── RFC-017 Stage E2 — `Server::client` removal (header + reads) ───
1478
    /// RFC-017 Stage E2: the "header" portion of `cancel_flow` — run the
    /// atomic flow-state flip (Valkey: `ff_cancel_flow` FCALL; Postgres:
    /// `cancel_flow_once` tx), decode policy + membership, and surface
    /// the `flow_already_terminal` idempotency branch as a first-class
    /// [`CancelFlowHeader::AlreadyTerminal`] so the Server can build
    /// the wire [`CancelFlowResult`] without reaching for a raw
    /// `Client`. Separate from the existing
    /// [`EngineBackend::cancel_flow`] entry point (which takes the
    /// enum-typed `(policy, wait)` split and returns the wait-collapsed
    /// `CancelFlowResult`) because the Server owns its own
    /// wait-dispatch + member-cancel machinery via
    /// [`EngineBackend::cancel_execution`] + backlog ack.
    ///
    /// Default impl returns [`EngineError::Unavailable`] so un-migrated
    /// backends surface the miss loudly.
    #[cfg(feature = "core")]
    async fn cancel_flow_header(
        &self,
        _args: CancelFlowArgs,
    ) -> Result<crate::contracts::CancelFlowHeader, EngineError> {
        // Loud, typed miss — see doc comment above.
        Err(EngineError::Unavailable {
            op: "cancel_flow_header",
        })
    }
1503
1504    /// RFC-017 Stage E2: best-effort acknowledgement that one member of
1505    /// a `cancel_all` flow has completed its per-member cancel. Drains
1506    /// the member from the flow's `pending_cancels` set and, if empty,
1507    /// removes the flow from the partition-level `cancel_backlog`
1508    /// (Valkey: `ff_ack_cancel_member` FCALL; Postgres: table write —
1509    /// default `Unavailable` until Wave 9).
1510    ///
1511    /// Failures are swallowed by the caller — the cancel-backlog
1512    /// reconciler is the authoritative drain — but a typed error here
1513    /// lets the caller log a backend-scoped context string.
1514    #[cfg(feature = "core")]
1515    async fn ack_cancel_member(
1516        &self,
1517        _flow_id: &FlowId,
1518        _execution_id: &ExecutionId,
1519    ) -> Result<(), EngineError> {
1520        Err(EngineError::Unavailable {
1521            op: "ack_cancel_member",
1522        })
1523    }
1524
1525    /// RFC-017 Stage E2: full-shape execution read used by the
1526    /// `GET /v1/executions/{id}` HTTP route. Returns the legacy
1527    /// [`ExecutionInfo`] wire shape (not the decoupled
1528    /// [`ExecutionSnapshot`]) so the existing HTTP response bytes stay
1529    /// identical across the migration.
1530    ///
1531    /// `Ok(None)` ⇒ no such execution. Default `Unavailable` because
1532    /// the Valkey HGETALL-and-parse is backend-specific.
1533    #[cfg(feature = "core")]
1534    async fn read_execution_info(
1535        &self,
1536        _id: &ExecutionId,
1537    ) -> Result<Option<ExecutionInfo>, EngineError> {
1538        Err(EngineError::Unavailable {
1539            op: "read_execution_info",
1540        })
1541    }
1542
1543    /// RFC-017 Stage E2: narrow `public_state` read used by the
1544    /// `GET /v1/executions/{id}/state` HTTP route. Returns `Ok(None)`
1545    /// when the execution is missing. Default `Unavailable`.
1546    #[cfg(feature = "core")]
1547    async fn read_execution_state(
1548        &self,
1549        _id: &ExecutionId,
1550    ) -> Result<Option<PublicState>, EngineError> {
1551        Err(EngineError::Unavailable {
1552            op: "read_execution_state",
1553        })
1554    }
1555
1556    // ── RFC-019 Stage A/B/C — Stream-cursor subscriptions ─────────
1557    //
1558    // Four owner-adjudicated families (RFC-019 §Open Questions #5):
1559    // `lease_history`, `completion`, `signal_delivery`,
1560    // `instance_tags`. Stage C (this crate) promotes each family to
1561    // a typed event enum; consumers `match` on variants instead of
1562    // parsing a backend-shaped byte blob.
1563    //
1564    // Each method returns a family-specific subscription alias (see
1565    // [`crate::stream_events`]). All defaults return
1566    // `EngineError::Unavailable` per RFC-017 trait-growth conventions.
1567
1568    /// Subscribe to lease lifecycle events (acquired / renewed /
1569    /// expired / reclaimed / revoked) for the partition this backend
1570    /// is configured with.
1571    ///
1572    /// Cross-partition fan-out is consumer-side merge: subscribe
1573    /// per-partition backend instance and interleave on the read
1574    /// side. Yields
1575    /// `Err(EngineError::StreamDisconnected { cursor })` on backend
1576    /// disconnect; resume by calling this method again with the
1577    /// returned cursor.
1578    ///
1579    /// `filter` (#282): when `filter.instance_tag` is `Some((k, v))`,
1580    /// only events whose execution carries tag `k = v` are yielded
1581    /// (matching the [`crate::backend::ScannerFilter`] surface from
1582    /// #122). Pass `&ScannerFilter::default()` for unfiltered
1583    /// behaviour. Filtering happens inside the backend stream; the
1584    /// [`crate::stream_events::LeaseHistorySubscription`] return type
1585    /// is unchanged.
1586    async fn subscribe_lease_history(
1587        &self,
1588        _cursor: crate::stream_subscribe::StreamCursor,
1589        _filter: &crate::backend::ScannerFilter,
1590    ) -> Result<crate::stream_events::LeaseHistorySubscription, EngineError> {
1591        Err(EngineError::Unavailable {
1592            op: "subscribe_lease_history",
1593        })
1594    }
1595
1596    /// Subscribe to completion events (terminal state transitions).
1597    ///
1598    /// - **Postgres**: wraps the `ff_completion_event` outbox +
1599    ///   LISTEN/NOTIFY machinery. Durable via event-id cursor.
1600    /// - **Valkey**: wraps the RESP3 `ff:dag:completions` pubsub
1601    ///   subscriber. Pubsub is at-most-once over the live
1602    ///   subscription window; the cursor is always the empty
1603    ///   sentinel. If you need at-least-once replay with durable
1604    ///   cursor resume, use the Postgres backend (see
1605    ///   `docs/POSTGRES_PARITY_MATRIX.md` row `subscribe_completion`).
1606    ///
1607    /// `filter` (#282): see [`Self::subscribe_lease_history`]. Valkey
1608    /// reuses the `subscribe_completions_filtered` per-event HGET
1609    /// gate; Postgres filters inline against the outbox's denormalised
1610    /// `instance_tag` column.
1611    async fn subscribe_completion(
1612        &self,
1613        _cursor: crate::stream_subscribe::StreamCursor,
1614        _filter: &crate::backend::ScannerFilter,
1615    ) -> Result<crate::stream_events::CompletionSubscription, EngineError> {
1616        Err(EngineError::Unavailable {
1617            op: "subscribe_completion",
1618        })
1619    }
1620
1621    /// Subscribe to signal-delivery events (satisfied / buffered /
1622    /// deduped).
1623    ///
1624    /// `filter` (#282): see [`Self::subscribe_lease_history`].
1625    async fn subscribe_signal_delivery(
1626        &self,
1627        _cursor: crate::stream_subscribe::StreamCursor,
1628        _filter: &crate::backend::ScannerFilter,
1629    ) -> Result<crate::stream_events::SignalDeliverySubscription, EngineError> {
1630        Err(EngineError::Unavailable {
1631            op: "subscribe_signal_delivery",
1632        })
1633    }
1634
1635    /// Subscribe to instance-tag events (tag attached / cleared).
1636    ///
1637    /// Producer wiring is deferred per #311 audit ("no concrete
1638    /// demand"); the trait method exists for API uniformity across
1639    /// the four families. Backends currently return
1640    /// `EngineError::Unavailable`.
1641    async fn subscribe_instance_tags(
1642        &self,
1643        _cursor: crate::stream_subscribe::StreamCursor,
1644    ) -> Result<crate::stream_events::InstanceTagSubscription, EngineError> {
1645        Err(EngineError::Unavailable {
1646            op: "subscribe_instance_tags",
1647        })
1648    }
1649}
1650
1651/// Object-safety assertion: `dyn EngineBackend` compiles iff every
1652/// method is dyn-compatible. Kept as a compile-time guard so a future
1653/// trait change that accidentally breaks dyn-safety fails the build
1654/// at this site rather than at every downstream `Arc<dyn
1655/// EngineBackend>` use.
1656#[allow(dead_code)]
1657fn _assert_dyn_compatible(_: &dyn EngineBackend) {}
1658
/// Polling interval for [`wait_for_flow_cancellation`]. Tight enough
/// that a local single-node cancel cascade observes `cancelled` within
/// one or two polls; slack enough that a `WaitIndefinite` caller does
/// not hammer `describe_flow` on a live cluster.
const CANCEL_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(100);

/// Defensive ceiling for [`CancelFlowWait::WaitIndefinite`] (300 s =
/// 5 min) — if the reconciler cascade has not converged in five
/// minutes, something is wedged and returning `Timeout` is strictly
/// more useful than blocking forever. RFC-012 §3.1.1 expects
/// real-world cascades to finish within `reconciler_interval + grace`,
/// which is orders of magnitude below this.
const CANCEL_WAIT_INDEFINITE_CEILING: Duration = Duration::from_secs(300);
1672
1673/// Poll `backend.describe_flow(flow_id)` until `public_flow_state` is
1674/// `"cancelled"` or `deadline` elapses.
1675///
1676/// Shared by every backend's `cancel_flow` trait impl that honours
1677/// [`CancelFlowWait::WaitTimeout`] / [`CancelFlowWait::WaitIndefinite`].
1678/// The underlying `cancel_flow` FCALL / SQL transaction flips the
1679/// flow-level state synchronously; member cancellations dispatch
1680/// asynchronously via the reconciler, which also flips
1681/// `public_flow_state` to `cancelled` once the cascade completes. This
1682/// helper waits for that terminal flip.
1683///
1684/// Returns:
1685/// * `Ok(())` once `public_flow_state = "cancelled"` is observed.
1686/// * `Err(EngineError::Timeout { op: "cancel_flow", elapsed })` when
1687///   `deadline` elapses first. `elapsed` is the wait budget (the
1688///   requested timeout), not wall-clock precision.
1689/// * `Err(e)` if `describe_flow` itself errors (propagated).
1690pub async fn wait_for_flow_cancellation<B: EngineBackend + ?Sized>(
1691    backend: &B,
1692    flow_id: &crate::types::FlowId,
1693    deadline: Duration,
1694) -> Result<(), EngineError> {
1695    let start = std::time::Instant::now();
1696    loop {
1697        match backend.describe_flow(flow_id).await? {
1698            Some(snap) if snap.public_flow_state == "cancelled" => return Ok(()),
1699            // `None` (flow removed) is also terminal from the caller's
1700            // perspective — nothing left to wait on.
1701            None => return Ok(()),
1702            Some(_) => {}
1703        }
1704        if start.elapsed() >= deadline {
1705            return Err(EngineError::Timeout {
1706                op: "cancel_flow",
1707                elapsed: deadline,
1708            });
1709        }
1710        tokio::time::sleep(CANCEL_WAIT_POLL_INTERVAL).await;
1711    }
1712}
1713
1714/// Convert a [`CancelFlowWait`] into the deadline passed to
1715/// [`wait_for_flow_cancellation`]. `NoWait` returns `None` — the caller
1716/// must skip the wait entirely.
1717pub fn cancel_flow_wait_deadline(wait: CancelFlowWait) -> Option<Duration> {
1718    // `CancelFlowWait` is `#[non_exhaustive]`; this match lives in the
1719    // defining crate so the exhaustiveness check keeps the compiler
1720    // honest. Future variants must be wired here explicitly.
1721    match wait {
1722        CancelFlowWait::NoWait => None,
1723        CancelFlowWait::WaitTimeout(d) => Some(d),
1724        CancelFlowWait::WaitIndefinite => Some(CANCEL_WAIT_INDEFINITE_CEILING),
1725    }
1726}
1727
#[cfg(test)]
mod tests {
    use super::*;

    /// A zero-state backend stub used to exercise the default
    /// `capabilities()` impl without pulling in a real
    /// transport. Only the default method is under test here; every
    /// other method is either `unreachable!()` or a benign zero-value
    /// stub on this type.
    struct DefaultBackend;

    // The required (non-defaulted) trait methods are stubbed out:
    // the single test below only calls the trait's *default*
    // `capabilities()` body, so none of these ever dispatch.
    #[async_trait]
    impl EngineBackend for DefaultBackend {
        async fn claim(
            &self,
            _lane: &LaneId,
            _capabilities: &CapabilitySet,
            _policy: ClaimPolicy,
        ) -> Result<Option<Handle>, EngineError> {
            unreachable!()
        }
        async fn renew(&self, _handle: &Handle) -> Result<LeaseRenewal, EngineError> {
            unreachable!()
        }
        async fn progress(
            &self,
            _handle: &Handle,
            _percent: Option<u8>,
            _message: Option<String>,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn append_frame(
            &self,
            _handle: &Handle,
            _frame: Frame,
        ) -> Result<AppendFrameOutcome, EngineError> {
            unreachable!()
        }
        async fn complete(
            &self,
            _handle: &Handle,
            _payload: Option<Vec<u8>>,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn fail(
            &self,
            _handle: &Handle,
            _reason: FailureReason,
            _classification: FailureClass,
        ) -> Result<FailOutcome, EngineError> {
            unreachable!()
        }
        async fn cancel(&self, _handle: &Handle, _reason: &str) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn suspend(
            &self,
            _handle: &Handle,
            _args: SuspendArgs,
        ) -> Result<SuspendOutcome, EngineError> {
            unreachable!()
        }
        async fn create_waitpoint(
            &self,
            _handle: &Handle,
            _waitpoint_key: &str,
            _expires_in: Duration,
        ) -> Result<PendingWaitpoint, EngineError> {
            unreachable!()
        }
        async fn observe_signals(
            &self,
            _handle: &Handle,
        ) -> Result<Vec<ResumeSignal>, EngineError> {
            unreachable!()
        }
        async fn claim_from_resume_grant(
            &self,
            _token: ResumeToken,
        ) -> Result<Option<Handle>, EngineError> {
            unreachable!()
        }
        async fn delay(
            &self,
            _handle: &Handle,
            _delay_until: TimestampMs,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn wait_children(&self, _handle: &Handle) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn describe_execution(
            &self,
            _id: &ExecutionId,
        ) -> Result<Option<ExecutionSnapshot>, EngineError> {
            unreachable!()
        }
        // Benign stub (not `unreachable!()`): returns an empty
        // execution context.
        async fn read_execution_context(
            &self,
            _execution_id: &ExecutionId,
        ) -> Result<ExecutionContext, EngineError> {
            Ok(ExecutionContext::new(
                Vec::new(),
                String::new(),
                std::collections::HashMap::new(),
            ))
        }
        // Benign stub: attempt index 0.
        async fn read_current_attempt_index(
            &self,
            _execution_id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError> {
            Ok(AttemptIndex::new(0))
        }
        // Benign stub: attempt count 0.
        async fn read_total_attempt_count(
            &self,
            _execution_id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError> {
            Ok(AttemptIndex::new(0))
        }
        async fn describe_flow(
            &self,
            _id: &FlowId,
        ) -> Result<Option<FlowSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_edges(
            &self,
            _flow_id: &FlowId,
            _direction: EdgeDirection,
        ) -> Result<Vec<EdgeSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn describe_edge(
            &self,
            _flow_id: &FlowId,
            _edge_id: &EdgeId,
        ) -> Result<Option<EdgeSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn resolve_execution_flow_id(
            &self,
            _eid: &ExecutionId,
        ) -> Result<Option<FlowId>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_flows(
            &self,
            _partition: PartitionKey,
            _cursor: Option<FlowId>,
            _limit: usize,
        ) -> Result<ListFlowsPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_lanes(
            &self,
            _cursor: Option<LaneId>,
            _limit: usize,
        ) -> Result<ListLanesPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_suspended(
            &self,
            _partition: PartitionKey,
            _cursor: Option<ExecutionId>,
            _limit: usize,
        ) -> Result<ListSuspendedPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_executions(
            &self,
            _partition: PartitionKey,
            _cursor: Option<ExecutionId>,
            _limit: usize,
        ) -> Result<ListExecutionsPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn deliver_signal(
            &self,
            _args: DeliverSignalArgs,
        ) -> Result<DeliverSignalResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn claim_resumed_execution(
            &self,
            _args: ClaimResumedExecutionArgs,
        ) -> Result<ClaimResumedExecutionResult, EngineError> {
            unreachable!()
        }
        async fn cancel_flow(
            &self,
            _id: &FlowId,
            _policy: CancelFlowPolicy,
            _wait: CancelFlowWait,
        ) -> Result<CancelFlowResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn set_edge_group_policy(
            &self,
            _flow_id: &FlowId,
            _downstream_execution_id: &ExecutionId,
            _policy: crate::contracts::EdgeDependencyPolicy,
        ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
            unreachable!()
        }
        async fn report_usage(
            &self,
            _handle: &Handle,
            _budget: &BudgetId,
            _dimensions: crate::backend::UsageDimensions,
        ) -> Result<ReportUsageResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn read_stream(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
            _from: StreamCursor,
            _to: StreamCursor,
            _count_limit: u64,
        ) -> Result<StreamFrames, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn tail_stream(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
            _after: StreamCursor,
            _block_ms: u64,
            _count_limit: u64,
            _visibility: TailVisibility,
        ) -> Result<StreamFrames, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn read_summary(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
        ) -> Result<Option<SummaryDocument>, EngineError> {
            unreachable!()
        }
    }

    /// The default `capabilities()` impl returns a value tagged
    /// `family = "unknown"` with every `supports.*` bool false, so
    /// pre-RFC-018 out-of-tree backends keep compiling and consumers
    /// can distinguish "backend predates RFC-018" from "backend
    /// reports concrete bools." Every concrete in-tree backend
    /// overrides.
    #[test]
    fn default_capabilities_is_unknown_family_all_false() {
        let b = DefaultBackend;
        let caps = b.capabilities();
        assert_eq!(caps.identity.family, "unknown");
        assert_eq!(
            caps.identity.version,
            crate::capability::Version::new(0, 0, 0)
        );
        assert_eq!(caps.identity.rfc017_stage, "unknown");
        // Every field false on the default (matches `Supports::none()`).
        assert_eq!(caps.supports, crate::capability::Supports::none());
    }
}