// ff_core/engine_backend.rs

//! The `EngineBackend` trait — abstracting FlowFabric's write surface.
//!
//! **RFC-012 Stage 1a:** this is the trait landing. The
//! Valkey-backed impl lives in `ff-backend-valkey`; future backends
//! (Postgres) add a sibling crate with their own impl. ff-sdk's
//! `FlowFabricWorker` gains `connect_with(backend)` /
//! `backend(&self)` accessors so consumers that want to bring their
//! own backend (tests, future non-Valkey deployments) can hand one
//! in. The hot-path migration of `ClaimedTask` / `FlowFabricWorker`
//! to forward through the trait lands across Stages 1b-1d.
//!
//! # Object safety
//!
//! `EngineBackend` is object-safe: all methods are `async fn` behind
//! `#[async_trait]` and take `&self`. Consumers can hold
//! `Arc<dyn EngineBackend>` for heterogeneous-backend deployments.
//! The trait is `Send + Sync + 'static` per RFC-012 §4.1; every impl
//! must honour that bound.
//!
//! # Error surface
//!
//! Every method returns [`Result<_, EngineError>`]. `EngineError`'s
//! `Transport` variant carries a boxed `dyn Error + Send + Sync`;
//! Valkey-backed transport faults box a
//! `ff_script::error::ScriptError` (downcast via
//! `ff_script::engine_error_ext::transport_script_ref`). Other
//! backends box their native error type and set the `backend` tag
//! accordingly.
//!
//! # Atomicity contract
//!
//! Per-op state transitions MUST be atomic (RFC-012 §3.4). On Valkey
//! this is the single-FCALL-per-op property; on Postgres it is the
//! per-transaction property. A backend that cannot honour atomicity
//! for a given op either MUST NOT implement `EngineBackend` or MUST
//! return `EngineError::Unavailable { op }` for the affected method.
//!
//! # Replay semantics
//!
//! `complete`, `fail`, `cancel`, `suspend`, `delay`, `wait_children`
//! are idempotent under replay — calling twice with the same handle
//! and args returns the same outcome (success on first call, typed
//! `State` / `Contention` on subsequent calls where the fence triple
//! no longer matches a live lease).

46use std::time::Duration;
47
48use async_trait::async_trait;
49
50use crate::backend::{
51    AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy,
52    FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal, PendingWaitpoint,
53    PrepareOutcome, ReclaimToken, ResumeSignal, SummaryDocument, TailVisibility,
54};
55use crate::contracts::{
56    CancelFlowResult, ExecutionSnapshot, FlowSnapshot, ReportUsageResult,
57    RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
58    SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
59};
60#[cfg(feature = "core")]
61use crate::contracts::{
62    AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
63    ApplyDependencyToChildResult, BudgetStatus, CancelExecutionArgs, CancelExecutionResult,
64    CancelFlowArgs, ChangePriorityArgs, ChangePriorityResult, ClaimForWorkerArgs,
65    ClaimForWorkerOutcome, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
66    CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
67    CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
68    DeliverSignalArgs, DeliverSignalResult, EdgeDirection, EdgeSnapshot, ExecutionInfo,
69    ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
70    ListPendingWaitpointsResult, ListSuspendedPage, ReplayExecutionArgs, ReplayExecutionResult,
71    ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult, RevokeLeaseArgs, RevokeLeaseResult,
72    StageDependencyEdgeArgs, StageDependencyEdgeResult,
73};
74#[cfg(feature = "core")]
75use crate::state::PublicState;
76#[cfg(feature = "core")]
77use crate::partition::PartitionKey;
78#[cfg(feature = "streaming")]
79use crate::contracts::{StreamCursor, StreamFrames};
80use crate::engine_error::EngineError;
81#[cfg(feature = "streaming")]
82use crate::types::AttemptIndex;
83#[cfg(feature = "core")]
84use crate::types::EdgeId;
85use crate::types::{BudgetId, ExecutionId, FlowId, LaneId, TimestampMs};
86
/// The engine write surface — a single trait a backend implementation
/// honours to serve a `FlowFabricWorker`.
///
/// See RFC-012 §3.1 for the inventory rationale and §3.3 for the
/// type-level shape. 16 methods (Round-7 added `create_waitpoint`;
/// `append_frame` return widened; `report_usage` return replaced —
/// RFC-012 §R7). Issue #150 added the two trigger-surface methods
/// (`deliver_signal` / `claim_resumed_execution`).
///
/// # Note on `complete` payload shape
///
/// The RFC §3.3 sketch uses `Option<Bytes>`; the Stage 1a trait uses
/// `Option<Vec<u8>>` to match the existing
/// `ff_sdk::ClaimedTask::complete` signature and avoid adding a
/// `bytes` public-type dep for zero consumer benefit. Round-4 §7.17
/// resolved the payload container debate to `Box<[u8]>` in the
/// public type (see `HandleOpaque`); `Option<Vec<u8>>` is the
/// zero-churn choice consistent with today's code. Consumers that
/// need `&[u8]` can borrow via `.as_deref()` on the Option.
#[async_trait]
pub trait EngineBackend: Send + Sync + 'static {
    // ── Claim + lifecycle ──

