Skip to main content

ff_core/
engine_backend.rs

1//! The `EngineBackend` trait — abstracting FlowFabric's write surface.
2//!
3//! **RFC-012 Stage 1a:** this is the trait landing. The
4//! Valkey-backed impl lives in `ff-backend-valkey`; future backends
5//! (Postgres) add a sibling crate with their own impl. ff-sdk's
6//! `FlowFabricWorker` gains `connect_with(backend)` /
7//! `backend(&self)` accessors so consumers that want to bring their
8//! own backend (tests, future non-Valkey deployments) can hand one
9//! in. The hot-path migration of `ClaimedTask` / `FlowFabricWorker`
10//! to forward through the trait lands across Stages 1b-1d.
11//!
12//! # Object safety
13//!
14//! `EngineBackend` is object-safe: all methods are `async fn` behind
15//! `#[async_trait]` and take `&self`. Consumers can hold
16//! `Arc<dyn EngineBackend>` for heterogenous-backend deployments.
17//! The trait is `Send + Sync + 'static` per RFC-012 §4.1; every impl
18//! must honour that bound.
19//!
20//! # Error surface
21//!
22//! Every method returns [`Result<_, EngineError>`]. `EngineError`'s
23//! `Transport` variant carries a boxed `dyn Error + Send + Sync`;
24//! Valkey-backed transport faults box a
25//! `ff_script::error::ScriptError` (downcast via
26//! `ff_script::engine_error_ext::transport_script_ref`). Other
27//! backends box their native error type and set the `backend` tag
28//! accordingly.
29//!
30//! # Atomicity contract
31//!
32//! Per-op state transitions MUST be atomic (RFC-012 §3.4). On Valkey
33//! this is the single-FCALL-per-op property; on Postgres it is the
34//! per-transaction property. A backend that cannot honour atomicity
35//! for a given op either MUST NOT implement `EngineBackend` or MUST
36//! return `EngineError::Unavailable { op }` for the affected method.
37//!
38//! # Replay semantics
39//!
40//! `complete`, `fail`, `cancel`, `suspend`, `delay`, `wait_children`
41//! are idempotent under replay — calling twice with the same handle
42//! and args returns the same outcome (success on first call, typed
43//! `State` / `Contention` on subsequent calls where the fence triple
44//! no longer matches a live lease).
45
46use std::time::Duration;
47
48use async_trait::async_trait;
49
50use crate::backend::{
51    AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy,
52    FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal, PendingWaitpoint,
53    PrepareOutcome, ReclaimToken, ResumeSignal, SummaryDocument, TailVisibility,
54};
55use crate::contracts::{
56    CancelFlowResult, ExecutionSnapshot, FlowSnapshot, ReportUsageResult,
57    RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
58    SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
59};
60#[cfg(feature = "core")]
61use crate::contracts::{
62    AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
63    ApplyDependencyToChildResult, BudgetStatus, CancelExecutionArgs, CancelExecutionResult,
64    CancelFlowArgs, ChangePriorityArgs, ChangePriorityResult, ClaimForWorkerArgs,
65    ClaimForWorkerOutcome, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
66    CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
67    CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
68    DeliverSignalArgs, DeliverSignalResult, EdgeDirection, EdgeSnapshot, ExecutionInfo,
69    ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
70    ListPendingWaitpointsResult, ListSuspendedPage, ReplayExecutionArgs, ReplayExecutionResult,
71    ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult, RevokeLeaseArgs, RevokeLeaseResult,
72    StageDependencyEdgeArgs, StageDependencyEdgeResult,
73};
74#[cfg(feature = "core")]
75use crate::state::PublicState;
76#[cfg(feature = "core")]
77use crate::partition::PartitionKey;
78#[cfg(feature = "streaming")]
79use crate::contracts::{StreamCursor, StreamFrames};
80use crate::engine_error::EngineError;
81#[cfg(feature = "streaming")]
82use crate::types::AttemptIndex;
83#[cfg(feature = "core")]
84use crate::types::EdgeId;
85use crate::types::{BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
86
87/// The engine write surface — a single trait a backend implementation
88/// honours to serve a `FlowFabricWorker`.
89///
90/// See RFC-012 §3.1 for the inventory rationale and §3.3 for the
91/// type-level shape. 16 methods (Round-7 added `create_waitpoint`;
92/// `append_frame` return widened; `report_usage` return replaced —
93/// RFC-012 §R7). Issue #150 added the two trigger-surface methods
94/// (`deliver_signal` / `claim_resumed_execution`).
95///
96/// # Note on `complete` payload shape
97///
98/// The RFC §3.3 sketch uses `Option<Bytes>`; the Stage 1a trait uses
99/// `Option<Vec<u8>>` to match the existing
100/// `ff_sdk::ClaimedTask::complete` signature and avoid adding a
101/// `bytes` public-type dep for zero consumer benefit. Round-4 §7.17
102/// resolved the payload container debate to `Box<[u8]>` in the
103/// public type (see `HandleOpaque`); `Option<Vec<u8>>` is the
104/// zero-churn choice consistent with today's code. Consumers that
105/// need `&[u8]` can borrow via `.as_deref()` on the Option.
106#[async_trait]
107pub trait EngineBackend: Send + Sync + 'static {
108    // ── Claim + lifecycle ──
109
110    /// Fresh-work claim. Returns `Ok(None)` when no work is currently
111    /// available; `Err` only on transport or input-validation faults.
112    async fn claim(
113        &self,
114        lane: &LaneId,
115        capabilities: &CapabilitySet,
116        policy: ClaimPolicy,
117    ) -> Result<Option<Handle>, EngineError>;
118
119    /// Renew a held lease. Returns the updated expiry + epoch on
120    /// success; typed `State::StaleLease` / `State::LeaseExpired`
121    /// when the lease has been stolen or timed out.
122    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError>;
123
124    /// Numeric-progress heartbeat.
125    ///
126    /// Writes scalar `progress_percent` / `progress_message` fields on
127    /// `exec_core`; each call overwrites the previous value. This does
128    /// NOT append to the output stream — stream-frame producers must use
129    /// [`append_frame`](Self::append_frame) instead.
130    async fn progress(
131        &self,
132        handle: &Handle,
133        percent: Option<u8>,
134        message: Option<String>,
135    ) -> Result<(), EngineError>;
136
137    /// Append one stream frame. Distinct from [`progress`](Self::progress)
138    /// per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry
139    /// id and post-append frame count (RFC-012 §R7.2.1).
140    ///
141    /// Stream-frame producers (arbitrary `frame_type` + payload, consumed
142    /// via the read/tail surfaces) MUST use this method rather than
143    /// [`progress`](Self::progress); the latter updates scalar fields on
144    /// `exec_core` and is invisible to stream consumers.
145    async fn append_frame(
146        &self,
147        handle: &Handle,
148        frame: Frame,
149    ) -> Result<AppendFrameOutcome, EngineError>;
150
151    /// Terminal success. Borrows `handle` (round-4 M-D2) so callers
152    /// can retry under `EngineError::Transport` without losing the
153    /// cookie. Payload is `Option<Vec<u8>>` per the note above.
154    async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError>;
155
156    /// Terminal failure with classification. Returns [`FailOutcome`]
157    /// so the caller learns whether a retry was scheduled.
158    async fn fail(
159        &self,
160        handle: &Handle,
161        reason: FailureReason,
162        classification: FailureClass,
163    ) -> Result<FailOutcome, EngineError>;
164
165    /// Cooperative cancel by the worker holding the lease.
166    async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError>;
167
168    /// Suspend the execution awaiting a typed resume condition
169    /// (RFC-013 Stage 1d).
170    ///
171    /// Borrows `handle` (round-4 M-D2). Terminal-looking behaviour is
172    /// expressed through [`SuspendOutcome`]:
173    ///
174    /// * [`SuspendOutcome::Suspended`] — the pre-suspend handle is
175    ///   logically invalidated; the fresh `HandleKind::Suspended`
176    ///   handle inside the variant supersedes it. Runtime enforcement
177    ///   via the fence triple: subsequent ops against the stale handle
178    ///   surface as `Contention(LeaseConflict)`.
179    /// * [`SuspendOutcome::AlreadySatisfied`] — buffered signals on a
180    ///   pending waitpoint already matched the resume condition at
181    ///   suspension time. The lease is NOT released; the caller's
182    ///   pre-suspend handle remains valid.
183    ///
184    /// See RFC-013 §2 for the type shapes, §3 for the replay /
185    /// idempotency contract, §4 for the error taxonomy.
186    async fn suspend(
187        &self,
188        handle: &Handle,
189        args: SuspendArgs,
190    ) -> Result<SuspendOutcome, EngineError>;
191
192    /// Suspend by execution id + lease fence triple, for service-layer
193    /// callers that hold a run record / lease-claim descriptor but no
194    /// worker [`Handle`] (cairn issue #322).
195    ///
196    /// Semantics mirror [`Self::suspend`] exactly — the same
197    /// [`SuspendArgs`] validation, the same [`SuspendOutcome`]
198    /// lifecycle, the same RFC-013 §3 dedup / replay contract. The
199    /// only difference is the fencing source: instead of the
200    /// `(lease_id, lease_epoch, attempt_id)` fields embedded in a
201    /// `Handle`, the backend fences against the triple passed directly.
202    /// Attempt-index, lane, and worker-instance metadata that
203    /// [`Self::suspend`] reads from the handle payload are recovered
204    /// from the backend's authoritative execution record (Valkey:
205    /// `exec_core` HGETs; Postgres: `ff_attempt` row lookup).
206    ///
207    /// The default impl returns [`EngineError::Unavailable`] so
208    /// existing backend impls remain non-breaking. Production backends
209    /// (Valkey, Postgres) override.
210    async fn suspend_by_triple(
211        &self,
212        exec_id: ExecutionId,
213        triple: LeaseFence,
214        args: SuspendArgs,
215    ) -> Result<SuspendOutcome, EngineError> {
216        let _ = (exec_id, triple, args);
217        Err(EngineError::Unavailable {
218            op: "suspend_by_triple",
219        })
220    }
221
222    /// Issue a pending waitpoint for future signal delivery.
223    ///
224    /// Waitpoints have two states in the Valkey wire contract:
225    /// **pending** (token issued, not yet backing a suspension) and
226    /// **active** (bound to a suspension). This method creates a
227    /// waitpoint in the **pending** state. A later `suspend` call
228    /// transitions a pending waitpoint to active (see Lua
229    /// `use_pending_waitpoint` ARGV flag at
230    /// `flowfabric.lua:3603,3641,3690`) — or, if buffered signals
231    /// already satisfy its condition, the suspend call returns
232    /// `SuspendOutcome::AlreadySatisfied` and the waitpoint activates
233    /// without ever releasing the lease.
234    ///
235    /// Pending-waitpoint expiry is a first-class terminal error on
236    /// the wire (`PendingWaitpointExpired` at
237    /// `ff-script/src/error.rs:170,403-408`). The attempt retains its
238    /// lease while the waitpoint is pending; signals delivered to
239    /// this waitpoint are buffered server-side (RFC-012 §R7.2.2).
240    async fn create_waitpoint(
241        &self,
242        handle: &Handle,
243        waitpoint_key: &str,
244        expires_in: Duration,
245    ) -> Result<PendingWaitpoint, EngineError>;
246
247    /// Non-mutating observation of signals that satisfied the handle's
248    /// resume condition.
249    async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError>;
250
251    /// Consume a reclaim grant to mint a resumed-kind handle. Returns
252    /// `Ok(None)` when the grant's target execution is no longer
253    /// resumable (already reclaimed, terminal, etc.).
254    async fn claim_from_reclaim(&self, token: ReclaimToken) -> Result<Option<Handle>, EngineError>;
255
256    // Round-5 amendment: lease-releasing peers of `suspend`.
257
258    /// Park the execution until `delay_until`, releasing the lease.
259    async fn delay(&self, handle: &Handle, delay_until: TimestampMs) -> Result<(), EngineError>;
260
261    /// Mark the execution as waiting for its child flow to complete,
262    /// releasing the lease.
263    async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError>;
264
265    // ── Read / admin ──
266
267    /// Snapshot an execution by id. `Ok(None)` ⇒ no such execution.
268    async fn describe_execution(
269        &self,
270        id: &ExecutionId,
271    ) -> Result<Option<ExecutionSnapshot>, EngineError>;
272
273    /// Snapshot a flow by id. `Ok(None)` ⇒ no such flow.
274    async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError>;
275
276    /// List dependency edges adjacent to an execution. Read-only; the
277    /// backend resolves the subject execution's flow, reads the
278    /// direction-specific adjacency SET, and decodes each member's
279    /// flow-scoped `edge:<edge_id>` hash.
280    ///
281    /// Returns an empty `Vec` when the subject has no edges on the
282    /// requested side — including standalone executions (no owning
283    /// flow). Ordering is unspecified: the underlying adjacency SET
284    /// is an unordered SMEMBERS read. Callers that need deterministic
285    /// order should sort by [`EdgeSnapshot::edge_id`] /
286    /// [`EdgeSnapshot::created_at`] themselves.
287    ///
288    /// Parse failures on the edge hash surface as
289    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
290    /// — unknown fields, missing required fields, endpoint mismatches
291    /// against the adjacency SET all fail loud rather than silently
292    /// returning partial results.
293    ///
294    /// Gated on the `core` feature — edge reads are part of the
295    /// minimal engine surface a Postgres-style backend must honour.
296    ///
297    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
298    #[cfg(feature = "core")]
299    async fn list_edges(
300        &self,
301        flow_id: &FlowId,
302        direction: EdgeDirection,
303    ) -> Result<Vec<EdgeSnapshot>, EngineError>;
304
305    /// Snapshot a single dependency edge by its owning flow + edge id.
306    ///
307    /// `Ok(None)` when the edge hash is absent (never staged, or
308    /// staged under a different flow than `flow_id`). Parse failures
309    /// on a present edge hash surface as
310    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
311    /// — the stored `flow_id` field is cross-checked against the
312    /// caller's expected `flow_id` so a wrong-key read fails loud
313    /// rather than returning an unrelated edge.
314    ///
315    /// Gated on the `core` feature — single-edge reads are part of
316    /// the minimal snapshot surface an alternate backend must honour
317    /// alongside [`Self::describe_execution`] / [`Self::describe_flow`]
318    /// / [`Self::list_edges`].
319    ///
320    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
321    #[cfg(feature = "core")]
322    async fn describe_edge(
323        &self,
324        flow_id: &FlowId,
325        edge_id: &EdgeId,
326    ) -> Result<Option<EdgeSnapshot>, EngineError>;
327
328    /// Resolve an execution's owning flow id, if any.
329    ///
330    /// `Ok(None)` when the execution's core record is absent or has
331    /// no associated flow (standalone execution). A present-but-
332    /// malformed `flow_id` field surfaces as
333    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`].
334    ///
335    /// Gated on the `core` feature. Used by ff-sdk's
336    /// `list_outgoing_edges` / `list_incoming_edges` to pivot from a
337    /// consumer-supplied `ExecutionId` to the `FlowId` required by
338    /// [`Self::list_edges`]. A Valkey backend serves this with a
339    /// single `HGET exec_core flow_id`; a Postgres backend serves it
340    /// with the equivalent single-column row lookup.
341    ///
342    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
343    #[cfg(feature = "core")]
344    async fn resolve_execution_flow_id(
345        &self,
346        eid: &ExecutionId,
347    ) -> Result<Option<FlowId>, EngineError>;
348
349    /// List flows on a partition with cursor-based pagination (issue
350    /// #185).
351    ///
352    /// Returns a [`ListFlowsPage`] of [`FlowSummary`](crate::contracts::FlowSummary)
353    /// rows ordered by `flow_id` (UUID byte-lexicographic). `cursor`
354    /// is `None` for the first page; callers forward the returned
355    /// `next_cursor` verbatim to continue iteration, and the listing
356    /// is exhausted when `next_cursor` is `None`. `limit` is the
357    /// maximum number of rows to return on this page — implementations
358    /// MAY return fewer (end of partition) but MUST NOT exceed it.
359    ///
360    /// Ordering rationale: flow ids are UUIDs, and both Valkey
361    /// (sort after-the-fact) and Postgres (`ORDER BY flow_id`) can
362    /// agree on byte-lexicographic order — the same order
363    /// `FlowId::to_string()` produces for canonical hyphenated UUIDs.
364    /// Mapping to `cursor > flow_id` keeps the contract backend-
365    /// independent.
366    ///
367    /// # Postgres implementation pattern
368    ///
369    /// A Postgres-backed implementation serves this directly with
370    ///
371    /// ```sql
372    /// SELECT flow_id, created_at_ms, public_flow_state
373    ///   FROM ff_flow
374    ///  WHERE partition_key = $1
375    ///    AND ($2::uuid IS NULL OR flow_id > $2)
376    ///  ORDER BY flow_id
377    ///  LIMIT $3 + 1;
378    /// ```
379    ///
380    /// — reading one extra row to decide whether `next_cursor` should
381    /// be set to the last row's `flow_id`. The Valkey implementation
382    /// maintains the `ff:idx:{fp:N}:flow_index` SET and performs the
383    /// sort + slice client-side (SMEMBERS then sort-by-UUID-bytes),
384    /// pipelining `HGETALL flow_core` for each row on the page.
385    ///
386    /// Gated on the `core` feature — flow listing is part of the
387    /// minimal engine surface a Postgres-style backend must honour.
388    #[cfg(feature = "core")]
389    async fn list_flows(
390        &self,
391        partition: PartitionKey,
392        cursor: Option<FlowId>,
393        limit: usize,
394    ) -> Result<ListFlowsPage, EngineError>;
395
396    /// Enumerate registered lanes with cursor-based pagination.
397    ///
398    /// Lanes are global (not partition-scoped) — the backend serves
399    /// this from its lane registry and does NOT accept a
400    /// [`crate::partition::Partition`] argument. Results are sorted
401    /// by [`LaneId`] name so the ordering is stable across calls and
402    /// cursors address a deterministic position in the sort.
403    ///
404    /// * `cursor` — exclusive lower bound. `None` starts from the
405    ///   first lane. To continue a walk, pass the previous page's
406    ///   [`ListLanesPage::next_cursor`].
407    /// * `limit` — hard cap on the number of lanes returned in the
408    ///   page. Backends MAY round this down when the registry size
409    ///   is smaller; they MUST NOT return more than `limit`.
410    ///
411    /// [`ListLanesPage::next_cursor`] is `Some(last_lane_in_page)`
412    /// iff at least one more lane exists after the returned page,
413    /// and `None` on the final page. Callers loop until `next_cursor`
414    /// is `None` to read the full registry.
415    ///
416    /// Gated on the `core` feature — lane enumeration is part of the
417    /// minimal snapshot surface an alternate backend must honour
418    /// alongside [`Self::describe_flow`] / [`Self::list_edges`].
419    #[cfg(feature = "core")]
420    async fn list_lanes(
421        &self,
422        cursor: Option<LaneId>,
423        limit: usize,
424    ) -> Result<ListLanesPage, EngineError>;
425
426    /// List suspended executions in one partition, cursor-paginated,
427    /// with each entry's suspension `reason_code` populated (issue
428    /// #183).
429    ///
430    /// Consumer-facing "what's blocked on what?" panels (ff-board's
431    /// suspended-executions view, operator CLIs) need the reason in
432    /// the list response so the UI does not round-trip per row to
433    /// `describe_execution` for a field it knows it needs. `reason`
434    /// on [`SuspendedExecutionEntry`] carries the free-form
435    /// `suspension:current.reason_code` field — see the type rustdoc
436    /// for the String-not-enum rationale.
437    ///
438    /// `cursor` is opaque to callers; pass `None` to start a fresh
439    /// scan and feed the returned [`ListSuspendedPage::next_cursor`]
440    /// back in on subsequent pages until it comes back `None`.
441    /// `limit` bounds the `entries` count; backends MAY return fewer
442    /// when the partition is exhausted.
443    ///
444    /// Ordering is by ascending `suspended_at_ms` (the per-lane
445    /// suspended ZSET score == `timeout_at` or the no-timeout
446    /// sentinel) with execution id as a lex tiebreak, so cursor
447    /// continuation is deterministic across calls.
448    ///
449    /// Gated on the `core` feature — suspended-list enumeration is
450    /// part of the minimal engine surface a Postgres-style backend
451    /// must honour.
452    #[cfg(feature = "core")]
453    async fn list_suspended(
454        &self,
455        partition: PartitionKey,
456        cursor: Option<ExecutionId>,
457        limit: usize,
458    ) -> Result<ListSuspendedPage, EngineError>;
459
460    /// Forward-only paginated listing of the executions indexed under
461    /// one partition.
462    ///
463    /// Reads the partition-wide `ff:idx:{p:N}:all_executions` set,
464    /// sorts lexicographically on `ExecutionId`, and returns the page
465    /// of ids strictly greater than `cursor` (or starting from the
466    /// smallest id when `cursor = None`). The returned
467    /// [`ListExecutionsPage::next_cursor`] is the last id on the page
468    /// iff at least one more id exists past it; `None` signals
469    /// end-of-stream.
470    ///
471    /// `limit` is the maximum number of ids returned on this page. A
472    /// `limit` of `0` returns an empty page with `next_cursor = None`.
473    /// Backends MAY cap `limit` internally (Valkey: 1000) and return
474    /// fewer ids than requested; callers continue paginating until
475    /// `next_cursor == None`.
476    ///
477    /// Ordering is stable under concurrent inserts for already-emitted
478    /// ids (an id less-than-or-equal-to the caller's cursor is never
479    /// re-emitted in later pages) but new inserts past the cursor WILL
480    /// appear in subsequent pages — consistent with forward-only
481    /// cursor semantics.
482    ///
483    /// Gated on the `core` feature — partition-scoped listing is part
484    /// of the minimal engine surface every backend must honour.
485    #[cfg(feature = "core")]
486    async fn list_executions(
487        &self,
488        partition: PartitionKey,
489        cursor: Option<ExecutionId>,
490        limit: usize,
491    ) -> Result<ListExecutionsPage, EngineError>;
492
493    // ── Trigger ops (issue #150) ──
494
495    /// Deliver an external signal to a suspended execution's waitpoint.
496    ///
497    /// The backend atomically records the signal, evaluates the resume
498    /// condition, and — when satisfied — transitions the execution
499    /// from `suspended` to `runnable` (or buffers the signal when the
500    /// waitpoint is still `pending`). Duplicate delivery — same
501    /// `idempotency_key` + waitpoint — surfaces as
502    /// [`DeliverSignalResult::Duplicate`] with the pre-existing
503    /// `signal_id` rather than mutating state twice.
504    ///
505    /// Input validation (HMAC token presence, payload size limits,
506    /// signal-name shape) is the backend's responsibility; callers
507    /// pass a fully populated [`DeliverSignalArgs`] and receive typed
508    /// outcomes or typed errors (`ScriptError::invalid_token`,
509    /// `ScriptError::token_expired`, `ScriptError::ExecutionNotFound`
510    /// surfaced via [`EngineError::Transport`] on the Valkey backend).
511    ///
512    /// Gated on the `core` feature — signal delivery is part of the
513    /// minimal trigger surface every backend must honour so ff-server
514    /// / REST handlers can dispatch against `Arc<dyn EngineBackend>`
515    /// without knowing which backend is running underneath.
516    #[cfg(feature = "core")]
517    async fn deliver_signal(
518        &self,
519        args: DeliverSignalArgs,
520    ) -> Result<DeliverSignalResult, EngineError>;
521
522    /// Claim a resumed execution — a previously-suspended attempt that
523    /// has cleared its resume condition (e.g. via
524    /// [`Self::deliver_signal`]) and now needs a worker to pick up the
525    /// same attempt index.
526    ///
527    /// Distinct from [`Self::claim`] (fresh work) and
528    /// [`Self::claim_from_reclaim`] (grant-based ownership transfer
529    /// after a crash): the resumed-claim path re-binds an existing
530    /// attempt rather than minting a new one. The backend issues a
531    /// fresh `lease_id` + bumps the `lease_epoch`, preserving
532    /// `attempt_id` / `attempt_index` so stream frames and progress
533    /// updates continue on the same attempt.
534    ///
535    /// Typed failures surface via `ScriptError` → `EngineError`:
536    /// `NotAResumedExecution` when the attempt state is not
537    /// `attempt_interrupted`, `ExecutionNotLeaseable` when the
538    /// lifecycle phase is not `runnable`, and `InvalidClaimGrant`
539    /// when the grant key is missing or was already consumed.
540    ///
541    /// Gated on the `core` feature — resumed-claim is part of the
542    /// minimal trigger surface every backend must honour.
543    #[cfg(feature = "core")]
544    async fn claim_resumed_execution(
545        &self,
546        args: ClaimResumedExecutionArgs,
547    ) -> Result<ClaimResumedExecutionResult, EngineError>;
548
549    /// Operator-initiated cancellation of a flow and (optionally) its
550    /// member executions. See RFC-012 §3.1.1 for the policy /wait
551    /// matrix.
552    async fn cancel_flow(
553        &self,
554        id: &FlowId,
555        policy: CancelFlowPolicy,
556        wait: CancelFlowWait,
557    ) -> Result<CancelFlowResult, EngineError>;
558
559    /// RFC-016 Stage A: set the inbound-edge-group policy for a
560    /// downstream execution. Must be called before the first
561    /// `add_dependency(... -> downstream_execution_id)` — the backend
562    /// rejects with [`EngineError::Conflict`] if edges have already
563    /// been staged for this group.
564    ///
565    /// Stage A honours only
566    /// [`EdgeDependencyPolicy::AllOf`](crate::contracts::EdgeDependencyPolicy::AllOf);
567    /// the `AnyOf` / `Quorum` variants return
568    /// [`EngineError::Validation`] with
569    /// `detail = "stage A supports AllOf only; AnyOf/Quorum land in stage B"`
570    /// until Stage B's resolver lands.
571    #[cfg(feature = "core")]
572    async fn set_edge_group_policy(
573        &self,
574        flow_id: &FlowId,
575        downstream_execution_id: &ExecutionId,
576        policy: crate::contracts::EdgeDependencyPolicy,
577    ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError>;
578
579    // ── HMAC secret rotation (v0.7 migration-master Q4) ──
580
581    /// Rotate the waitpoint HMAC signing kid **cluster-wide**.
582    ///
583    /// **v0.7 migration-master Q4 (adjudicated 2026-04-24).**
584    /// Additive trait surface so Valkey and Postgres backends can
585    /// both expose the "rotate everywhere" semantic under one name.
586    ///
587    /// * Valkey impl fans out an `ff_rotate_waitpoint_hmac_secret`
588    ///   FCALL per execution partition. `entries.len() == num_flow_partitions`
589    ///   and per-partition failures are surfaced as inner `Err`
590    ///   entries — the call as a whole does not fail when one
591    ///   partition's FCALL fails, matching
592    ///   [`ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions`]'s
593    ///   partial-success contract.
594    /// * Postgres impl (Wave 4) writes one row to
595    ///   `ff_waitpoint_hmac(kid, secret, rotated_at)` and returns a
596    ///   single-entry vec with `partition = 0`.
597    ///
598    /// The default impl returns
599    /// [`EngineError::Unavailable`] with
600    /// `op = "rotate_waitpoint_hmac_secret_all"` so backends that
601    /// haven't implemented the method surface the miss loudly rather
602    /// than silently no-op'ing. Both concrete backends override.
603    async fn rotate_waitpoint_hmac_secret_all(
604        &self,
605        _args: RotateWaitpointHmacSecretAllArgs,
606    ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
607        Err(EngineError::Unavailable {
608            op: "rotate_waitpoint_hmac_secret_all",
609        })
610    }
611
612    /// Seed the initial waitpoint HMAC secret for a fresh deployment
613    /// (issue #280).
614    ///
615    /// **Idempotent.** If a `current_kid` (Valkey per-partition) or
616    /// an active kid row (Postgres) already exists with the given
617    /// `kid`, the method returns
618    /// [`SeedOutcome::AlreadySeeded`] without overwriting, reporting
619    /// whether the stored secret matches the caller-supplied one via
620    /// `same_secret`. Callers (cairn boot, operator tooling) invoke
621    /// this on every boot and let the backend decide whether to
622    /// install — removing the client-side "check then HSET" race that
623    /// cairn's raw-HSET boot path silently tolerated.
624    ///
625    /// For rotation of an already-seeded secret, use
626    /// [`Self::rotate_waitpoint_hmac_secret_all`] instead; seed is
627    /// install-only.
628    ///
629    /// The default impl returns [`EngineError::Unavailable`] with
630    /// `op = "seed_waitpoint_hmac_secret"` so backends that haven't
631    /// implemented the method surface the miss loudly.
632    async fn seed_waitpoint_hmac_secret(
633        &self,
634        _args: SeedWaitpointHmacSecretArgs,
635    ) -> Result<SeedOutcome, EngineError> {
636        Err(EngineError::Unavailable {
637            op: "seed_waitpoint_hmac_secret",
638        })
639    }
640
641    // ── Budget ──
642
643    /// Report usage against a budget and check limits. Returns the
644    /// typed [`ReportUsageResult`] variant; backends enforce
645    /// idempotency via the caller-supplied
646    /// [`UsageDimensions::dedup_key`] (RFC-012 §R7.2.3 — replaces
647    /// the pre-Round-7 `AdmissionDecision` return).
648    async fn report_usage(
649        &self,
650        handle: &Handle,
651        budget: &BudgetId,
652        dimensions: crate::backend::UsageDimensions,
653    ) -> Result<ReportUsageResult, EngineError>;
654
655    // ── Stream reads (RFC-012 Stage 1c tranche-4; issue #87) ──
656
657    /// Read frames from a completed or in-flight attempt's stream.
658    ///
659    /// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start`
660    /// / `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
661    /// `StreamCursor::At("<id>")` reads from a concrete entry id.
662    ///
663    /// Input validation (count_limit bounds, cursor shape) is the
664    /// caller's responsibility — SDK-side wrappers in
665    /// [`ff-sdk`](https://docs.rs/ff-sdk) enforce bounds before
666    /// forwarding. Backends MAY additionally reject out-of-range
667    /// input via [`EngineError::Validation`].
668    ///
669    /// Gated on the `streaming` feature — stream reads are part of
670    /// the stream-subset surface a backend without XREAD-like
671    /// primitives may omit.
672    #[cfg(feature = "streaming")]
673    async fn read_stream(
674        &self,
675        execution_id: &ExecutionId,
676        attempt_index: AttemptIndex,
677        from: StreamCursor,
678        to: StreamCursor,
679        count_limit: u64,
680    ) -> Result<StreamFrames, EngineError>;
681
682    /// Tail a live attempt's stream.
683    ///
684    /// `after` is an exclusive [`StreamCursor`] — entries with id
685    /// strictly greater than `after` are returned. `StreamCursor::Start`
686    /// / `StreamCursor::End` are NOT accepted here; callers MUST pass
687    /// a concrete id (or `StreamCursor::from_beginning()`). The SDK
688    /// wrapper rejects the open markers before reaching the backend.
689    ///
690    /// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up
691    /// to that many ms for a new entry.
692    ///
693    /// `visibility` (RFC-015 §6.1) filters the returned entries by
694    /// their stored [`StreamMode`](crate::backend::StreamMode)
695    /// `mode` field. Default
696    /// [`TailVisibility::All`](crate::backend::TailVisibility::All)
697    /// preserves v1 behaviour.
698    ///
699    /// Gated on the `streaming` feature — see [`read_stream`](Self::read_stream).
700    #[cfg(feature = "streaming")]
701    async fn tail_stream(
702        &self,
703        execution_id: &ExecutionId,
704        attempt_index: AttemptIndex,
705        after: StreamCursor,
706        block_ms: u64,
707        count_limit: u64,
708        visibility: TailVisibility,
709    ) -> Result<StreamFrames, EngineError>;
710
711    /// Read the rolling summary document for an attempt (RFC-015 §6.3).
712    ///
713    /// Returns `Ok(None)` when no [`StreamMode::DurableSummary`](crate::backend::StreamMode::DurableSummary)
714    /// frame has ever been appended for the attempt. Non-blocking Hash
715    /// read; safe to call from any consumer without holding the lease.
716    ///
717    /// Gated on the `streaming` feature — summary reads are part of
718    /// the stream-subset surface.
719    #[cfg(feature = "streaming")]
720    async fn read_summary(
721        &self,
722        execution_id: &ExecutionId,
723        attempt_index: AttemptIndex,
724    ) -> Result<Option<SummaryDocument>, EngineError>;
725
726    // ── RFC-017 Stage A — Ingress (5) ──────────────────────────
727    //
728    // Every method in this block has a default impl returning
729    // `EngineError::Unavailable { op }` per RFC-017 §5.3. Concrete
730    // backends override each method with a real body. A missing
731    // override surfaces as a loud typed error at the call site rather
732    // than a silent no-op.
733
734    /// Create an execution. Ingress row 6 (RFC-017 §4). Wraps
735    /// `ff_create_execution` on Valkey; `INSERT INTO ff_execution ...`
736    /// on Postgres. The `idempotency_key` + backend-side default
737    /// `dedup_ttl_ms = 86400000` make duplicate submissions idempotent.
738    #[cfg(feature = "core")]
739    async fn create_execution(
740        &self,
741        _args: CreateExecutionArgs,
742    ) -> Result<CreateExecutionResult, EngineError> {
743        Err(EngineError::Unavailable {
744            op: "create_execution",
745        })
746    }
747
748    /// Create a flow header. Ingress row 5.
749    #[cfg(feature = "core")]
750    async fn create_flow(
751        &self,
752        _args: CreateFlowArgs,
753    ) -> Result<CreateFlowResult, EngineError> {
754        Err(EngineError::Unavailable { op: "create_flow" })
755    }
756
757    /// Atomically add an execution to a flow (single-FCALL co-located
758    /// commit on Valkey; single-transaction UPSERT on Postgres).
759    #[cfg(feature = "core")]
760    async fn add_execution_to_flow(
761        &self,
762        _args: AddExecutionToFlowArgs,
763    ) -> Result<AddExecutionToFlowResult, EngineError> {
764        Err(EngineError::Unavailable {
765            op: "add_execution_to_flow",
766        })
767    }
768
769    /// Stage a dependency edge between flow members. CAS-guarded on
770    /// `graph_revision` — stale rev returns `Contention(StaleGraphRevision)`.
771    #[cfg(feature = "core")]
772    async fn stage_dependency_edge(
773        &self,
774        _args: StageDependencyEdgeArgs,
775    ) -> Result<StageDependencyEdgeResult, EngineError> {
776        Err(EngineError::Unavailable {
777            op: "stage_dependency_edge",
778        })
779    }
780
781    /// Apply a staged dependency edge to its downstream child.
782    #[cfg(feature = "core")]
783    async fn apply_dependency_to_child(
784        &self,
785        _args: ApplyDependencyToChildArgs,
786    ) -> Result<ApplyDependencyToChildResult, EngineError> {
787        Err(EngineError::Unavailable {
788            op: "apply_dependency_to_child",
789        })
790    }
791
792    // ── RFC-017 Stage A — Operator control (4) ─────────────────
793
794    /// Operator-initiated execution cancel (row 2).
795    #[cfg(feature = "core")]
796    async fn cancel_execution(
797        &self,
798        _args: CancelExecutionArgs,
799    ) -> Result<CancelExecutionResult, EngineError> {
800        Err(EngineError::Unavailable {
801            op: "cancel_execution",
802        })
803    }
804
805    /// Re-score an execution's eligibility priority (row 17).
806    #[cfg(feature = "core")]
807    async fn change_priority(
808        &self,
809        _args: ChangePriorityArgs,
810    ) -> Result<ChangePriorityResult, EngineError> {
811        Err(EngineError::Unavailable {
812            op: "change_priority",
813        })
814    }
815
816    /// Replay a terminal execution (row 22). Variadic KEYS handling
817    /// (inbound-edge pre-read) is hidden inside the Valkey impl per
818    /// RFC-017 §4 row 3.
819    #[cfg(feature = "core")]
820    async fn replay_execution(
821        &self,
822        _args: ReplayExecutionArgs,
823    ) -> Result<ReplayExecutionResult, EngineError> {
824        Err(EngineError::Unavailable {
825            op: "replay_execution",
826        })
827    }
828
829    /// Operator-initiated lease revoke (row 19).
830    #[cfg(feature = "core")]
831    async fn revoke_lease(
832        &self,
833        _args: RevokeLeaseArgs,
834    ) -> Result<RevokeLeaseResult, EngineError> {
835        Err(EngineError::Unavailable { op: "revoke_lease" })
836    }
837
838    // ── RFC-017 Stage A — Budget + quota admin (5) ─────────────
839
840    /// Create a budget definition (row 6).
841    #[cfg(feature = "core")]
842    async fn create_budget(
843        &self,
844        _args: CreateBudgetArgs,
845    ) -> Result<CreateBudgetResult, EngineError> {
846        Err(EngineError::Unavailable {
847            op: "create_budget",
848        })
849    }
850
851    /// Reset a budget's usage counters (row 10).
852    #[cfg(feature = "core")]
853    async fn reset_budget(
854        &self,
855        _args: ResetBudgetArgs,
856    ) -> Result<ResetBudgetResult, EngineError> {
857        Err(EngineError::Unavailable { op: "reset_budget" })
858    }
859
860    /// Create a quota policy (row 7).
861    #[cfg(feature = "core")]
862    async fn create_quota_policy(
863        &self,
864        _args: CreateQuotaPolicyArgs,
865    ) -> Result<CreateQuotaPolicyResult, EngineError> {
866        Err(EngineError::Unavailable {
867            op: "create_quota_policy",
868        })
869    }
870
871    /// Read-only budget status for operator visibility (row 8).
872    #[cfg(feature = "core")]
873    async fn get_budget_status(
874        &self,
875        _id: &BudgetId,
876    ) -> Result<BudgetStatus, EngineError> {
877        Err(EngineError::Unavailable {
878            op: "get_budget_status",
879        })
880    }
881
882    /// Admin-path `report_usage` (row 9 + RFC-017 §5 round-1 F4).
883    /// Distinct from the existing [`Self::report_usage`] which takes
884    /// a worker handle — the admin path has no lease context.
885    #[cfg(feature = "core")]
886    async fn report_usage_admin(
887        &self,
888        _budget: &BudgetId,
889        _args: ReportUsageAdminArgs,
890    ) -> Result<ReportUsageResult, EngineError> {
891        Err(EngineError::Unavailable {
892            op: "report_usage_admin",
893        })
894    }
895
896    // ── RFC-017 Stage A — Read + diagnostics (3) ───────────────
897
898    /// Fetch the stored result payload for a completed execution
899    /// (row 4). Returns `Ok(None)` when the execution is missing, not
900    /// yet complete, or its payload was trimmed by retention policy.
901    async fn get_execution_result(
902        &self,
903        _id: &ExecutionId,
904    ) -> Result<Option<Vec<u8>>, EngineError> {
905        Err(EngineError::Unavailable {
906            op: "get_execution_result",
907        })
908    }
909
910    /// List the pending-or-active waitpoints for an execution, cursor
911    /// paginated (row 5 / §8). Stage A preserves the existing
912    /// `PendingWaitpointInfo` shape; Stage D ships the §8 HMAC
913    /// sanitisation + `(token_kid, token_fingerprint)` schema.
914    #[cfg(feature = "core")]
915    async fn list_pending_waitpoints(
916        &self,
917        _args: ListPendingWaitpointsArgs,
918    ) -> Result<ListPendingWaitpointsResult, EngineError> {
919        Err(EngineError::Unavailable {
920            op: "list_pending_waitpoints",
921        })
922    }
923
924    /// Backend-level reachability probe (row 1). Valkey: `PING`;
925    /// Postgres: `SELECT 1`.
926    async fn ping(&self) -> Result<(), EngineError> {
927        Err(EngineError::Unavailable { op: "ping" })
928    }
929
930    // ── RFC-017 Stage A — Scheduling (1) ───────────────────────
931
932    /// Scheduler-routed claim entrypoint (row 18, RFC-017 §7). Valkey
933    /// forwards to its `ff_scheduler::Scheduler` cursor; Postgres
934    /// forwards to `PostgresScheduler`'s `FOR UPDATE SKIP LOCKED`
935    /// path.
936    ///
937    /// Backends that carry an embedded scheduler (e.g. `ValkeyBackend`
938    /// constructed via `with_embedded_scheduler`, or `PostgresBackend`
939    /// with its `with_scanners` sibling) route the claim through it.
940    /// Backends without a wired scheduler return
941    /// [`EngineError::Unavailable`]. HTTP consumers use
942    /// `FlowFabricWorker::claim_via_server` instead.
943    #[cfg(feature = "core")]
944    async fn claim_for_worker(
945        &self,
946        _args: ClaimForWorkerArgs,
947    ) -> Result<ClaimForWorkerOutcome, EngineError> {
948        Err(EngineError::Unavailable {
949            op: "claim_for_worker",
950        })
951    }
952
953    // ── Cross-cutting (RFC-017 Stage B trait-lift) ──────────────
954
955    /// Static observability label identifying the backend family in
956    /// logs + metrics (RFC-017 §5.4 + §9 Stage B). Default impl
957    /// returns `"unknown"` so legacy `impl EngineBackend` blocks that
958    /// have not upgraded keep compiling; every in-tree backend
959    /// overrides — `ValkeyBackend` → `"valkey"`, `PostgresBackend` →
960    /// `"postgres"`.
961    fn backend_label(&self) -> &'static str {
962        "unknown"
963    }
964
965    /// RFC-018 Stage A: snapshot of this backend's identity + the
966    /// flat `Supports` surface it can actually service. Consumers use
967    /// this at startup to gate UI features / choose between alternative
968    /// code paths before dispatching. See
969    /// `rfcs/RFC-018-backend-capability-discovery.md` for the full
970    /// discovery contract and the four owner-adjudicated open
971    /// questions (granularity: coarse; version: struct; sync; no
972    /// event stream).
973    ///
974    /// Default: returns a value tagged `family = "unknown"` with every
975    /// `supports.*` bool `false`, so pre-RFC-018 out-of-tree backends
976    /// keep compiling and consumers treat "all false" as "dispatch
977    /// and catch [`EngineError::Unavailable`]" (pre-RFC-018 behaviour).
978    /// Concrete in-tree backends (`ValkeyBackend`, `PostgresBackend`)
979    /// override to populate a real value.
980    ///
981    /// Sync (no `.await`): backend-static info should not require a
982    /// probe on every query. Dynamic probes happen once at
983    /// `connect*` time and cache the result.
984    fn capabilities(&self) -> crate::capability::Capabilities {
985        crate::capability::Capabilities::new(
986            crate::capability::BackendIdentity::new(
987                "unknown",
988                crate::capability::Version::new(0, 0, 0),
989                "unknown",
990            ),
991            crate::capability::Supports::none(),
992        )
993    }
994
995    /// Issue #281: run one-time backend-specific boot preparation.
996    ///
997    /// Intended to run ONCE per deployment startup — NOT per request.
998    /// Idempotent and safe for consumers to call on every application
999    /// boot; backends that have nothing to do return
1000    /// [`PrepareOutcome::NoOp`] without side effects.
1001    ///
1002    /// Per-backend behaviour:
1003    ///
1004    /// * **Valkey** — issues `FUNCTION LOAD REPLACE` for the
1005    ///   `flowfabric` Lua library (with bounded retry on transient
1006    ///   transport faults; permanent compile errors surface as
1007    ///   [`EngineError::Transport`] without retry). Returns
1008    ///   [`PrepareOutcome::Applied`] carrying
1009    ///   `"FUNCTION LOAD (flowfabric lib v<N>)"`.
1010    /// * **Postgres** — returns [`PrepareOutcome::NoOp`]. Schema
1011    ///   migrations are applied out-of-band per
1012    ///   `rfcs/drafts/v0.7-migration-master.md §Q12`; the backend
1013    ///   runs a schema-version check at connect time and refuses to
1014    ///   start on mismatch, so no boot-side prepare work remains.
1015    /// * **Default impl** — returns [`PrepareOutcome::NoOp`] so
1016    ///   out-of-tree backends without preparation work compile
1017    ///   without boilerplate.
1018    ///
1019    /// # Relationship to the in-tree boot path
1020    ///
1021    /// `ValkeyBackend::initialize_deployment` (called from
1022    /// `Server::start_with_metrics`) already invokes
1023    /// [`ensure_library`](ff_script::loader::ensure_library) inline as
1024    /// its step 4; that path is unchanged. `prepare()` exists as a
1025    /// **trait-surface entry point** so consumers that construct an
1026    /// `Arc<dyn EngineBackend>` outside of `Server` (e.g.
1027    /// cairn-fabric's boot path at `cairn-fabric/src/boot.rs`) can
1028    /// run the same preparation without reaching into
1029    /// backend-specific modules. The overlap is intentional: calling
1030    /// both `prepare()` and `initialize_deployment` is safe because
1031    /// `FUNCTION LOAD REPLACE` is idempotent under the version
1032    /// check.
1033    ///
1034    /// # Layer forwarding
1035    ///
1036    /// Layer impls (`HookedBackend`, ff-sdk layers) do NOT forward
1037    /// `prepare` today — consistent with `backend_label` / `ping` /
1038    /// `shutdown_prepare`. Consumers that wrap a backend in layers
1039    /// MUST call `prepare()` on the raw backend before wrapping, or
1040    /// accept the default [`PrepareOutcome::NoOp`].
1041    async fn prepare(&self) -> Result<PrepareOutcome, EngineError> {
1042        Ok(PrepareOutcome::NoOp)
1043    }
1044
1045    /// Drain-before-shutdown hook (RFC-017 §5.4). The server calls
1046    /// this before draining its own background tasks so backend-
1047    /// scoped primitives (Valkey stream semaphore, Postgres sqlx
1048    /// pool, …) can close their gates and await in-flight work up to
1049    /// `grace`.
1050    ///
1051    /// Default impl returns `Ok(())` — a no-op backend has nothing
1052    /// backend-scoped to drain. Concrete backends whose data plane
1053    /// owns resources (connection pools, semaphores, listeners)
1054    /// override with a real body.
1055    async fn shutdown_prepare(&self, _grace: Duration) -> Result<(), EngineError> {
1056        Ok(())
1057    }
1058
1059    // ── RFC-017 Stage E2 — `Server::client` removal (header + reads) ───
1060
1061    /// RFC-017 Stage E2: the "header" portion of `cancel_flow` — run the
1062    /// atomic flow-state flip (Valkey: `ff_cancel_flow` FCALL; Postgres:
1063    /// `cancel_flow_once` tx), decode policy + membership, and surface
1064    /// the `flow_already_terminal` idempotency branch as a first-class
1065    /// [`CancelFlowHeader::AlreadyTerminal`] so the Server can build
1066    /// the wire [`CancelFlowResult`] without reaching for a raw
1067    /// `Client`. Separate from the existing
1068    /// [`EngineBackend::cancel_flow`] entry point (which takes the
1069    /// enum-typed `(policy, wait)` split and returns the wait-collapsed
1070    /// `CancelFlowResult`) because the Server owns its own
1071    /// wait-dispatch + member-cancel machinery via
1072    /// [`EngineBackend::cancel_execution`] + backlog ack.
1073    ///
1074    /// Default impl returns [`EngineError::Unavailable`] so un-migrated
1075    /// backends surface the miss loudly.
1076    #[cfg(feature = "core")]
1077    async fn cancel_flow_header(
1078        &self,
1079        _args: CancelFlowArgs,
1080    ) -> Result<crate::contracts::CancelFlowHeader, EngineError> {
1081        Err(EngineError::Unavailable {
1082            op: "cancel_flow_header",
1083        })
1084    }
1085
1086    /// RFC-017 Stage E2: best-effort acknowledgement that one member of
1087    /// a `cancel_all` flow has completed its per-member cancel. Drains
1088    /// the member from the flow's `pending_cancels` set and, if empty,
1089    /// removes the flow from the partition-level `cancel_backlog`
1090    /// (Valkey: `ff_ack_cancel_member` FCALL; Postgres: table write —
1091    /// default `Unavailable` until Wave 9).
1092    ///
1093    /// Failures are swallowed by the caller — the cancel-backlog
1094    /// reconciler is the authoritative drain — but a typed error here
1095    /// lets the caller log a backend-scoped context string.
1096    #[cfg(feature = "core")]
1097    async fn ack_cancel_member(
1098        &self,
1099        _flow_id: &FlowId,
1100        _execution_id: &ExecutionId,
1101    ) -> Result<(), EngineError> {
1102        Err(EngineError::Unavailable {
1103            op: "ack_cancel_member",
1104        })
1105    }
1106
1107    /// RFC-017 Stage E2: full-shape execution read used by the
1108    /// `GET /v1/executions/{id}` HTTP route. Returns the legacy
1109    /// [`ExecutionInfo`] wire shape (not the decoupled
1110    /// [`ExecutionSnapshot`]) so the existing HTTP response bytes stay
1111    /// identical across the migration.
1112    ///
1113    /// `Ok(None)` ⇒ no such execution. Default `Unavailable` because
1114    /// the Valkey HGETALL-and-parse is backend-specific.
1115    #[cfg(feature = "core")]
1116    async fn read_execution_info(
1117        &self,
1118        _id: &ExecutionId,
1119    ) -> Result<Option<ExecutionInfo>, EngineError> {
1120        Err(EngineError::Unavailable {
1121            op: "read_execution_info",
1122        })
1123    }
1124
1125    /// RFC-017 Stage E2: narrow `public_state` read used by the
1126    /// `GET /v1/executions/{id}/state` HTTP route. Returns `Ok(None)`
1127    /// when the execution is missing. Default `Unavailable`.
1128    #[cfg(feature = "core")]
1129    async fn read_execution_state(
1130        &self,
1131        _id: &ExecutionId,
1132    ) -> Result<Option<PublicState>, EngineError> {
1133        Err(EngineError::Unavailable {
1134            op: "read_execution_state",
1135        })
1136    }
1137
1138    // ── RFC-019 Stage A/B/C — Stream-cursor subscriptions ─────────
1139    //
1140    // Four owner-adjudicated families (RFC-019 §Open Questions #5):
1141    // `lease_history`, `completion`, `signal_delivery`,
1142    // `instance_tags`. Stage C (this crate) promotes each family to
1143    // a typed event enum; consumers `match` on variants instead of
1144    // parsing a backend-shaped byte blob.
1145    //
1146    // Each method returns a family-specific subscription alias (see
1147    // [`crate::stream_events`]). All defaults return
1148    // `EngineError::Unavailable` per RFC-017 trait-growth conventions.
1149
1150    /// Subscribe to lease lifecycle events (acquired / renewed /
1151    /// expired / reclaimed / revoked) for the partition this backend
1152    /// is configured with.
1153    ///
1154    /// Cross-partition fan-out is consumer-side merge: subscribe
1155    /// per-partition backend instance and interleave on the read
1156    /// side. Yields
1157    /// `Err(EngineError::StreamDisconnected { cursor })` on backend
1158    /// disconnect; resume by calling this method again with the
1159    /// returned cursor.
1160    ///
1161    /// `filter` (#282): when `filter.instance_tag` is `Some((k, v))`,
1162    /// only events whose execution carries tag `k = v` are yielded
1163    /// (matching the [`crate::backend::ScannerFilter`] surface from
1164    /// #122). Pass `&ScannerFilter::default()` for unfiltered
1165    /// behaviour. Filtering happens inside the backend stream; the
1166    /// [`crate::stream_events::LeaseHistorySubscription`] return type
1167    /// is unchanged.
1168    async fn subscribe_lease_history(
1169        &self,
1170        _cursor: crate::stream_subscribe::StreamCursor,
1171        _filter: &crate::backend::ScannerFilter,
1172    ) -> Result<crate::stream_events::LeaseHistorySubscription, EngineError> {
1173        Err(EngineError::Unavailable {
1174            op: "subscribe_lease_history",
1175        })
1176    }
1177
1178    /// Subscribe to completion events (terminal state transitions).
1179    ///
1180    /// - **Postgres**: wraps the `ff_completion_event` outbox +
1181    ///   LISTEN/NOTIFY machinery. Durable via event-id cursor.
1182    /// - **Valkey**: wraps the RESP3 `ff:dag:completions` pubsub
1183    ///   subscriber. Pubsub is at-most-once over the live
1184    ///   subscription window; the cursor is always the empty
1185    ///   sentinel. If you need at-least-once replay with durable
1186    ///   cursor resume, use the Postgres backend (see
1187    ///   `docs/POSTGRES_PARITY_MATRIX.md` row `subscribe_completion`).
1188    ///
1189    /// `filter` (#282): see [`Self::subscribe_lease_history`]. Valkey
1190    /// reuses the `subscribe_completions_filtered` per-event HGET
1191    /// gate; Postgres filters inline against the outbox's denormalised
1192    /// `instance_tag` column.
1193    async fn subscribe_completion(
1194        &self,
1195        _cursor: crate::stream_subscribe::StreamCursor,
1196        _filter: &crate::backend::ScannerFilter,
1197    ) -> Result<crate::stream_events::CompletionSubscription, EngineError> {
1198        Err(EngineError::Unavailable {
1199            op: "subscribe_completion",
1200        })
1201    }
1202
1203    /// Subscribe to signal-delivery events (satisfied / buffered /
1204    /// deduped).
1205    ///
1206    /// `filter` (#282): see [`Self::subscribe_lease_history`].
1207    async fn subscribe_signal_delivery(
1208        &self,
1209        _cursor: crate::stream_subscribe::StreamCursor,
1210        _filter: &crate::backend::ScannerFilter,
1211    ) -> Result<crate::stream_events::SignalDeliverySubscription, EngineError> {
1212        Err(EngineError::Unavailable {
1213            op: "subscribe_signal_delivery",
1214        })
1215    }
1216
1217    /// Subscribe to instance-tag events (tag attached / cleared).
1218    ///
1219    /// Producer wiring is deferred per #311 audit ("no concrete
1220    /// demand"); the trait method exists for API uniformity across
1221    /// the four families. Backends currently return
1222    /// `EngineError::Unavailable`.
1223    async fn subscribe_instance_tags(
1224        &self,
1225        _cursor: crate::stream_subscribe::StreamCursor,
1226    ) -> Result<crate::stream_events::InstanceTagSubscription, EngineError> {
1227        Err(EngineError::Unavailable {
1228            op: "subscribe_instance_tags",
1229        })
1230    }
1231}
1232
1233/// Object-safety assertion: `dyn EngineBackend` compiles iff every
1234/// method is dyn-compatible. Kept as a compile-time guard so a future
1235/// trait change that accidentally breaks dyn-safety fails the build
1236/// at this site rather than at every downstream `Arc<dyn
1237/// EngineBackend>` use.
1238#[allow(dead_code)]
1239fn _assert_dyn_compatible(_: &dyn EngineBackend) {}
1240
1241/// Polling interval for [`wait_for_flow_cancellation`]. Tight enough
1242/// that a local single-node cancel cascade observes `cancelled` within
1243/// one or two polls; slack enough that a `WaitIndefinite` caller does
1244/// not hammer `describe_flow` on a live cluster.
1245const CANCEL_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(100);
1246
1247/// Defensive ceiling for [`CancelFlowWait::WaitIndefinite`] — if the
1248/// reconciler cascade has not converged in five minutes, something is
1249/// wedged and returning `Timeout` is strictly more useful than blocking
1250/// forever. RFC-012 §3.1.1 expects real-world cascades to finish within
1251/// `reconciler_interval + grace`, which is orders of magnitude below
1252/// this.
1253const CANCEL_WAIT_INDEFINITE_CEILING: Duration = Duration::from_secs(300);
1254
1255/// Poll `backend.describe_flow(flow_id)` until `public_flow_state` is
1256/// `"cancelled"` or `deadline` elapses.
1257///
1258/// Shared by every backend's `cancel_flow` trait impl that honours
1259/// [`CancelFlowWait::WaitTimeout`] / [`CancelFlowWait::WaitIndefinite`].
1260/// The underlying `cancel_flow` FCALL / SQL transaction flips the
1261/// flow-level state synchronously; member cancellations dispatch
1262/// asynchronously via the reconciler, which also flips
1263/// `public_flow_state` to `cancelled` once the cascade completes. This
1264/// helper waits for that terminal flip.
1265///
1266/// Returns:
1267/// * `Ok(())` once `public_flow_state = "cancelled"` is observed.
1268/// * `Err(EngineError::Timeout { op: "cancel_flow", elapsed })` when
1269///   `deadline` elapses first. `elapsed` is the wait budget (the
1270///   requested timeout), not wall-clock precision.
1271/// * `Err(e)` if `describe_flow` itself errors (propagated).
1272pub async fn wait_for_flow_cancellation<B: EngineBackend + ?Sized>(
1273    backend: &B,
1274    flow_id: &crate::types::FlowId,
1275    deadline: Duration,
1276) -> Result<(), EngineError> {
1277    let start = std::time::Instant::now();
1278    loop {
1279        match backend.describe_flow(flow_id).await? {
1280            Some(snap) if snap.public_flow_state == "cancelled" => return Ok(()),
1281            // `None` (flow removed) is also terminal from the caller's
1282            // perspective — nothing left to wait on.
1283            None => return Ok(()),
1284            Some(_) => {}
1285        }
1286        if start.elapsed() >= deadline {
1287            return Err(EngineError::Timeout {
1288                op: "cancel_flow",
1289                elapsed: deadline,
1290            });
1291        }
1292        tokio::time::sleep(CANCEL_WAIT_POLL_INTERVAL).await;
1293    }
1294}
1295
1296/// Convert a [`CancelFlowWait`] into the deadline passed to
1297/// [`wait_for_flow_cancellation`]. `NoWait` returns `None` — the caller
1298/// must skip the wait entirely.
1299pub fn cancel_flow_wait_deadline(wait: CancelFlowWait) -> Option<Duration> {
1300    // `CancelFlowWait` is `#[non_exhaustive]`; this match lives in the
1301    // defining crate so the exhaustiveness check keeps the compiler
1302    // honest. Future variants must be wired here explicitly.
1303    match wait {
1304        CancelFlowWait::NoWait => None,
1305        CancelFlowWait::WaitTimeout(d) => Some(d),
1306        CancelFlowWait::WaitIndefinite => Some(CANCEL_WAIT_INDEFINITE_CEILING),
1307    }
1308}
1309
1310#[cfg(test)]
1311mod tests {
1312    use super::*;
1313
1314    /// A zero-state backend stub used to exercise the default
1315    /// `capabilities()` impl without pulling in a real
1316    /// transport. Only the default method is under test here; every
1317    /// other method is unreachable on this type.
1318    struct DefaultBackend;
1319
1320    #[async_trait]
1321    impl EngineBackend for DefaultBackend {
1322        async fn claim(
1323            &self,
1324            _lane: &LaneId,
1325            _capabilities: &CapabilitySet,
1326            _policy: ClaimPolicy,
1327        ) -> Result<Option<Handle>, EngineError> {
1328            unreachable!()
1329        }
1330        async fn renew(&self, _handle: &Handle) -> Result<LeaseRenewal, EngineError> {
1331            unreachable!()
1332        }
1333        async fn progress(
1334            &self,
1335            _handle: &Handle,
1336            _percent: Option<u8>,
1337            _message: Option<String>,
1338        ) -> Result<(), EngineError> {
1339            unreachable!()
1340        }
1341        async fn append_frame(
1342            &self,
1343            _handle: &Handle,
1344            _frame: Frame,
1345        ) -> Result<AppendFrameOutcome, EngineError> {
1346            unreachable!()
1347        }
1348        async fn complete(
1349            &self,
1350            _handle: &Handle,
1351            _payload: Option<Vec<u8>>,
1352        ) -> Result<(), EngineError> {
1353            unreachable!()
1354        }
1355        async fn fail(
1356            &self,
1357            _handle: &Handle,
1358            _reason: FailureReason,
1359            _classification: FailureClass,
1360        ) -> Result<FailOutcome, EngineError> {
1361            unreachable!()
1362        }
1363        async fn cancel(&self, _handle: &Handle, _reason: &str) -> Result<(), EngineError> {
1364            unreachable!()
1365        }
1366        async fn suspend(
1367            &self,
1368            _handle: &Handle,
1369            _args: SuspendArgs,
1370        ) -> Result<SuspendOutcome, EngineError> {
1371            unreachable!()
1372        }
1373        async fn create_waitpoint(
1374            &self,
1375            _handle: &Handle,
1376            _waitpoint_key: &str,
1377            _expires_in: Duration,
1378        ) -> Result<PendingWaitpoint, EngineError> {
1379            unreachable!()
1380        }
1381        async fn observe_signals(
1382            &self,
1383            _handle: &Handle,
1384        ) -> Result<Vec<ResumeSignal>, EngineError> {
1385            unreachable!()
1386        }
1387        async fn claim_from_reclaim(
1388            &self,
1389            _token: ReclaimToken,
1390        ) -> Result<Option<Handle>, EngineError> {
1391            unreachable!()
1392        }
1393        async fn delay(
1394            &self,
1395            _handle: &Handle,
1396            _delay_until: TimestampMs,
1397        ) -> Result<(), EngineError> {
1398            unreachable!()
1399        }
1400        async fn wait_children(&self, _handle: &Handle) -> Result<(), EngineError> {
1401            unreachable!()
1402        }
1403        async fn describe_execution(
1404            &self,
1405            _id: &ExecutionId,
1406        ) -> Result<Option<ExecutionSnapshot>, EngineError> {
1407            unreachable!()
1408        }
1409        async fn describe_flow(
1410            &self,
1411            _id: &FlowId,
1412        ) -> Result<Option<FlowSnapshot>, EngineError> {
1413            unreachable!()
1414        }
1415        #[cfg(feature = "core")]
1416        async fn list_edges(
1417            &self,
1418            _flow_id: &FlowId,
1419            _direction: EdgeDirection,
1420        ) -> Result<Vec<EdgeSnapshot>, EngineError> {
1421            unreachable!()
1422        }
1423        #[cfg(feature = "core")]
1424        async fn describe_edge(
1425            &self,
1426            _flow_id: &FlowId,
1427            _edge_id: &EdgeId,
1428        ) -> Result<Option<EdgeSnapshot>, EngineError> {
1429            unreachable!()
1430        }
1431        #[cfg(feature = "core")]
1432        async fn resolve_execution_flow_id(
1433            &self,
1434            _eid: &ExecutionId,
1435        ) -> Result<Option<FlowId>, EngineError> {
1436            unreachable!()
1437        }
1438        #[cfg(feature = "core")]
1439        async fn list_flows(
1440            &self,
1441            _partition: PartitionKey,
1442            _cursor: Option<FlowId>,
1443            _limit: usize,
1444        ) -> Result<ListFlowsPage, EngineError> {
1445            unreachable!()
1446        }
1447        #[cfg(feature = "core")]
1448        async fn list_lanes(
1449            &self,
1450            _cursor: Option<LaneId>,
1451            _limit: usize,
1452        ) -> Result<ListLanesPage, EngineError> {
1453            unreachable!()
1454        }
1455        #[cfg(feature = "core")]
1456        async fn list_suspended(
1457            &self,
1458            _partition: PartitionKey,
1459            _cursor: Option<ExecutionId>,
1460            _limit: usize,
1461        ) -> Result<ListSuspendedPage, EngineError> {
1462            unreachable!()
1463        }
1464        #[cfg(feature = "core")]
1465        async fn list_executions(
1466            &self,
1467            _partition: PartitionKey,
1468            _cursor: Option<ExecutionId>,
1469            _limit: usize,
1470        ) -> Result<ListExecutionsPage, EngineError> {
1471            unreachable!()
1472        }
1473        #[cfg(feature = "core")]
1474        async fn deliver_signal(
1475            &self,
1476            _args: DeliverSignalArgs,
1477        ) -> Result<DeliverSignalResult, EngineError> {
1478            unreachable!()
1479        }
1480        #[cfg(feature = "core")]
1481        async fn claim_resumed_execution(
1482            &self,
1483            _args: ClaimResumedExecutionArgs,
1484        ) -> Result<ClaimResumedExecutionResult, EngineError> {
1485            unreachable!()
1486        }
1487        async fn cancel_flow(
1488            &self,
1489            _id: &FlowId,
1490            _policy: CancelFlowPolicy,
1491            _wait: CancelFlowWait,
1492        ) -> Result<CancelFlowResult, EngineError> {
1493            unreachable!()
1494        }
1495        #[cfg(feature = "core")]
1496        async fn set_edge_group_policy(
1497            &self,
1498            _flow_id: &FlowId,
1499            _downstream_execution_id: &ExecutionId,
1500            _policy: crate::contracts::EdgeDependencyPolicy,
1501        ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
1502            unreachable!()
1503        }
1504        async fn report_usage(
1505            &self,
1506            _handle: &Handle,
1507            _budget: &BudgetId,
1508            _dimensions: crate::backend::UsageDimensions,
1509        ) -> Result<ReportUsageResult, EngineError> {
1510            unreachable!()
1511        }
1512        #[cfg(feature = "streaming")]
1513        async fn read_stream(
1514            &self,
1515            _execution_id: &ExecutionId,
1516            _attempt_index: AttemptIndex,
1517            _from: StreamCursor,
1518            _to: StreamCursor,
1519            _count_limit: u64,
1520        ) -> Result<StreamFrames, EngineError> {
1521            unreachable!()
1522        }
1523        #[cfg(feature = "streaming")]
1524        async fn tail_stream(
1525            &self,
1526            _execution_id: &ExecutionId,
1527            _attempt_index: AttemptIndex,
1528            _after: StreamCursor,
1529            _block_ms: u64,
1530            _count_limit: u64,
1531            _visibility: TailVisibility,
1532        ) -> Result<StreamFrames, EngineError> {
1533            unreachable!()
1534        }
1535        #[cfg(feature = "streaming")]
1536        async fn read_summary(
1537            &self,
1538            _execution_id: &ExecutionId,
1539            _attempt_index: AttemptIndex,
1540        ) -> Result<Option<SummaryDocument>, EngineError> {
1541            unreachable!()
1542        }
1543    }
1544
1545    /// The default `capabilities()` impl returns a value tagged
1546    /// `family = "unknown"` with every `supports.*` bool false, so
1547    /// pre-RFC-018 out-of-tree backends keep compiling and consumers
1548    /// can distinguish "backend predates RFC-018" from "backend
1549    /// reports concrete bools." Every concrete in-tree backend
1550    /// overrides.
1551    #[test]
1552    fn default_capabilities_is_unknown_family_all_false() {
1553        let b = DefaultBackend;
1554        let caps = b.capabilities();
1555        assert_eq!(caps.identity.family, "unknown");
1556        assert_eq!(
1557            caps.identity.version,
1558            crate::capability::Version::new(0, 0, 0)
1559        );
1560        assert_eq!(caps.identity.rfc017_stage, "unknown");
1561        // Every field false on the default (matches `Supports::none()`).
1562        assert_eq!(caps.supports, crate::capability::Supports::none());
1563    }
1564}