110    /// Fresh-work claim. Returns `Ok(None)` when no work is currently
111    /// available; `Err` only on transport or input-validation faults.
112    async fn claim(
113        &self,
114        lane: &LaneId,
115        capabilities: &CapabilitySet,
116        policy: ClaimPolicy,
117    ) -> Result<Option<Handle>, EngineError>;
118
119    /// Renew a held lease. Returns the updated expiry + epoch on
120    /// success; typed `State::StaleLease` / `State::LeaseExpired`
121    /// when the lease has been stolen or timed out.
122    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError>;
123
124    /// Numeric-progress heartbeat.
125    ///
126    /// Writes scalar `progress_percent` / `progress_message` fields on
127    /// `exec_core`; each call overwrites the previous value. This does
128    /// NOT append to the output stream — stream-frame producers must use
129    /// [`append_frame`](Self::append_frame) instead.
130    async fn progress(
131        &self,
132        handle: &Handle,
133        percent: Option<u8>,
134        message: Option<String>,
135    ) -> Result<(), EngineError>;
136
137    /// Append one stream frame. Distinct from [`progress`](Self::progress)
138    /// per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry
139    /// id and post-append frame count (RFC-012 §R7.2.1).
140    ///
141    /// Stream-frame producers (arbitrary `frame_type` + payload, consumed
142    /// via the read/tail surfaces) MUST use this method rather than
143    /// [`progress`](Self::progress); the latter updates scalar fields on
144    /// `exec_core` and is invisible to stream consumers.
145    async fn append_frame(
146        &self,
147        handle: &Handle,
148        frame: Frame,
149    ) -> Result<AppendFrameOutcome, EngineError>;
150
151    /// Terminal success. Borrows `handle` (round-4 M-D2) so callers
152    /// can retry under `EngineError::Transport` without losing the
153    /// cookie. Payload is `Option<Vec<u8>>` per the note above.
154    async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError>;
155
156    /// Terminal failure with classification. Returns [`FailOutcome`]
157    /// so the caller learns whether a retry was scheduled.
158    async fn fail(
159        &self,
160        handle: &Handle,
161        reason: FailureReason,
162        classification: FailureClass,
163    ) -> Result<FailOutcome, EngineError>;
164
165    /// Cooperative cancel by the worker holding the lease.
166    async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError>;
167
168    /// Suspend the execution awaiting a typed resume condition
169    /// (RFC-013 Stage 1d).
170    ///
171    /// Borrows `handle` (round-4 M-D2). Terminal-looking behaviour is
172    /// expressed through [`SuspendOutcome`]:
173    ///
174    /// * [`SuspendOutcome::Suspended`] — the pre-suspend handle is
175    ///   logically invalidated; the fresh `HandleKind::Suspended`
176    ///   handle inside the variant supersedes it. Runtime enforcement
177    ///   via the fence triple: subsequent ops against the stale handle
178    ///   surface as `Contention(LeaseConflict)`.
179    /// * [`SuspendOutcome::AlreadySatisfied`] — buffered signals on a
180    ///   pending waitpoint already matched the resume condition at
181    ///   suspension time. The lease is NOT released; the caller's
182    ///   pre-suspend handle remains valid.
183    ///
184    /// See RFC-013 §2 for the type shapes, §3 for the replay /
185    /// idempotency contract, §4 for the error taxonomy.
186    async fn suspend(
187        &self,
188        handle: &Handle,
189        args: SuspendArgs,
190    ) -> Result<SuspendOutcome, EngineError>;
191
192    /// Issue a pending waitpoint for future signal delivery.
193    ///
194    /// Waitpoints have two states in the Valkey wire contract:
195    /// **pending** (token issued, not yet backing a suspension) and
196    /// **active** (bound to a suspension). This method creates a
197    /// waitpoint in the **pending** state. A later `suspend` call
198    /// transitions a pending waitpoint to active (see Lua
199    /// `use_pending_waitpoint` ARGV flag at
200    /// `flowfabric.lua:3603,3641,3690`) — or, if buffered signals
201    /// already satisfy its condition, the suspend call returns
202    /// `SuspendOutcome::AlreadySatisfied` and the waitpoint activates
203    /// without ever releasing the lease.
204    ///
205    /// Pending-waitpoint expiry is a first-class terminal error on
206    /// the wire (`PendingWaitpointExpired` at
207    /// `ff-script/src/error.rs:170,403-408`). The attempt retains its
208    /// lease while the waitpoint is pending; signals delivered to
209    /// this waitpoint are buffered server-side (RFC-012 §R7.2.2).
210    async fn create_waitpoint(
211        &self,
212        handle: &Handle,
213        waitpoint_key: &str,
214        expires_in: Duration,
215    ) -> Result<PendingWaitpoint, EngineError>;
216
217    /// Non-mutating observation of signals that satisfied the handle's
218    /// resume condition.
219    async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError>;
220
221    /// Consume a reclaim grant to mint a resumed-kind handle. Returns
222    /// `Ok(None)` when the grant's target execution is no longer
223    /// resumable (already reclaimed, terminal, etc.).
224    async fn claim_from_reclaim(&self, token: ReclaimToken) -> Result<Option<Handle>, EngineError>;
225
226    // Round-5 amendment: lease-releasing peers of `suspend`.
227
228    /// Park the execution until `delay_until`, releasing the lease.
229    async fn delay(&self, handle: &Handle, delay_until: TimestampMs) -> Result<(), EngineError>;
230
231    /// Mark the execution as waiting for its child flow to complete,
232    /// releasing the lease.
233    async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError>;
234
235    // ── Read / admin ──
236
237    /// Snapshot an execution by id. `Ok(None)` ⇒ no such execution.
238    async fn describe_execution(
239        &self,
240        id: &ExecutionId,
241    ) -> Result<Option<ExecutionSnapshot>, EngineError>;
242
243    /// Snapshot a flow by id. `Ok(None)` ⇒ no such flow.
244    async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError>;
245
246    /// List dependency edges adjacent to an execution. Read-only; the
247    /// backend resolves the subject execution's flow, reads the
248    /// direction-specific adjacency SET, and decodes each member's
249    /// flow-scoped `edge:<edge_id>` hash.
250    ///
251    /// Returns an empty `Vec` when the subject has no edges on the
252    /// requested side — including standalone executions (no owning
253    /// flow). Ordering is unspecified: the underlying adjacency SET
254    /// is an unordered SMEMBERS read. Callers that need deterministic
255    /// order should sort by [`EdgeSnapshot::edge_id`] /
256    /// [`EdgeSnapshot::created_at`] themselves.
257    ///
258    /// Parse failures on the edge hash surface as
259    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
260    /// — unknown fields, missing required fields, endpoint mismatches
261    /// against the adjacency SET all fail loud rather than silently
262    /// returning partial results.
263    ///
264    /// Gated on the `core` feature — edge reads are part of the
265    /// minimal engine surface a Postgres-style backend must honour.
266    ///
267    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
268    #[cfg(feature = "core")]
269    async fn list_edges(
270        &self,
271        flow_id: &FlowId,
272        direction: EdgeDirection,
273    ) -> Result<Vec<EdgeSnapshot>, EngineError>;
274
275    /// Snapshot a single dependency edge by its owning flow + edge id.
276    ///
277    /// `Ok(None)` when the edge hash is absent (never staged, or
278    /// staged under a different flow than `flow_id`). Parse failures
279    /// on a present edge hash surface as
280    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
281    /// — the stored `flow_id` field is cross-checked against the
282    /// caller's expected `flow_id` so a wrong-key read fails loud
283    /// rather than returning an unrelated edge.
284    ///
285    /// Gated on the `core` feature — single-edge reads are part of
286    /// the minimal snapshot surface an alternate backend must honour
287    /// alongside [`Self::describe_execution`] / [`Self::describe_flow`]
288    /// / [`Self::list_edges`].
289    ///
290    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
291    #[cfg(feature = "core")]
292    async fn describe_edge(
293        &self,
294        flow_id: &FlowId,
295        edge_id: &EdgeId,
296    ) -> Result<Option<EdgeSnapshot>, EngineError>;
297
298    /// Resolve an execution's owning flow id, if any.
299    ///
300    /// `Ok(None)` when the execution's core record is absent or has
301    /// no associated flow (standalone execution). A present-but-
302    /// malformed `flow_id` field surfaces as
303    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`].
304    ///
305    /// Gated on the `core` feature. Used by ff-sdk's
306    /// `list_outgoing_edges` / `list_incoming_edges` to pivot from a
307    /// consumer-supplied `ExecutionId` to the `FlowId` required by
308    /// [`Self::list_edges`]. A Valkey backend serves this with a
309    /// single `HGET exec_core flow_id`; a Postgres backend serves it
310    /// with the equivalent single-column row lookup.
311    ///
312    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
313    #[cfg(feature = "core")]
314    async fn resolve_execution_flow_id(
315        &self,
316        eid: &ExecutionId,
317    ) -> Result<Option<FlowId>, EngineError>;
318
319    /// List flows on a partition with cursor-based pagination (issue
320    /// #185).
321    ///
322    /// Returns a [`ListFlowsPage`] of [`FlowSummary`](crate::contracts::FlowSummary)
323    /// rows ordered by `flow_id` (UUID byte-lexicographic). `cursor`
324    /// is `None` for the first page; callers forward the returned
325    /// `next_cursor` verbatim to continue iteration, and the listing
326    /// is exhausted when `next_cursor` is `None`. `limit` is the
327    /// maximum number of rows to return on this page — implementations
328    /// MAY return fewer (end of partition) but MUST NOT exceed it.
329    ///
330    /// Ordering rationale: flow ids are UUIDs, and both Valkey
331    /// (sort after-the-fact) and Postgres (`ORDER BY flow_id`) can
332    /// agree on byte-lexicographic order — the same order
333    /// `FlowId::to_string()` produces for canonical hyphenated UUIDs.
334    /// Mapping to `cursor > flow_id` keeps the contract backend-
335    /// independent.
336    ///
337    /// # Postgres implementation pattern
338    ///
339    /// A Postgres-backed implementation serves this directly with
340    ///
341    /// ```sql
342    /// SELECT flow_id, created_at_ms, public_flow_state
343    ///   FROM ff_flow
344    ///  WHERE partition_key = $1
345    ///    AND ($2::uuid IS NULL OR flow_id > $2)
346    ///  ORDER BY flow_id
347    ///  LIMIT $3 + 1;
348    /// ```
349    ///
350    /// — reading one extra row to decide whether `next_cursor` should
351    /// be set to the last row's `flow_id`. The Valkey implementation
352    /// maintains the `ff:idx:{fp:N}:flow_index` SET and performs the
353    /// sort + slice client-side (SMEMBERS then sort-by-UUID-bytes),
354    /// pipelining `HGETALL flow_core` for each row on the page.
355    ///
356    /// Gated on the `core` feature — flow listing is part of the
357    /// minimal engine surface a Postgres-style backend must honour.
358    #[cfg(feature = "core")]
359    async fn list_flows(
360        &self,
361        partition: PartitionKey,
362        cursor: Option<FlowId>,
363        limit: usize,
364    ) -> Result<ListFlowsPage, EngineError>;
365
366    /// Enumerate registered lanes with cursor-based pagination.
367    ///
368    /// Lanes are global (not partition-scoped) — the backend serves
369    /// this from its lane registry and does NOT accept a
370    /// [`crate::partition::Partition`] argument. Results are sorted
371    /// by [`LaneId`] name so the ordering is stable across calls and
372    /// cursors address a deterministic position in the sort.
373    ///
374    /// * `cursor` — exclusive lower bound. `None` starts from the
375    ///   first lane. To continue a walk, pass the previous page's
376    ///   [`ListLanesPage::next_cursor`].
377    /// * `limit` — hard cap on the number of lanes returned in the
378    ///   page. Backends MAY round this down when the registry size
379    ///   is smaller; they MUST NOT return more than `limit`.
380    ///
381    /// [`ListLanesPage::next_cursor`] is `Some(last_lane_in_page)`
382    /// iff at least one more lane exists after the returned page,
383    /// and `None` on the final page. Callers loop until `next_cursor`
384    /// is `None` to read the full registry.
385    ///
386    /// Gated on the `core` feature — lane enumeration is part of the
387    /// minimal snapshot surface an alternate backend must honour
388    /// alongside [`Self::describe_flow`] / [`Self::list_edges`].
389    #[cfg(feature = "core")]
390    async fn list_lanes(
391        &self,
392        cursor: Option<LaneId>,
393        limit: usize,
394    ) -> Result<ListLanesPage, EngineError>;
395
396    /// List suspended executions in one partition, cursor-paginated,
397    /// with each entry's suspension `reason_code` populated (issue
398    /// #183).
399    ///
400    /// Consumer-facing "what's blocked on what?" panels (ff-board's
401    /// suspended-executions view, operator CLIs) need the reason in
402    /// the list response so the UI does not round-trip per row to
403    /// `describe_execution` for a field it knows it needs. `reason`
404    /// on [`SuspendedExecutionEntry`] carries the free-form
405    /// `suspension:current.reason_code` field — see the type rustdoc
406    /// for the String-not-enum rationale.
407    ///
408    /// `cursor` is opaque to callers; pass `None` to start a fresh
409    /// scan and feed the returned [`ListSuspendedPage::next_cursor`]
410    /// back in on subsequent pages until it comes back `None`.
411    /// `limit` bounds the `entries` count; backends MAY return fewer
412    /// when the partition is exhausted.
413    ///
414    /// Ordering is by ascending `suspended_at_ms` (the per-lane
415    /// suspended ZSET score == `timeout_at` or the no-timeout
416    /// sentinel) with execution id as a lex tiebreak, so cursor
417    /// continuation is deterministic across calls.
418    ///
419    /// Gated on the `core` feature — suspended-list enumeration is
420    /// part of the minimal engine surface a Postgres-style backend
421    /// must honour.
422    #[cfg(feature = "core")]
423    async fn list_suspended(
424        &self,
425        partition: PartitionKey,
426        cursor: Option<ExecutionId>,
427        limit: usize,
428    ) -> Result<ListSuspendedPage, EngineError>;
429
430    /// Forward-only paginated listing of the executions indexed under
431    /// one partition.
432    ///
433    /// Reads the partition-wide `ff:idx:{p:N}:all_executions` set,
434    /// sorts lexicographically on `ExecutionId`, and returns the page
435    /// of ids strictly greater than `cursor` (or starting from the
436    /// smallest id when `cursor = None`). The returned
437    /// [`ListExecutionsPage::next_cursor`] is the last id on the page
438    /// iff at least one more id exists past it; `None` signals
439    /// end-of-stream.
440    ///
441    /// `limit` is the maximum number of ids returned on this page. A
442    /// `limit` of `0` returns an empty page with `next_cursor = None`.
443    /// Backends MAY cap `limit` internally (Valkey: 1000) and return
444    /// fewer ids than requested; callers continue paginating until
445    /// `next_cursor == None`.
446    ///
447    /// Ordering is stable under concurrent inserts for already-emitted
448    /// ids (an id less-than-or-equal-to the caller's cursor is never
449    /// re-emitted in later pages) but new inserts past the cursor WILL
450    /// appear in subsequent pages — consistent with forward-only
451    /// cursor semantics.
452    ///
453    /// Gated on the `core` feature — partition-scoped listing is part
454    /// of the minimal engine surface every backend must honour.
455    #[cfg(feature = "core")]
456    async fn list_executions(
457        &self,
458        partition: PartitionKey,
459        cursor: Option<ExecutionId>,
460        limit: usize,
461    ) -> Result<ListExecutionsPage, EngineError>;
462
463    // ── Trigger ops (issue #150) ──
464
465    /// Deliver an external signal to a suspended execution's waitpoint.
466    ///
467    /// The backend atomically records the signal, evaluates the resume
468    /// condition, and — when satisfied — transitions the execution
469    /// from `suspended` to `runnable` (or buffers the signal when the
470    /// waitpoint is still `pending`). Duplicate delivery — same
471    /// `idempotency_key` + waitpoint — surfaces as
472    /// [`DeliverSignalResult::Duplicate`] with the pre-existing
473    /// `signal_id` rather than mutating state twice.
474    ///
475    /// Input validation (HMAC token presence, payload size limits,
476    /// signal-name shape) is the backend's responsibility; callers
477    /// pass a fully populated [`DeliverSignalArgs`] and receive typed
478    /// outcomes or typed errors (`ScriptError::invalid_token`,
479    /// `ScriptError::token_expired`, `ScriptError::ExecutionNotFound`
480    /// surfaced via [`EngineError::Transport`] on the Valkey backend).
481    ///
482    /// Gated on the `core` feature — signal delivery is part of the
483    /// minimal trigger surface every backend must honour so ff-server
484    /// / REST handlers can dispatch against `Arc<dyn EngineBackend>`
485    /// without knowing which backend is running underneath.
486    #[cfg(feature = "core")]
487    async fn deliver_signal(
488        &self,
489        args: DeliverSignalArgs,
490    ) -> Result<DeliverSignalResult, EngineError>;
491
492    /// Claim a resumed execution — a previously-suspended attempt that
493    /// has cleared its resume condition (e.g. via
494    /// [`Self::deliver_signal`]) and now needs a worker to pick up the
495    /// same attempt index.
496    ///
497    /// Distinct from [`Self::claim`] (fresh work) and
498    /// [`Self::claim_from_reclaim`] (grant-based ownership transfer
499    /// after a crash): the resumed-claim path re-binds an existing
500    /// attempt rather than minting a new one. The backend issues a
501    /// fresh `lease_id` + bumps the `lease_epoch`, preserving
502    /// `attempt_id` / `attempt_index` so stream frames and progress
503    /// updates continue on the same attempt.
504    ///
505    /// Typed failures surface via `ScriptError` → `EngineError`:
506    /// `NotAResumedExecution` when the attempt state is not
507    /// `attempt_interrupted`, `ExecutionNotLeaseable` when the
508    /// lifecycle phase is not `runnable`, and `InvalidClaimGrant`
509    /// when the grant key is missing or was already consumed.
510    ///
511    /// Gated on the `core` feature — resumed-claim is part of the
512    /// minimal trigger surface every backend must honour.
513    #[cfg(feature = "core")]
514    async fn claim_resumed_execution(
515        &self,
516        args: ClaimResumedExecutionArgs,
517    ) -> Result<ClaimResumedExecutionResult, EngineError>;
518
519    /// Operator-initiated cancellation of a flow and (optionally) its
520    /// member executions. See RFC-012 §3.1.1 for the policy /wait
521    /// matrix.
522    async fn cancel_flow(
523        &self,
524        id: &FlowId,
525        policy: CancelFlowPolicy,
526        wait: CancelFlowWait,
527    ) -> Result<CancelFlowResult, EngineError>;
528
529    /// RFC-016 Stage A: set the inbound-edge-group policy for a
530    /// downstream execution. Must be called before the first
531    /// `add_dependency(... -> downstream_execution_id)` — the backend
532    /// rejects with [`EngineError::Conflict`] if edges have already
533    /// been staged for this group.
534    ///
535    /// Stage A honours only
536    /// [`EdgeDependencyPolicy::AllOf`](crate::contracts::EdgeDependencyPolicy::AllOf);
537    /// the `AnyOf` / `Quorum` variants return
538    /// [`EngineError::Validation`] with
539    /// `detail = "stage A supports AllOf only; AnyOf/Quorum land in stage B"`
540    /// until Stage B's resolver lands.
541    #[cfg(feature = "core")]
542    async fn set_edge_group_policy(
543        &self,
544        flow_id: &FlowId,
545        downstream_execution_id: &ExecutionId,
546        policy: crate::contracts::EdgeDependencyPolicy,
547    ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError>;
548
549    // ── HMAC secret rotation (v0.7 migration-master Q4) ──
550
551    /// Rotate the waitpoint HMAC signing kid **cluster-wide**.
552    ///
553    /// **v0.7 migration-master Q4 (adjudicated 2026-04-24).**
554    /// Additive trait surface so Valkey and Postgres backends can
555    /// both expose the "rotate everywhere" semantic under one name.
556    ///
557    /// * Valkey impl fans out an `ff_rotate_waitpoint_hmac_secret`
558    ///   FCALL per execution partition. `entries.len() == num_flow_partitions`
559    ///   and per-partition failures are surfaced as inner `Err`
560    ///   entries — the call as a whole does not fail when one
561    ///   partition's FCALL fails, matching
562    ///   [`ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions`]'s
563    ///   partial-success contract.
564    /// * Postgres impl (Wave 4) writes one row to
565    ///   `ff_waitpoint_hmac(kid, secret, rotated_at)` and returns a
566    ///   single-entry vec with `partition = 0`.
567    ///
568    /// The default impl returns
569    /// [`EngineError::Unavailable`] with
570    /// `op = "rotate_waitpoint_hmac_secret_all"` so backends that
571    /// haven't implemented the method surface the miss loudly rather
572    /// than silently no-op'ing. Both concrete backends override.
573    async fn rotate_waitpoint_hmac_secret_all(
574        &self,
575        _args: RotateWaitpointHmacSecretAllArgs,
576    ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
577        Err(EngineError::Unavailable {
578            op: "rotate_waitpoint_hmac_secret_all",
579        })
580    }
581
582    /// Seed the initial waitpoint HMAC secret for a fresh deployment
583    /// (issue #280).
584    ///
585    /// **Idempotent.** If a `current_kid` (Valkey per-partition) or
586    /// an active kid row (Postgres) already exists with the given
587    /// `kid`, the method returns
588    /// [`SeedOutcome::AlreadySeeded`] without overwriting, reporting
589    /// whether the stored secret matches the caller-supplied one via
590    /// `same_secret`. Callers (cairn boot, operator tooling) invoke
591    /// this on every boot and let the backend decide whether to
592    /// install — removing the client-side "check then HSET" race that
593    /// cairn's raw-HSET boot path silently tolerated.
594    ///
595    /// For rotation of an already-seeded secret, use
596    /// [`Self::rotate_waitpoint_hmac_secret_all`] instead; seed is
597    /// install-only.
598    ///
599    /// The default impl returns [`EngineError::Unavailable`] with
600    /// `op = "seed_waitpoint_hmac_secret"` so backends that haven't
601    /// implemented the method surface the miss loudly.
602    async fn seed_waitpoint_hmac_secret(
603        &self,
604        _args: SeedWaitpointHmacSecretArgs,
605    ) -> Result<SeedOutcome, EngineError> {
606        Err(EngineError::Unavailable {
607            op: "seed_waitpoint_hmac_secret",
608        })
609    }
610
611    // ── Budget ──
612
613    /// Report usage against a budget and check limits. Returns the
614    /// typed [`ReportUsageResult`] variant; backends enforce
615    /// idempotency via the caller-supplied
616    /// [`UsageDimensions::dedup_key`] (RFC-012 §R7.2.3 — replaces
617    /// the pre-Round-7 `AdmissionDecision` return).
618    async fn report_usage(
619        &self,
620        handle: &Handle,
621        budget: &BudgetId,
622        dimensions: crate::backend::UsageDimensions,
623    ) -> Result<ReportUsageResult, EngineError>;
624
625    // ── Stream reads (RFC-012 Stage 1c tranche-4; issue #87) ──
626
627    /// Read frames from a completed or in-flight attempt's stream.
628    ///
629    /// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start`
630    /// / `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
631    /// `StreamCursor::At("<id>")` reads from a concrete entry id.
632    ///
633    /// Input validation (count_limit bounds, cursor shape) is the
634    /// caller's responsibility — SDK-side wrappers in
635    /// [`ff-sdk`](https://docs.rs/ff-sdk) enforce bounds before
636    /// forwarding. Backends MAY additionally reject out-of-range
637    /// input via [`EngineError::Validation`].
638    ///
639    /// Gated on the `streaming` feature — stream reads are part of
640    /// the stream-subset surface a backend without XREAD-like
641    /// primitives may omit.
642    #[cfg(feature = "streaming")]
643    async fn read_stream(
644        &self,
645        execution_id: &ExecutionId,
646        attempt_index: AttemptIndex,
647        from: StreamCursor,
648        to: StreamCursor,
649        count_limit: u64,
650    ) -> Result<StreamFrames, EngineError>;
651
    /// Tail a live attempt's stream.
    ///
    /// `after` is an exclusive [`StreamCursor`]: only entries with id
    /// strictly greater than `after` are returned. The open markers
    /// `StreamCursor::Start` / `StreamCursor::End` are NOT accepted
    /// here; callers MUST pass a concrete id (or
    /// `StreamCursor::from_beginning()`). The SDK wrapper rejects the
    /// open markers before they reach the backend.
    ///
    /// `block_ms == 0` → non-blocking peek; `block_ms > 0` → blocks for
    /// up to that many milliseconds waiting for a new entry.
    ///
    /// `visibility` (RFC-015 §6.1) filters the returned entries by the
    /// stored [`StreamMode`](crate::backend::StreamMode) `mode` field;
    /// the default
    /// [`TailVisibility::All`](crate::backend::TailVisibility::All)
    /// preserves v1 behaviour.
    ///
    /// Gated on the `streaming` feature — see
    /// [`read_stream`](Self::read_stream).
    #[cfg(feature = "streaming")]
    async fn tail_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        after: StreamCursor,
        block_ms: u64,
        count_limit: u64,
        visibility: TailVisibility,
    ) -> Result<StreamFrames, EngineError>;
680
    /// Read the rolling summary document for an attempt (RFC-015 §6.3).
    ///
    /// Returns `Ok(None)` when no
    /// [`StreamMode::DurableSummary`](crate::backend::StreamMode::DurableSummary)
    /// frame has ever been appended for the attempt. Non-blocking Hash
    /// read — safe to call from any consumer without holding the lease.
    ///
    /// Gated on the `streaming` feature: summary reads are part of the
    /// stream-subset surface.
    #[cfg(feature = "streaming")]
    async fn read_summary(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
    ) -> Result<Option<SummaryDocument>, EngineError>;
695
696    // ── RFC-017 Stage A — Ingress (5) ──────────────────────────
697    //
698    // Every method in this block has a default impl returning
699    // `EngineError::Unavailable { op }` per RFC-017 §5.3. Concrete
700    // backends override each method with a real body. A missing
701    // override surfaces as a loud typed error at the call site rather
702    // than a silent no-op.
703
704    /// Create an execution. Ingress row 6 (RFC-017 §4). Wraps
705    /// `ff_create_execution` on Valkey; `INSERT INTO ff_execution ...`
706    /// on Postgres. The `idempotency_key` + backend-side default
707    /// `dedup_ttl_ms = 86400000` make duplicate submissions idempotent.
708    #[cfg(feature = "core")]
709    async fn create_execution(
710        &self,
711        _args: CreateExecutionArgs,
712    ) -> Result<CreateExecutionResult, EngineError> {
713        Err(EngineError::Unavailable {
714            op: "create_execution",
715        })
716    }
717
718    /// Create a flow header. Ingress row 5.
719    #[cfg(feature = "core")]
720    async fn create_flow(
721        &self,
722        _args: CreateFlowArgs,
723    ) -> Result<CreateFlowResult, EngineError> {
724        Err(EngineError::Unavailable { op: "create_flow" })
725    }
726
727    /// Atomically add an execution to a flow (single-FCALL co-located
728    /// commit on Valkey; single-transaction UPSERT on Postgres).
729    #[cfg(feature = "core")]
730    async fn add_execution_to_flow(
731        &self,
732        _args: AddExecutionToFlowArgs,
733    ) -> Result<AddExecutionToFlowResult, EngineError> {
734        Err(EngineError::Unavailable {
735            op: "add_execution_to_flow",
736        })
737    }
738
739    /// Stage a dependency edge between flow members. CAS-guarded on
740    /// `graph_revision` — stale rev returns `Contention(StaleGraphRevision)`.
741    #[cfg(feature = "core")]
742    async fn stage_dependency_edge(
743        &self,
744        _args: StageDependencyEdgeArgs,
745    ) -> Result<StageDependencyEdgeResult, EngineError> {
746        Err(EngineError::Unavailable {
747            op: "stage_dependency_edge",
748        })
749    }
750
751    /// Apply a staged dependency edge to its downstream child.
752    #[cfg(feature = "core")]
753    async fn apply_dependency_to_child(
754        &self,
755        _args: ApplyDependencyToChildArgs,
756    ) -> Result<ApplyDependencyToChildResult, EngineError> {
757        Err(EngineError::Unavailable {
758            op: "apply_dependency_to_child",
759        })
760    }
761
762    // ── RFC-017 Stage A — Operator control (4) ─────────────────
763
764    /// Operator-initiated execution cancel (row 2).
765    #[cfg(feature = "core")]
766    async fn cancel_execution(
767        &self,
768        _args: CancelExecutionArgs,
769    ) -> Result<CancelExecutionResult, EngineError> {
770        Err(EngineError::Unavailable {
771            op: "cancel_execution",
772        })
773    }
774
775    /// Re-score an execution's eligibility priority (row 17).
776    #[cfg(feature = "core")]
777    async fn change_priority(
778        &self,
779        _args: ChangePriorityArgs,
780    ) -> Result<ChangePriorityResult, EngineError> {
781        Err(EngineError::Unavailable {
782            op: "change_priority",
783        })
784    }
785
786    /// Replay a terminal execution (row 22). Variadic KEYS handling
787    /// (inbound-edge pre-read) is hidden inside the Valkey impl per
788    /// RFC-017 §4 row 3.
789    #[cfg(feature = "core")]
790    async fn replay_execution(
791        &self,
792        _args: ReplayExecutionArgs,
793    ) -> Result<ReplayExecutionResult, EngineError> {
794        Err(EngineError::Unavailable {
795            op: "replay_execution",
796        })
797    }
798
799    /// Operator-initiated lease revoke (row 19).
800    #[cfg(feature = "core")]
801    async fn revoke_lease(
802        &self,
803        _args: RevokeLeaseArgs,
804    ) -> Result<RevokeLeaseResult, EngineError> {
805        Err(EngineError::Unavailable { op: "revoke_lease" })
806    }
807
808    // ── RFC-017 Stage A — Budget + quota admin (5) ─────────────
809
810    /// Create a budget definition (row 6).
811    #[cfg(feature = "core")]
812    async fn create_budget(
813        &self,
814        _args: CreateBudgetArgs,
815    ) -> Result<CreateBudgetResult, EngineError> {
816        Err(EngineError::Unavailable {
817            op: "create_budget",
818        })
819    }
820
821    /// Reset a budget's usage counters (row 10).
822    #[cfg(feature = "core")]
823    async fn reset_budget(
824        &self,
825        _args: ResetBudgetArgs,
826    ) -> Result<ResetBudgetResult, EngineError> {
827        Err(EngineError::Unavailable { op: "reset_budget" })
828    }
829
830    /// Create a quota policy (row 7).
831    #[cfg(feature = "core")]
832    async fn create_quota_policy(
833        &self,
834        _args: CreateQuotaPolicyArgs,
835    ) -> Result<CreateQuotaPolicyResult, EngineError> {
836        Err(EngineError::Unavailable {
837            op: "create_quota_policy",
838        })
839    }
840
841    /// Read-only budget status for operator visibility (row 8).
842    #[cfg(feature = "core")]
843    async fn get_budget_status(
844        &self,
845        _id: &BudgetId,
846    ) -> Result<BudgetStatus, EngineError> {
847        Err(EngineError::Unavailable {
848            op: "get_budget_status",
849        })
850    }
851
852    /// Admin-path `report_usage` (row 9 + RFC-017 §5 round-1 F4).
853    /// Distinct from the existing [`Self::report_usage`] which takes
854    /// a worker handle — the admin path has no lease context.
855    #[cfg(feature = "core")]
856    async fn report_usage_admin(
857        &self,
858        _budget: &BudgetId,
859        _args: ReportUsageAdminArgs,
860    ) -> Result<ReportUsageResult, EngineError> {
861        Err(EngineError::Unavailable {
862            op: "report_usage_admin",
863        })
864    }
865
866    // ── RFC-017 Stage A — Read + diagnostics (3) ───────────────
867
868    /// Fetch the stored result payload for a completed execution
869    /// (row 4). Returns `Ok(None)` when the execution is missing, not
870    /// yet complete, or its payload was trimmed by retention policy.
871    async fn get_execution_result(
872        &self,
873        _id: &ExecutionId,
874    ) -> Result<Option<Vec<u8>>, EngineError> {
875        Err(EngineError::Unavailable {
876            op: "get_execution_result",
877        })
878    }
879
880    /// List the pending-or-active waitpoints for an execution, cursor
881    /// paginated (row 5 / §8). Stage A preserves the existing
882    /// `PendingWaitpointInfo` shape; Stage D ships the §8 HMAC
883    /// sanitisation + `(token_kid, token_fingerprint)` schema.
884    #[cfg(feature = "core")]
885    async fn list_pending_waitpoints(
886        &self,
887        _args: ListPendingWaitpointsArgs,
888    ) -> Result<ListPendingWaitpointsResult, EngineError> {
889        Err(EngineError::Unavailable {
890            op: "list_pending_waitpoints",
891        })
892    }
893
894    /// Backend-level reachability probe (row 1). Valkey: `PING`;
895    /// Postgres: `SELECT 1`.
896    async fn ping(&self) -> Result<(), EngineError> {
897        Err(EngineError::Unavailable { op: "ping" })
898    }
899
900    // ── RFC-017 Stage A — Scheduling (1) ───────────────────────
901
902    /// Scheduler-routed claim entrypoint (row 18, RFC-017 §7). Valkey
903    /// forwards to its `ff_scheduler::Scheduler` cursor; Postgres
904    /// forwards to `PostgresScheduler`'s `FOR UPDATE SKIP LOCKED`
905    /// path.
906    ///
907    /// Backends that carry an embedded scheduler (e.g. `ValkeyBackend`
908    /// constructed via `with_embedded_scheduler`, or `PostgresBackend`
909    /// with its `with_scanners` sibling) route the claim through it.
910    /// Backends without a wired scheduler return
911    /// [`EngineError::Unavailable`]. HTTP consumers use
912    /// `FlowFabricWorker::claim_via_server` instead.
913    #[cfg(feature = "core")]
914    async fn claim_for_worker(
915        &self,
916        _args: ClaimForWorkerArgs,
917    ) -> Result<ClaimForWorkerOutcome, EngineError> {
918        Err(EngineError::Unavailable {
919            op: "claim_for_worker",
920        })
921    }
922
923    // ── Cross-cutting (RFC-017 Stage B trait-lift) ──────────────
924
925    /// Static observability label identifying the backend family in
926    /// logs + metrics (RFC-017 §5.4 + §9 Stage B). Default impl
927    /// returns `"unknown"` so legacy `impl EngineBackend` blocks that
928    /// have not upgraded keep compiling; every in-tree backend
929    /// overrides — `ValkeyBackend` → `"valkey"`, `PostgresBackend` →
930    /// `"postgres"`.
931    fn backend_label(&self) -> &'static str {
932        "unknown"
933    }
934
935    /// RFC-018 Stage A: snapshot of this backend's identity + the
936    /// capability matrix it can actually service. Consumers use this
937    /// at startup to gate UI features / choose between alternative
938    /// code paths before dispatching. See
939    /// `rfcs/RFC-018-backend-capability-discovery.md` for the full
940    /// discovery contract and the four owner-adjudicated open
941    /// questions (granularity: coarse; version: struct; sync; no
942    /// event stream).
943    ///
944    /// Default: returns an empty matrix tagged `family = "unknown"`
945    /// so pre-RFC-018 out-of-tree backends keep compiling and
946    /// consumers treat "no rows" as "dispatch and catch
947    /// [`EngineError::Unavailable`]" (pre-RFC-018 behaviour).
948    /// Concrete in-tree backends (`ValkeyBackend`, `PostgresBackend`)
949    /// override to populate the real matrix.
950    ///
951    /// Sync (no `.await`): backend-static info should not require a
952    /// probe on every query. Dynamic probes happen once at
953    /// `connect*` time and cache the result.
954    fn capabilities_matrix(&self) -> crate::capability::CapabilityMatrix {
955        crate::capability::CapabilityMatrix::new(crate::capability::BackendIdentity::new(
956            "unknown",
957            crate::capability::Version::new(0, 0, 0),
958            "unknown",
959        ))
960    }
961
962    /// Issue #281: run one-time backend-specific boot preparation.
963    ///
964    /// Intended to run ONCE per deployment startup — NOT per request.
965    /// Idempotent and safe for consumers to call on every application
966    /// boot; backends that have nothing to do return
967    /// [`PrepareOutcome::NoOp`] without side effects.
968    ///
969    /// Per-backend behaviour:
970    ///
971    /// * **Valkey** — issues `FUNCTION LOAD REPLACE` for the
972    ///   `flowfabric` Lua library (with bounded retry on transient
973    ///   transport faults; permanent compile errors surface as
974    ///   [`EngineError::Transport`] without retry). Returns
975    ///   [`PrepareOutcome::Applied`] carrying
976    ///   `"FUNCTION LOAD (flowfabric lib v<N>)"`.
977    /// * **Postgres** — returns [`PrepareOutcome::NoOp`]. Schema
978    ///   migrations are applied out-of-band per
979    ///   `rfcs/drafts/v0.7-migration-master.md §Q12`; the backend
980    ///   runs a schema-version check at connect time and refuses to
981    ///   start on mismatch, so no boot-side prepare work remains.
982    /// * **Default impl** — returns [`PrepareOutcome::NoOp`] so
983    ///   out-of-tree backends without preparation work compile
984    ///   without boilerplate.
985    ///
986    /// # Relationship to the in-tree boot path
987    ///
988    /// `ValkeyBackend::initialize_deployment` (called from
989    /// `Server::start_with_metrics`) already invokes
990    /// [`ensure_library`](ff_script::loader::ensure_library) inline as
991    /// its step 4; that path is unchanged. `prepare()` exists as a
992    /// **trait-surface entry point** so consumers that construct an
993    /// `Arc<dyn EngineBackend>` outside of `Server` (e.g.
994    /// cairn-fabric's boot path at `cairn-fabric/src/boot.rs`) can
995    /// run the same preparation without reaching into
996    /// backend-specific modules. The overlap is intentional: calling
997    /// both `prepare()` and `initialize_deployment` is safe because
998    /// `FUNCTION LOAD REPLACE` is idempotent under the version
999    /// check.
1000    ///
1001    /// # Layer forwarding
1002    ///
1003    /// Layer impls (`HookedBackend`, ff-sdk layers) do NOT forward
1004    /// `prepare` today — consistent with `backend_label` / `ping` /
1005    /// `shutdown_prepare`. Consumers that wrap a backend in layers
1006    /// MUST call `prepare()` on the raw backend before wrapping, or
1007    /// accept the default [`PrepareOutcome::NoOp`].
1008    async fn prepare(&self) -> Result<PrepareOutcome, EngineError> {
1009        Ok(PrepareOutcome::NoOp)
1010    }
1011
1012    /// Drain-before-shutdown hook (RFC-017 §5.4). The server calls
1013    /// this before draining its own background tasks so backend-
1014    /// scoped primitives (Valkey stream semaphore, Postgres sqlx
1015    /// pool, …) can close their gates and await in-flight work up to
1016    /// `grace`.
1017    ///
1018    /// Default impl returns `Ok(())` — a no-op backend has nothing
1019    /// backend-scoped to drain. Concrete backends whose data plane
1020    /// owns resources (connection pools, semaphores, listeners)
1021    /// override with a real body.
1022    async fn shutdown_prepare(&self, _grace: Duration) -> Result<(), EngineError> {
1023        Ok(())
1024    }
1025
1026    // ── RFC-017 Stage E2 — `Server::client` removal (header + reads) ───
1027
1028    /// RFC-017 Stage E2: the "header" portion of `cancel_flow` — run the
1029    /// atomic flow-state flip (Valkey: `ff_cancel_flow` FCALL; Postgres:
1030    /// `cancel_flow_once` tx), decode policy + membership, and surface
1031    /// the `flow_already_terminal` idempotency branch as a first-class
1032    /// [`CancelFlowHeader::AlreadyTerminal`] so the Server can build
1033    /// the wire [`CancelFlowResult`] without reaching for a raw
1034    /// `Client`. Separate from the existing
1035    /// [`EngineBackend::cancel_flow`] entry point (which takes the
1036    /// enum-typed `(policy, wait)` split and returns the wait-collapsed
1037    /// `CancelFlowResult`) because the Server owns its own
1038    /// wait-dispatch + member-cancel machinery via
1039    /// [`EngineBackend::cancel_execution`] + backlog ack.
1040    ///
1041    /// Default impl returns [`EngineError::Unavailable`] so un-migrated
1042    /// backends surface the miss loudly.
1043    #[cfg(feature = "core")]
1044    async fn cancel_flow_header(
1045        &self,
1046        _args: CancelFlowArgs,
1047    ) -> Result<crate::contracts::CancelFlowHeader, EngineError> {
1048        Err(EngineError::Unavailable {
1049            op: "cancel_flow_header",
1050        })
1051    }
1052
1053    /// RFC-017 Stage E2: best-effort acknowledgement that one member of
1054    /// a `cancel_all` flow has completed its per-member cancel. Drains
1055    /// the member from the flow's `pending_cancels` set and, if empty,
1056    /// removes the flow from the partition-level `cancel_backlog`
1057    /// (Valkey: `ff_ack_cancel_member` FCALL; Postgres: table write —
1058    /// default `Unavailable` until Wave 9).
1059    ///
1060    /// Failures are swallowed by the caller — the cancel-backlog
1061    /// reconciler is the authoritative drain — but a typed error here
1062    /// lets the caller log a backend-scoped context string.
1063    #[cfg(feature = "core")]
1064    async fn ack_cancel_member(
1065        &self,
1066        _flow_id: &FlowId,
1067        _execution_id: &ExecutionId,
1068    ) -> Result<(), EngineError> {
1069        Err(EngineError::Unavailable {
1070            op: "ack_cancel_member",
1071        })
1072    }
1073
1074    /// RFC-017 Stage E2: full-shape execution read used by the
1075    /// `GET /v1/executions/{id}` HTTP route. Returns the legacy
1076    /// [`ExecutionInfo`] wire shape (not the decoupled
1077    /// [`ExecutionSnapshot`]) so the existing HTTP response bytes stay
1078    /// identical across the migration.
1079    ///
1080    /// `Ok(None)` ⇒ no such execution. Default `Unavailable` because
1081    /// the Valkey HGETALL-and-parse is backend-specific.
1082    #[cfg(feature = "core")]
1083    async fn read_execution_info(
1084        &self,
1085        _id: &ExecutionId,
1086    ) -> Result<Option<ExecutionInfo>, EngineError> {
1087        Err(EngineError::Unavailable {
1088            op: "read_execution_info",
1089        })
1090    }
1091
1092    /// RFC-017 Stage E2: narrow `public_state` read used by the
1093    /// `GET /v1/executions/{id}/state` HTTP route. Returns `Ok(None)`
1094    /// when the execution is missing. Default `Unavailable`.
1095    #[cfg(feature = "core")]
1096    async fn read_execution_state(
1097        &self,
1098        _id: &ExecutionId,
1099    ) -> Result<Option<PublicState>, EngineError> {
1100        Err(EngineError::Unavailable {
1101            op: "read_execution_state",
1102        })
1103    }
1104
1105    // ── RFC-019 Stage A — Stream-cursor subscriptions ─────────────
1106    //
1107    // Four owner-adjudicated families (RFC-019 §Open Questions #5):
1108    // `lease_history`, `completion`, `signal_delivery`,
1109    // `instance_tags`. Each returns a `StreamSubscription`
1110    // (`Pin<Box<dyn Stream<Item = Result<StreamEvent, EngineError>> +
1111    // Send>>`); the consumer drives with `StreamExt::next`.
1112    //
1113    // The cursor is backend-opaque bytes; see
1114    // [`crate::stream_subscribe`] for the shared cursor codec + event
1115    // payload. All defaults return `EngineError::Unavailable` per
1116    // RFC-017 trait-growth conventions. Stage A ships Valkey
1117    // `subscribe_lease_history` + Postgres `subscribe_completion`;
1118    // the other six (family × backend) combinations stay `Unavailable`
1119    // and are tracked in the RFC-019 Stage B follow-up issues.
1120
1121    /// Subscribe to lease lifecycle events (expired / reclaimed /
1122    /// revoked) for the partition this backend is configured with.
1123    ///
1124    /// Cross-partition fan-out is consumer-side merge: subscribe
1125    /// per-partition backend instance and interleave on the read
1126    /// side. Yields
1127    /// `Err(EngineError::StreamDisconnected { cursor })` on backend
1128    /// disconnect; resume by calling this method again with the
1129    /// returned cursor.
1130    async fn subscribe_lease_history(
1131        &self,
1132        _cursor: crate::stream_subscribe::StreamCursor,
1133    ) -> Result<crate::stream_subscribe::StreamSubscription, EngineError> {
1134        Err(EngineError::Unavailable {
1135            op: "subscribe_lease_history",
1136        })
1137    }
1138
1139    /// Subscribe to completion events (terminal state transitions).
1140    /// Postgres: wraps the existing `ff_completion_event` outbox +
1141    /// LISTEN/NOTIFY machinery. Valkey: Stage B follow-up.
1142    async fn subscribe_completion(
1143        &self,
1144        _cursor: crate::stream_subscribe::StreamCursor,
1145    ) -> Result<crate::stream_subscribe::StreamSubscription, EngineError> {
1146        Err(EngineError::Unavailable {
1147            op: "subscribe_completion",
1148        })
1149    }
1150
1151    /// Subscribe to signal-delivery events (waitpoint arming /
1152    /// satisfied). Stage B follow-up.
1153    async fn subscribe_signal_delivery(
1154        &self,
1155        _cursor: crate::stream_subscribe::StreamCursor,
1156    ) -> Result<crate::stream_subscribe::StreamSubscription, EngineError> {
1157        Err(EngineError::Unavailable {
1158            op: "subscribe_signal_delivery",
1159        })
1160    }
1161
1162    /// Subscribe to instance-tag events (tag attached / cleared).
1163    /// Stage B follow-up.
1164    async fn subscribe_instance_tags(
1165        &self,
1166        _cursor: crate::stream_subscribe::StreamCursor,
1167    ) -> Result<crate::stream_subscribe::StreamSubscription, EngineError> {
1168        Err(EngineError::Unavailable {
1169            op: "subscribe_instance_tags",
1170        })
1171    }
1172}
1173
1174/// Object-safety assertion: `dyn EngineBackend` compiles iff every
1175/// method is dyn-compatible. Kept as a compile-time guard so a future
1176/// trait change that accidentally breaks dyn-safety fails the build
1177/// at this site rather than at every downstream `Arc<dyn
1178/// EngineBackend>` use.
1179#[allow(dead_code)]
1180fn _assert_dyn_compatible(_: &dyn EngineBackend) {}
1181
/// Polling interval for [`wait_for_flow_cancellation`]. Tight enough
/// that a local single-node cancel cascade observes `cancelled` within
/// one or two polls; slack enough that a `WaitIndefinite` caller does
/// not hammer `describe_flow` on a live cluster.
///
/// The 100ms value is a tuning choice, not a contract — callers should
/// not depend on the exact poll cadence.
const CANCEL_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(100);
1187
/// Defensive ceiling for [`CancelFlowWait::WaitIndefinite`] — if the
/// reconciler cascade has not converged in five minutes, something is
/// wedged and returning `Timeout` is strictly more useful than blocking
/// forever. RFC-012 §3.1.1 expects real-world cascades to finish within
/// `reconciler_interval + grace`, which is orders of magnitude below
/// this, so hitting the ceiling is itself a strong signal of a wedged
/// deployment.
const CANCEL_WAIT_INDEFINITE_CEILING: Duration = Duration::from_secs(300);
1195
1196/// Poll `backend.describe_flow(flow_id)` until `public_flow_state` is
1197/// `"cancelled"` or `deadline` elapses.
1198///
1199/// Shared by every backend's `cancel_flow` trait impl that honours
1200/// [`CancelFlowWait::WaitTimeout`] / [`CancelFlowWait::WaitIndefinite`].
1201/// The underlying `cancel_flow` FCALL / SQL transaction flips the
1202/// flow-level state synchronously; member cancellations dispatch
1203/// asynchronously via the reconciler, which also flips
1204/// `public_flow_state` to `cancelled` once the cascade completes. This
1205/// helper waits for that terminal flip.
1206///
1207/// Returns:
1208/// * `Ok(())` once `public_flow_state = "cancelled"` is observed.
1209/// * `Err(EngineError::Timeout { op: "cancel_flow", elapsed })` when
1210///   `deadline` elapses first. `elapsed` is the wait budget (the
1211///   requested timeout), not wall-clock precision.
1212/// * `Err(e)` if `describe_flow` itself errors (propagated).
1213pub async fn wait_for_flow_cancellation<B: EngineBackend + ?Sized>(
1214    backend: &B,
1215    flow_id: &crate::types::FlowId,
1216    deadline: Duration,
1217) -> Result<(), EngineError> {
1218    let start = std::time::Instant::now();
1219    loop {
1220        match backend.describe_flow(flow_id).await? {
1221            Some(snap) if snap.public_flow_state == "cancelled" => return Ok(()),
1222            // `None` (flow removed) is also terminal from the caller's
1223            // perspective — nothing left to wait on.
1224            None => return Ok(()),
1225            Some(_) => {}
1226        }
1227        if start.elapsed() >= deadline {
1228            return Err(EngineError::Timeout {
1229                op: "cancel_flow",
1230                elapsed: deadline,
1231            });
1232        }
1233        tokio::time::sleep(CANCEL_WAIT_POLL_INTERVAL).await;
1234    }
1235}
1236
1237/// Convert a [`CancelFlowWait`] into the deadline passed to
1238/// [`wait_for_flow_cancellation`]. `NoWait` returns `None` — the caller
1239/// must skip the wait entirely.
1240pub fn cancel_flow_wait_deadline(wait: CancelFlowWait) -> Option<Duration> {
1241    // `CancelFlowWait` is `#[non_exhaustive]`; this match lives in the
1242    // defining crate so the exhaustiveness check keeps the compiler
1243    // honest. Future variants must be wired here explicitly.
1244    match wait {
1245        CancelFlowWait::NoWait => None,
1246        CancelFlowWait::WaitTimeout(d) => Some(d),
1247        CancelFlowWait::WaitIndefinite => Some(CANCEL_WAIT_INDEFINITE_CEILING),
1248    }
1249}
1250
1251#[cfg(test)]
1252mod tests {
1253    use super::*;
1254    use crate::capability::{Capability, CapabilityStatus};
1255
    /// A zero-state backend stub used to exercise the default
    /// `capabilities_matrix()` impl without pulling in a real
    /// transport. Only the default method is under test here; every
    /// other method is unreachable on this type.
    ///
    /// Deliberately a unit struct: the defaults under test take `&self`
    /// but read no instance state.
    struct DefaultBackend;
1261
    #[async_trait]
    impl EngineBackend for DefaultBackend {
        // Only the trait's *required* (non-default) methods appear
        // below; each body is `unreachable!()` because these tests
        // never dispatch real work — they exercise the trait's default
        // methods alone.
        async fn claim(
            &self,
            _lane: &LaneId,
            _capabilities: &CapabilitySet,
            _policy: ClaimPolicy,
        ) -> Result<Option<Handle>, EngineError> {
            unreachable!()
        }
        async fn renew(&self, _handle: &Handle) -> Result<LeaseRenewal, EngineError> {
            unreachable!()
        }
        async fn progress(
            &self,
            _handle: &Handle,
            _percent: Option<u8>,
            _message: Option<String>,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn append_frame(
            &self,
            _handle: &Handle,
            _frame: Frame,
        ) -> Result<AppendFrameOutcome, EngineError> {
            unreachable!()
        }
        async fn complete(
            &self,
            _handle: &Handle,
            _payload: Option<Vec<u8>>,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn fail(
            &self,
            _handle: &Handle,
            _reason: FailureReason,
            _classification: FailureClass,
        ) -> Result<FailOutcome, EngineError> {
            unreachable!()
        }
        async fn cancel(&self, _handle: &Handle, _reason: &str) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn suspend(
            &self,
            _handle: &Handle,
            _args: SuspendArgs,
        ) -> Result<SuspendOutcome, EngineError> {
            unreachable!()
        }
        async fn create_waitpoint(
            &self,
            _handle: &Handle,
            _waitpoint_key: &str,
            _expires_in: Duration,
        ) -> Result<PendingWaitpoint, EngineError> {
            unreachable!()
        }
        async fn observe_signals(
            &self,
            _handle: &Handle,
        ) -> Result<Vec<ResumeSignal>, EngineError> {
            unreachable!()
        }
        async fn claim_from_reclaim(
            &self,
            _token: ReclaimToken,
        ) -> Result<Option<Handle>, EngineError> {
            unreachable!()
        }
        async fn delay(
            &self,
            _handle: &Handle,
            _delay_until: TimestampMs,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn wait_children(&self, _handle: &Handle) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn describe_execution(
            &self,
            _id: &ExecutionId,
        ) -> Result<Option<ExecutionSnapshot>, EngineError> {
            unreachable!()
        }
        async fn describe_flow(
            &self,
            _id: &FlowId,
        ) -> Result<Option<FlowSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_edges(
            &self,
            _flow_id: &FlowId,
            _direction: EdgeDirection,
        ) -> Result<Vec<EdgeSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn describe_edge(
            &self,
            _flow_id: &FlowId,
            _edge_id: &EdgeId,
        ) -> Result<Option<EdgeSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn resolve_execution_flow_id(
            &self,
            _eid: &ExecutionId,
        ) -> Result<Option<FlowId>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_flows(
            &self,
            _partition: PartitionKey,
            _cursor: Option<FlowId>,
            _limit: usize,
        ) -> Result<ListFlowsPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_lanes(
            &self,
            _cursor: Option<LaneId>,
            _limit: usize,
        ) -> Result<ListLanesPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_suspended(
            &self,
            _partition: PartitionKey,
            _cursor: Option<ExecutionId>,
            _limit: usize,
        ) -> Result<ListSuspendedPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_executions(
            &self,
            _partition: PartitionKey,
            _cursor: Option<ExecutionId>,
            _limit: usize,
        ) -> Result<ListExecutionsPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn deliver_signal(
            &self,
            _args: DeliverSignalArgs,
        ) -> Result<DeliverSignalResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn claim_resumed_execution(
            &self,
            _args: ClaimResumedExecutionArgs,
        ) -> Result<ClaimResumedExecutionResult, EngineError> {
            unreachable!()
        }
        async fn cancel_flow(
            &self,
            _id: &FlowId,
            _policy: CancelFlowPolicy,
            _wait: CancelFlowWait,
        ) -> Result<CancelFlowResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn set_edge_group_policy(
            &self,
            _flow_id: &FlowId,
            _downstream_execution_id: &ExecutionId,
            _policy: crate::contracts::EdgeDependencyPolicy,
        ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
            unreachable!()
        }
        async fn report_usage(
            &self,
            _handle: &Handle,
            _budget: &BudgetId,
            _dimensions: crate::backend::UsageDimensions,
        ) -> Result<ReportUsageResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn read_stream(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
            _from: StreamCursor,
            _to: StreamCursor,
            _count_limit: u64,
        ) -> Result<StreamFrames, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn tail_stream(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
            _after: StreamCursor,
            _block_ms: u64,
            _count_limit: u64,
            _visibility: TailVisibility,
        ) -> Result<StreamFrames, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn read_summary(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
        ) -> Result<Option<SummaryDocument>, EngineError> {
            unreachable!()
        }
    }
1486
1487    /// The default `capabilities_matrix()` impl returns an empty
1488    /// matrix tagged `family = "unknown"` so pre-RFC-018 out-of-tree
1489    /// backends keep compiling and consumers can distinguish
1490    /// "backend predates RFC-018" from "backend reports concrete
1491    /// rows." Every concrete in-tree backend overrides.
1492    #[test]
1493    fn default_capabilities_matrix_is_unknown_family() {
1494        let b = DefaultBackend;
1495        let m = b.capabilities_matrix();
1496        assert_eq!(m.identity.family, "unknown");
1497        assert_eq!(
1498            m.identity.version,
1499            crate::capability::Version::new(0, 0, 0)
1500        );
1501        assert_eq!(m.identity.rfc017_stage, "unknown");
1502        assert!(m.caps.is_empty());
1503        // Any capability resolves to Unknown on a default matrix.
1504        assert_eq!(m.get(Capability::Ping), CapabilityStatus::Unknown);
1505        assert!(!m.supports(Capability::Ping));
1506    }
1507}