// ff_core/engine_backend.rs
//! The `EngineBackend` trait — abstracting FlowFabric's write surface.
//!
//! **RFC-012 Stage 1a:** this is the trait landing. The
//! Valkey-backed impl lives in `ff-backend-valkey`; future backends
//! (Postgres) add a sibling crate with their own impl. ff-sdk's
//! `FlowFabricWorker` gains `connect_with(backend)` /
//! `backend(&self)` accessors so consumers that want to bring their
//! own backend (tests, future non-Valkey deployments) can hand one
//! in. The hot-path migration of `ClaimedTask` / `FlowFabricWorker`
//! to forward through the trait lands across Stages 1b-1d.
//!
//! # Object safety
//!
//! `EngineBackend` is object-safe: all methods are `async fn` behind
//! `#[async_trait]` and take `&self`. Consumers can hold
//! `Arc<dyn EngineBackend>` for heterogenous-backend deployments.
//! The trait is `Send + Sync + 'static` per RFC-012 §4.1; every impl
//! must honour that bound.
//!
//! # Error surface
//!
//! Every method returns [`Result<_, EngineError>`]. `EngineError`'s
//! `Transport` variant carries a boxed `dyn Error + Send + Sync`;
//! Valkey-backed transport faults box a
//! `ff_script::error::ScriptError` (downcast via
//! `ff_script::engine_error_ext::transport_script_ref`). Other
//! backends box their native error type and set the `backend` tag
//! accordingly.
//!
//! # Atomicity contract
//!
//! Per-op state transitions MUST be atomic (RFC-012 §3.4). On Valkey
//! this is the single-FCALL-per-op property; on Postgres it is the
//! per-transaction property. A backend that cannot honour atomicity
//! for a given op either MUST NOT implement `EngineBackend` or MUST
//! return `EngineError::Unavailable { op }` for the affected method.
//!
//! # Replay semantics
//!
//! `complete`, `fail`, `cancel`, `suspend`, `delay`, `wait_children`
//! are idempotent under replay — calling twice with the same handle
//! and args returns the same outcome (success on first call, typed
//! `State` / `Contention` on subsequent calls where the fence triple
//! no longer matches a live lease).
use std::time::Duration;

use async_trait::async_trait;

use crate::backend::{
    AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy,
    FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal, PendingWaitpoint,
    ReclaimToken, ResumeSignal, SummaryDocument, TailVisibility,
};
use crate::contracts::{
    CancelFlowResult, ExecutionSnapshot, FlowSnapshot, ReportUsageResult,
    RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SuspendArgs,
    SuspendOutcome,
};
#[cfg(feature = "core")]
use crate::contracts::{
    AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
    ApplyDependencyToChildResult, BudgetStatus, CancelExecutionArgs, CancelExecutionResult,
    CancelFlowArgs, ChangePriorityArgs, ChangePriorityResult, ClaimForWorkerArgs,
    ClaimForWorkerOutcome, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
    CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
    CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
    DeliverSignalArgs, DeliverSignalResult, EdgeDirection, EdgeSnapshot, ExecutionInfo,
    ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
    ListPendingWaitpointsResult, ListSuspendedPage, ReplayExecutionArgs, ReplayExecutionResult,
    ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult, RevokeLeaseArgs, RevokeLeaseResult,
    StageDependencyEdgeArgs, StageDependencyEdgeResult,
};
#[cfg(feature = "streaming")]
use crate::contracts::{StreamCursor, StreamFrames};
use crate::engine_error::EngineError;
#[cfg(feature = "core")]
use crate::partition::PartitionKey;
#[cfg(feature = "core")]
use crate::state::PublicState;
#[cfg(feature = "streaming")]
use crate::types::AttemptIndex;
#[cfg(feature = "core")]
use crate::types::EdgeId;
use crate::types::{BudgetId, ExecutionId, FlowId, LaneId, TimestampMs};
86
/// The engine write surface — a single trait a backend implementation
/// honours to serve a `FlowFabricWorker`.
///
/// See RFC-012 §3.1 for the inventory rationale and §3.3 for the
/// type-level shape. 16 methods (Round-7 added `create_waitpoint`;
/// `append_frame` return widened; `report_usage` return replaced —
/// RFC-012 §R7). Issue #150 added the two trigger-surface methods
/// (`deliver_signal` / `claim_resumed_execution`).
///
/// # Note on `complete` payload shape
///
/// The RFC §3.3 sketch uses `Option<Bytes>`; the Stage 1a trait uses
/// `Option<Vec<u8>>` to match the existing
/// `ff_sdk::ClaimedTask::complete` signature and avoid adding a
/// `bytes` public-type dep for zero consumer benefit. Round-4 §7.17
/// resolved the payload container debate to `Box<[u8]>` in the
/// public type (see `HandleOpaque`); `Option<Vec<u8>>` is the
/// zero-churn choice consistent with today's code. Consumers that
/// need `&[u8]` can borrow via `.as_deref()` on the Option.
#[async_trait]
pub trait EngineBackend: Send + Sync + 'static {
    // ── Claim + lifecycle ──

    /// Fresh-work claim. Returns `Ok(None)` when no work is currently
    /// available; `Err` only on transport or input-validation faults.
    async fn claim(
        &self,
        lane: &LaneId,
        capabilities: &CapabilitySet,
        policy: ClaimPolicy,
    ) -> Result<Option<Handle>, EngineError>;

    /// Renew a held lease. Returns the updated expiry + epoch on
    /// success; typed `State::StaleLease` / `State::LeaseExpired`
    /// when the lease has been stolen or timed out.
    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError>;

    /// Numeric-progress heartbeat.
    ///
    /// Writes scalar `progress_percent` / `progress_message` fields on
    /// `exec_core`; each call overwrites the previous value. This does
    /// NOT append to the output stream — stream-frame producers must use
    /// [`append_frame`](Self::append_frame) instead.
    async fn progress(
        &self,
        handle: &Handle,
        percent: Option<u8>,
        message: Option<String>,
    ) -> Result<(), EngineError>;

    /// Append one stream frame. Distinct from [`progress`](Self::progress)
    /// per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry
    /// id and post-append frame count (RFC-012 §R7.2.1).
    ///
    /// Stream-frame producers (arbitrary `frame_type` + payload, consumed
    /// via the read/tail surfaces) MUST use this method rather than
    /// [`progress`](Self::progress); the latter updates scalar fields on
    /// `exec_core` and is invisible to stream consumers.
    async fn append_frame(
        &self,
        handle: &Handle,
        frame: Frame,
    ) -> Result<AppendFrameOutcome, EngineError>;

    /// Terminal success. Borrows `handle` (round-4 M-D2) so callers
    /// can retry under `EngineError::Transport` without losing the
    /// cookie. Payload is `Option<Vec<u8>>` per the note above.
    async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError>;

    /// Terminal failure with classification. Returns [`FailOutcome`]
    /// so the caller learns whether a retry was scheduled.
    async fn fail(
        &self,
        handle: &Handle,
        reason: FailureReason,
        classification: FailureClass,
    ) -> Result<FailOutcome, EngineError>;

    /// Cooperative cancel by the worker holding the lease.
    async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError>;
167
168    /// Suspend the execution awaiting a typed resume condition
169    /// (RFC-013 Stage 1d).
170    ///
171    /// Borrows `handle` (round-4 M-D2). Terminal-looking behaviour is
172    /// expressed through [`SuspendOutcome`]:
173    ///
174    /// * [`SuspendOutcome::Suspended`] — the pre-suspend handle is
175    ///   logically invalidated; the fresh `HandleKind::Suspended`
176    ///   handle inside the variant supersedes it. Runtime enforcement
177    ///   via the fence triple: subsequent ops against the stale handle
178    ///   surface as `Contention(LeaseConflict)`.
179    /// * [`SuspendOutcome::AlreadySatisfied`] — buffered signals on a
180    ///   pending waitpoint already matched the resume condition at
181    ///   suspension time. The lease is NOT released; the caller's
182    ///   pre-suspend handle remains valid.
183    ///
184    /// See RFC-013 §2 for the type shapes, §3 for the replay /
185    /// idempotency contract, §4 for the error taxonomy.
186    async fn suspend(
187        &self,
188        handle: &Handle,
189        args: SuspendArgs,
190    ) -> Result<SuspendOutcome, EngineError>;
191
192    /// Issue a pending waitpoint for future signal delivery.
193    ///
194    /// Waitpoints have two states in the Valkey wire contract:
195    /// **pending** (token issued, not yet backing a suspension) and
196    /// **active** (bound to a suspension). This method creates a
197    /// waitpoint in the **pending** state. A later `suspend` call
198    /// transitions a pending waitpoint to active (see Lua
199    /// `use_pending_waitpoint` ARGV flag at
200    /// `flowfabric.lua:3603,3641,3690`) — or, if buffered signals
201    /// already satisfy its condition, the suspend call returns
202    /// `SuspendOutcome::AlreadySatisfied` and the waitpoint activates
203    /// without ever releasing the lease.
204    ///
205    /// Pending-waitpoint expiry is a first-class terminal error on
206    /// the wire (`PendingWaitpointExpired` at
207    /// `ff-script/src/error.rs:170,403-408`). The attempt retains its
208    /// lease while the waitpoint is pending; signals delivered to
209    /// this waitpoint are buffered server-side (RFC-012 §R7.2.2).
210    async fn create_waitpoint(
211        &self,
212        handle: &Handle,
213        waitpoint_key: &str,
214        expires_in: Duration,
215    ) -> Result<PendingWaitpoint, EngineError>;
216
217    /// Non-mutating observation of signals that satisfied the handle's
218    /// resume condition.
219    async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError>;
220
221    /// Consume a reclaim grant to mint a resumed-kind handle. Returns
222    /// `Ok(None)` when the grant's target execution is no longer
223    /// resumable (already reclaimed, terminal, etc.).
224    async fn claim_from_reclaim(&self, token: ReclaimToken) -> Result<Option<Handle>, EngineError>;
225
226    // Round-5 amendment: lease-releasing peers of `suspend`.
227
228    /// Park the execution until `delay_until`, releasing the lease.
229    async fn delay(&self, handle: &Handle, delay_until: TimestampMs) -> Result<(), EngineError>;
230
231    /// Mark the execution as waiting for its child flow to complete,
232    /// releasing the lease.
233    async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError>;
234
235    // ── Read / admin ──
236
237    /// Snapshot an execution by id. `Ok(None)` ⇒ no such execution.
238    async fn describe_execution(
239        &self,
240        id: &ExecutionId,
241    ) -> Result<Option<ExecutionSnapshot>, EngineError>;
242
243    /// Snapshot a flow by id. `Ok(None)` ⇒ no such flow.
244    async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError>;
245
246    /// List dependency edges adjacent to an execution. Read-only; the
247    /// backend resolves the subject execution's flow, reads the
248    /// direction-specific adjacency SET, and decodes each member's
249    /// flow-scoped `edge:<edge_id>` hash.
250    ///
251    /// Returns an empty `Vec` when the subject has no edges on the
252    /// requested side — including standalone executions (no owning
253    /// flow). Ordering is unspecified: the underlying adjacency SET
254    /// is an unordered SMEMBERS read. Callers that need deterministic
255    /// order should sort by [`EdgeSnapshot::edge_id`] /
256    /// [`EdgeSnapshot::created_at`] themselves.
257    ///
258    /// Parse failures on the edge hash surface as
259    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
260    /// — unknown fields, missing required fields, endpoint mismatches
261    /// against the adjacency SET all fail loud rather than silently
262    /// returning partial results.
263    ///
264    /// Gated on the `core` feature — edge reads are part of the
265    /// minimal engine surface a Postgres-style backend must honour.
266    ///
267    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
268    #[cfg(feature = "core")]
269    async fn list_edges(
270        &self,
271        flow_id: &FlowId,
272        direction: EdgeDirection,
273    ) -> Result<Vec<EdgeSnapshot>, EngineError>;
274
275    /// Snapshot a single dependency edge by its owning flow + edge id.
276    ///
277    /// `Ok(None)` when the edge hash is absent (never staged, or
278    /// staged under a different flow than `flow_id`). Parse failures
279    /// on a present edge hash surface as
280    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
281    /// — the stored `flow_id` field is cross-checked against the
282    /// caller's expected `flow_id` so a wrong-key read fails loud
283    /// rather than returning an unrelated edge.
284    ///
285    /// Gated on the `core` feature — single-edge reads are part of
286    /// the minimal snapshot surface an alternate backend must honour
287    /// alongside [`Self::describe_execution`] / [`Self::describe_flow`]
288    /// / [`Self::list_edges`].
289    ///
290    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
291    #[cfg(feature = "core")]
292    async fn describe_edge(
293        &self,
294        flow_id: &FlowId,
295        edge_id: &EdgeId,
296    ) -> Result<Option<EdgeSnapshot>, EngineError>;
297
298    /// Resolve an execution's owning flow id, if any.
299    ///
300    /// `Ok(None)` when the execution's core record is absent or has
301    /// no associated flow (standalone execution). A present-but-
302    /// malformed `flow_id` field surfaces as
303    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`].
304    ///
305    /// Gated on the `core` feature. Used by ff-sdk's
306    /// `list_outgoing_edges` / `list_incoming_edges` to pivot from a
307    /// consumer-supplied `ExecutionId` to the `FlowId` required by
308    /// [`Self::list_edges`]. A Valkey backend serves this with a
309    /// single `HGET exec_core flow_id`; a Postgres backend serves it
310    /// with the equivalent single-column row lookup.
311    ///
312    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
313    #[cfg(feature = "core")]
314    async fn resolve_execution_flow_id(
315        &self,
316        eid: &ExecutionId,
317    ) -> Result<Option<FlowId>, EngineError>;
318
319    /// List flows on a partition with cursor-based pagination (issue
320    /// #185).
321    ///
322    /// Returns a [`ListFlowsPage`] of [`FlowSummary`](crate::contracts::FlowSummary)
323    /// rows ordered by `flow_id` (UUID byte-lexicographic). `cursor`
324    /// is `None` for the first page; callers forward the returned
325    /// `next_cursor` verbatim to continue iteration, and the listing
326    /// is exhausted when `next_cursor` is `None`. `limit` is the
327    /// maximum number of rows to return on this page — implementations
328    /// MAY return fewer (end of partition) but MUST NOT exceed it.
329    ///
330    /// Ordering rationale: flow ids are UUIDs, and both Valkey
331    /// (sort after-the-fact) and Postgres (`ORDER BY flow_id`) can
332    /// agree on byte-lexicographic order — the same order
333    /// `FlowId::to_string()` produces for canonical hyphenated UUIDs.
334    /// Mapping to `cursor > flow_id` keeps the contract backend-
335    /// independent.
336    ///
337    /// # Postgres implementation pattern
338    ///
339    /// A Postgres-backed implementation serves this directly with
340    ///
341    /// ```sql
342    /// SELECT flow_id, created_at_ms, public_flow_state
343    ///   FROM ff_flow
344    ///  WHERE partition_key = $1
345    ///    AND ($2::uuid IS NULL OR flow_id > $2)
346    ///  ORDER BY flow_id
347    ///  LIMIT $3 + 1;
348    /// ```
349    ///
350    /// — reading one extra row to decide whether `next_cursor` should
351    /// be set to the last row's `flow_id`. The Valkey implementation
352    /// maintains the `ff:idx:{fp:N}:flow_index` SET and performs the
353    /// sort + slice client-side (SMEMBERS then sort-by-UUID-bytes),
354    /// pipelining `HGETALL flow_core` for each row on the page.
355    ///
356    /// Gated on the `core` feature — flow listing is part of the
357    /// minimal engine surface a Postgres-style backend must honour.
358    #[cfg(feature = "core")]
359    async fn list_flows(
360        &self,
361        partition: PartitionKey,
362        cursor: Option<FlowId>,
363        limit: usize,
364    ) -> Result<ListFlowsPage, EngineError>;
365
366    /// Enumerate registered lanes with cursor-based pagination.
367    ///
368    /// Lanes are global (not partition-scoped) — the backend serves
369    /// this from its lane registry and does NOT accept a
370    /// [`crate::partition::Partition`] argument. Results are sorted
371    /// by [`LaneId`] name so the ordering is stable across calls and
372    /// cursors address a deterministic position in the sort.
373    ///
374    /// * `cursor` — exclusive lower bound. `None` starts from the
375    ///   first lane. To continue a walk, pass the previous page's
376    ///   [`ListLanesPage::next_cursor`].
377    /// * `limit` — hard cap on the number of lanes returned in the
378    ///   page. Backends MAY round this down when the registry size
379    ///   is smaller; they MUST NOT return more than `limit`.
380    ///
381    /// [`ListLanesPage::next_cursor`] is `Some(last_lane_in_page)`
382    /// iff at least one more lane exists after the returned page,
383    /// and `None` on the final page. Callers loop until `next_cursor`
384    /// is `None` to read the full registry.
385    ///
386    /// Gated on the `core` feature — lane enumeration is part of the
387    /// minimal snapshot surface an alternate backend must honour
388    /// alongside [`Self::describe_flow`] / [`Self::list_edges`].
389    #[cfg(feature = "core")]
390    async fn list_lanes(
391        &self,
392        cursor: Option<LaneId>,
393        limit: usize,
394    ) -> Result<ListLanesPage, EngineError>;
395
396    /// List suspended executions in one partition, cursor-paginated,
397    /// with each entry's suspension `reason_code` populated (issue
398    /// #183).
399    ///
400    /// Consumer-facing "what's blocked on what?" panels (ff-board's
401    /// suspended-executions view, operator CLIs) need the reason in
402    /// the list response so the UI does not round-trip per row to
403    /// `describe_execution` for a field it knows it needs. `reason`
404    /// on [`SuspendedExecutionEntry`] carries the free-form
405    /// `suspension:current.reason_code` field — see the type rustdoc
406    /// for the String-not-enum rationale.
407    ///
408    /// `cursor` is opaque to callers; pass `None` to start a fresh
409    /// scan and feed the returned [`ListSuspendedPage::next_cursor`]
410    /// back in on subsequent pages until it comes back `None`.
411    /// `limit` bounds the `entries` count; backends MAY return fewer
412    /// when the partition is exhausted.
413    ///
414    /// Ordering is by ascending `suspended_at_ms` (the per-lane
415    /// suspended ZSET score == `timeout_at` or the no-timeout
416    /// sentinel) with execution id as a lex tiebreak, so cursor
417    /// continuation is deterministic across calls.
418    ///
419    /// Gated on the `core` feature — suspended-list enumeration is
420    /// part of the minimal engine surface a Postgres-style backend
421    /// must honour.
422    #[cfg(feature = "core")]
423    async fn list_suspended(
424        &self,
425        partition: PartitionKey,
426        cursor: Option<ExecutionId>,
427        limit: usize,
428    ) -> Result<ListSuspendedPage, EngineError>;
429
430    /// Forward-only paginated listing of the executions indexed under
431    /// one partition.
432    ///
433    /// Reads the partition-wide `ff:idx:{p:N}:all_executions` set,
434    /// sorts lexicographically on `ExecutionId`, and returns the page
435    /// of ids strictly greater than `cursor` (or starting from the
436    /// smallest id when `cursor = None`). The returned
437    /// [`ListExecutionsPage::next_cursor`] is the last id on the page
438    /// iff at least one more id exists past it; `None` signals
439    /// end-of-stream.
440    ///
441    /// `limit` is the maximum number of ids returned on this page. A
442    /// `limit` of `0` returns an empty page with `next_cursor = None`.
443    /// Backends MAY cap `limit` internally (Valkey: 1000) and return
444    /// fewer ids than requested; callers continue paginating until
445    /// `next_cursor == None`.
446    ///
447    /// Ordering is stable under concurrent inserts for already-emitted
448    /// ids (an id less-than-or-equal-to the caller's cursor is never
449    /// re-emitted in later pages) but new inserts past the cursor WILL
450    /// appear in subsequent pages — consistent with forward-only
451    /// cursor semantics.
452    ///
453    /// Gated on the `core` feature — partition-scoped listing is part
454    /// of the minimal engine surface every backend must honour.
455    #[cfg(feature = "core")]
456    async fn list_executions(
457        &self,
458        partition: PartitionKey,
459        cursor: Option<ExecutionId>,
460        limit: usize,
461    ) -> Result<ListExecutionsPage, EngineError>;
462
463    // ── Trigger ops (issue #150) ──
464
465    /// Deliver an external signal to a suspended execution's waitpoint.
466    ///
467    /// The backend atomically records the signal, evaluates the resume
468    /// condition, and — when satisfied — transitions the execution
469    /// from `suspended` to `runnable` (or buffers the signal when the
470    /// waitpoint is still `pending`). Duplicate delivery — same
471    /// `idempotency_key` + waitpoint — surfaces as
472    /// [`DeliverSignalResult::Duplicate`] with the pre-existing
473    /// `signal_id` rather than mutating state twice.
474    ///
475    /// Input validation (HMAC token presence, payload size limits,
476    /// signal-name shape) is the backend's responsibility; callers
477    /// pass a fully populated [`DeliverSignalArgs`] and receive typed
478    /// outcomes or typed errors (`ScriptError::invalid_token`,
479    /// `ScriptError::token_expired`, `ScriptError::ExecutionNotFound`
480    /// surfaced via [`EngineError::Transport`] on the Valkey backend).
481    ///
482    /// Gated on the `core` feature — signal delivery is part of the
483    /// minimal trigger surface every backend must honour so ff-server
484    /// / REST handlers can dispatch against `Arc<dyn EngineBackend>`
485    /// without knowing which backend is running underneath.
486    #[cfg(feature = "core")]
487    async fn deliver_signal(
488        &self,
489        args: DeliverSignalArgs,
490    ) -> Result<DeliverSignalResult, EngineError>;
491
492    /// Claim a resumed execution — a previously-suspended attempt that
493    /// has cleared its resume condition (e.g. via
494    /// [`Self::deliver_signal`]) and now needs a worker to pick up the
495    /// same attempt index.
496    ///
497    /// Distinct from [`Self::claim`] (fresh work) and
498    /// [`Self::claim_from_reclaim`] (grant-based ownership transfer
499    /// after a crash): the resumed-claim path re-binds an existing
500    /// attempt rather than minting a new one. The backend issues a
501    /// fresh `lease_id` + bumps the `lease_epoch`, preserving
502    /// `attempt_id` / `attempt_index` so stream frames and progress
503    /// updates continue on the same attempt.
504    ///
505    /// Typed failures surface via `ScriptError` → `EngineError`:
506    /// `NotAResumedExecution` when the attempt state is not
507    /// `attempt_interrupted`, `ExecutionNotLeaseable` when the
508    /// lifecycle phase is not `runnable`, and `InvalidClaimGrant`
509    /// when the grant key is missing or was already consumed.
510    ///
511    /// Gated on the `core` feature — resumed-claim is part of the
512    /// minimal trigger surface every backend must honour.
513    #[cfg(feature = "core")]
514    async fn claim_resumed_execution(
515        &self,
516        args: ClaimResumedExecutionArgs,
517    ) -> Result<ClaimResumedExecutionResult, EngineError>;
518
519    /// Operator-initiated cancellation of a flow and (optionally) its
520    /// member executions. See RFC-012 §3.1.1 for the policy /wait
521    /// matrix.
522    async fn cancel_flow(
523        &self,
524        id: &FlowId,
525        policy: CancelFlowPolicy,
526        wait: CancelFlowWait,
527    ) -> Result<CancelFlowResult, EngineError>;
528
529    /// RFC-016 Stage A: set the inbound-edge-group policy for a
530    /// downstream execution. Must be called before the first
531    /// `add_dependency(... -> downstream_execution_id)` — the backend
532    /// rejects with [`EngineError::Conflict`] if edges have already
533    /// been staged for this group.
534    ///
535    /// Stage A honours only
536    /// [`EdgeDependencyPolicy::AllOf`](crate::contracts::EdgeDependencyPolicy::AllOf);
537    /// the `AnyOf` / `Quorum` variants return
538    /// [`EngineError::Validation`] with
539    /// `detail = "stage A supports AllOf only; AnyOf/Quorum land in stage B"`
540    /// until Stage B's resolver lands.
541    #[cfg(feature = "core")]
542    async fn set_edge_group_policy(
543        &self,
544        flow_id: &FlowId,
545        downstream_execution_id: &ExecutionId,
546        policy: crate::contracts::EdgeDependencyPolicy,
547    ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError>;
548
549    // ── HMAC secret rotation (v0.7 migration-master Q4) ──
550
551    /// Rotate the waitpoint HMAC signing kid **cluster-wide**.
552    ///
553    /// **v0.7 migration-master Q4 (adjudicated 2026-04-24).**
554    /// Additive trait surface so Valkey and Postgres backends can
555    /// both expose the "rotate everywhere" semantic under one name.
556    ///
557    /// * Valkey impl fans out an `ff_rotate_waitpoint_hmac_secret`
558    ///   FCALL per execution partition. `entries.len() == num_flow_partitions`
559    ///   and per-partition failures are surfaced as inner `Err`
560    ///   entries — the call as a whole does not fail when one
561    ///   partition's FCALL fails, matching
562    ///   [`ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions`]'s
563    ///   partial-success contract.
564    /// * Postgres impl (Wave 4) writes one row to
565    ///   `ff_waitpoint_hmac(kid, secret, rotated_at)` and returns a
566    ///   single-entry vec with `partition = 0`.
567    ///
568    /// The default impl returns
569    /// [`EngineError::Unavailable`] with
570    /// `op = "rotate_waitpoint_hmac_secret_all"` so backends that
571    /// haven't implemented the method surface the miss loudly rather
572    /// than silently no-op'ing. Both concrete backends override.
573    async fn rotate_waitpoint_hmac_secret_all(
574        &self,
575        _args: RotateWaitpointHmacSecretAllArgs,
576    ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
577        Err(EngineError::Unavailable {
578            op: "rotate_waitpoint_hmac_secret_all",
579        })
580    }
581
582    // ── Budget ──
583
584    /// Report usage against a budget and check limits. Returns the
585    /// typed [`ReportUsageResult`] variant; backends enforce
586    /// idempotency via the caller-supplied
587    /// [`UsageDimensions::dedup_key`] (RFC-012 §R7.2.3 — replaces
588    /// the pre-Round-7 `AdmissionDecision` return).
589    async fn report_usage(
590        &self,
591        handle: &Handle,
592        budget: &BudgetId,
593        dimensions: crate::backend::UsageDimensions,
594    ) -> Result<ReportUsageResult, EngineError>;
595
596    // ── Stream reads (RFC-012 Stage 1c tranche-4; issue #87) ──
597
598    /// Read frames from a completed or in-flight attempt's stream.
599    ///
600    /// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start`
601    /// / `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
602    /// `StreamCursor::At("<id>")` reads from a concrete entry id.
603    ///
604    /// Input validation (count_limit bounds, cursor shape) is the
605    /// caller's responsibility — SDK-side wrappers in
606    /// [`ff-sdk`](https://docs.rs/ff-sdk) enforce bounds before
607    /// forwarding. Backends MAY additionally reject out-of-range
608    /// input via [`EngineError::Validation`].
609    ///
610    /// Gated on the `streaming` feature — stream reads are part of
611    /// the stream-subset surface a backend without XREAD-like
612    /// primitives may omit.
613    #[cfg(feature = "streaming")]
614    async fn read_stream(
615        &self,
616        execution_id: &ExecutionId,
617        attempt_index: AttemptIndex,
618        from: StreamCursor,
619        to: StreamCursor,
620        count_limit: u64,
621    ) -> Result<StreamFrames, EngineError>;
622
623    /// Tail a live attempt's stream.
624    ///
625    /// `after` is an exclusive [`StreamCursor`] — entries with id
626    /// strictly greater than `after` are returned. `StreamCursor::Start`
627    /// / `StreamCursor::End` are NOT accepted here; callers MUST pass
628    /// a concrete id (or `StreamCursor::from_beginning()`). The SDK
629    /// wrapper rejects the open markers before reaching the backend.
630    ///
631    /// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up
632    /// to that many ms for a new entry.
633    ///
634    /// `visibility` (RFC-015 §6.1) filters the returned entries by
635    /// their stored [`StreamMode`](crate::backend::StreamMode)
636    /// `mode` field. Default
637    /// [`TailVisibility::All`](crate::backend::TailVisibility::All)
638    /// preserves v1 behaviour.
639    ///
640    /// Gated on the `streaming` feature — see [`read_stream`](Self::read_stream).
641    #[cfg(feature = "streaming")]
642    async fn tail_stream(
643        &self,
644        execution_id: &ExecutionId,
645        attempt_index: AttemptIndex,
646        after: StreamCursor,
647        block_ms: u64,
648        count_limit: u64,
649        visibility: TailVisibility,
650    ) -> Result<StreamFrames, EngineError>;
651
652    /// Read the rolling summary document for an attempt (RFC-015 §6.3).
653    ///
654    /// Returns `Ok(None)` when no [`StreamMode::DurableSummary`](crate::backend::StreamMode::DurableSummary)
655    /// frame has ever been appended for the attempt. Non-blocking Hash
656    /// read; safe to call from any consumer without holding the lease.
657    ///
658    /// Gated on the `streaming` feature — summary reads are part of
659    /// the stream-subset surface.
660    #[cfg(feature = "streaming")]
661    async fn read_summary(
662        &self,
663        execution_id: &ExecutionId,
664        attempt_index: AttemptIndex,
665    ) -> Result<Option<SummaryDocument>, EngineError>;
666
667    // ── RFC-017 Stage A — Ingress (5) ──────────────────────────
668    //
669    // Every method in this block has a default impl returning
670    // `EngineError::Unavailable { op }` per RFC-017 §5.3. Concrete
671    // backends override each method with a real body. A missing
672    // override surfaces as a loud typed error at the call site rather
673    // than a silent no-op.
674
675    /// Create an execution. Ingress row 6 (RFC-017 §4). Wraps
676    /// `ff_create_execution` on Valkey; `INSERT INTO ff_execution ...`
677    /// on Postgres. The `idempotency_key` + backend-side default
678    /// `dedup_ttl_ms = 86400000` make duplicate submissions idempotent.
679    #[cfg(feature = "core")]
680    async fn create_execution(
681        &self,
682        _args: CreateExecutionArgs,
683    ) -> Result<CreateExecutionResult, EngineError> {
684        Err(EngineError::Unavailable {
685            op: "create_execution",
686        })
687    }
688
689    /// Create a flow header. Ingress row 5.
690    #[cfg(feature = "core")]
691    async fn create_flow(
692        &self,
693        _args: CreateFlowArgs,
694    ) -> Result<CreateFlowResult, EngineError> {
695        Err(EngineError::Unavailable { op: "create_flow" })
696    }
697
698    /// Atomically add an execution to a flow (single-FCALL co-located
699    /// commit on Valkey; single-transaction UPSERT on Postgres).
700    #[cfg(feature = "core")]
701    async fn add_execution_to_flow(
702        &self,
703        _args: AddExecutionToFlowArgs,
704    ) -> Result<AddExecutionToFlowResult, EngineError> {
705        Err(EngineError::Unavailable {
706            op: "add_execution_to_flow",
707        })
708    }
709
710    /// Stage a dependency edge between flow members. CAS-guarded on
711    /// `graph_revision` — stale rev returns `Contention(StaleGraphRevision)`.
712    #[cfg(feature = "core")]
713    async fn stage_dependency_edge(
714        &self,
715        _args: StageDependencyEdgeArgs,
716    ) -> Result<StageDependencyEdgeResult, EngineError> {
717        Err(EngineError::Unavailable {
718            op: "stage_dependency_edge",
719        })
720    }
721
722    /// Apply a staged dependency edge to its downstream child.
723    #[cfg(feature = "core")]
724    async fn apply_dependency_to_child(
725        &self,
726        _args: ApplyDependencyToChildArgs,
727    ) -> Result<ApplyDependencyToChildResult, EngineError> {
728        Err(EngineError::Unavailable {
729            op: "apply_dependency_to_child",
730        })
731    }
732
733    // ── RFC-017 Stage A — Operator control (4) ─────────────────
734
735    /// Operator-initiated execution cancel (row 2).
736    #[cfg(feature = "core")]
737    async fn cancel_execution(
738        &self,
739        _args: CancelExecutionArgs,
740    ) -> Result<CancelExecutionResult, EngineError> {
741        Err(EngineError::Unavailable {
742            op: "cancel_execution",
743        })
744    }
745
746    /// Re-score an execution's eligibility priority (row 17).
747    #[cfg(feature = "core")]
748    async fn change_priority(
749        &self,
750        _args: ChangePriorityArgs,
751    ) -> Result<ChangePriorityResult, EngineError> {
752        Err(EngineError::Unavailable {
753            op: "change_priority",
754        })
755    }
756
757    /// Replay a terminal execution (row 22). Variadic KEYS handling
758    /// (inbound-edge pre-read) is hidden inside the Valkey impl per
759    /// RFC-017 §4 row 3.
760    #[cfg(feature = "core")]
761    async fn replay_execution(
762        &self,
763        _args: ReplayExecutionArgs,
764    ) -> Result<ReplayExecutionResult, EngineError> {
765        Err(EngineError::Unavailable {
766            op: "replay_execution",
767        })
768    }
769
770    /// Operator-initiated lease revoke (row 19).
771    #[cfg(feature = "core")]
772    async fn revoke_lease(
773        &self,
774        _args: RevokeLeaseArgs,
775    ) -> Result<RevokeLeaseResult, EngineError> {
776        Err(EngineError::Unavailable { op: "revoke_lease" })
777    }
778
779    // ── RFC-017 Stage A — Budget + quota admin (5) ─────────────
780
781    /// Create a budget definition (row 6).
782    #[cfg(feature = "core")]
783    async fn create_budget(
784        &self,
785        _args: CreateBudgetArgs,
786    ) -> Result<CreateBudgetResult, EngineError> {
787        Err(EngineError::Unavailable {
788            op: "create_budget",
789        })
790    }
791
792    /// Reset a budget's usage counters (row 10).
793    #[cfg(feature = "core")]
794    async fn reset_budget(
795        &self,
796        _args: ResetBudgetArgs,
797    ) -> Result<ResetBudgetResult, EngineError> {
798        Err(EngineError::Unavailable { op: "reset_budget" })
799    }
800
801    /// Create a quota policy (row 7).
802    #[cfg(feature = "core")]
803    async fn create_quota_policy(
804        &self,
805        _args: CreateQuotaPolicyArgs,
806    ) -> Result<CreateQuotaPolicyResult, EngineError> {
807        Err(EngineError::Unavailable {
808            op: "create_quota_policy",
809        })
810    }
811
812    /// Read-only budget status for operator visibility (row 8).
813    #[cfg(feature = "core")]
814    async fn get_budget_status(
815        &self,
816        _id: &BudgetId,
817    ) -> Result<BudgetStatus, EngineError> {
818        Err(EngineError::Unavailable {
819            op: "get_budget_status",
820        })
821    }
822
823    /// Admin-path `report_usage` (row 9 + RFC-017 §5 round-1 F4).
824    /// Distinct from the existing [`Self::report_usage`] which takes
825    /// a worker handle — the admin path has no lease context.
826    #[cfg(feature = "core")]
827    async fn report_usage_admin(
828        &self,
829        _budget: &BudgetId,
830        _args: ReportUsageAdminArgs,
831    ) -> Result<ReportUsageResult, EngineError> {
832        Err(EngineError::Unavailable {
833            op: "report_usage_admin",
834        })
835    }
836
837    // ── RFC-017 Stage A — Read + diagnostics (3) ───────────────
838
839    /// Fetch the stored result payload for a completed execution
840    /// (row 4). Returns `Ok(None)` when the execution is missing, not
841    /// yet complete, or its payload was trimmed by retention policy.
842    async fn get_execution_result(
843        &self,
844        _id: &ExecutionId,
845    ) -> Result<Option<Vec<u8>>, EngineError> {
846        Err(EngineError::Unavailable {
847            op: "get_execution_result",
848        })
849    }
850
851    /// List the pending-or-active waitpoints for an execution, cursor
852    /// paginated (row 5 / §8). Stage A preserves the existing
853    /// `PendingWaitpointInfo` shape; Stage D ships the §8 HMAC
854    /// sanitisation + `(token_kid, token_fingerprint)` schema.
855    #[cfg(feature = "core")]
856    async fn list_pending_waitpoints(
857        &self,
858        _args: ListPendingWaitpointsArgs,
859    ) -> Result<ListPendingWaitpointsResult, EngineError> {
860        Err(EngineError::Unavailable {
861            op: "list_pending_waitpoints",
862        })
863    }
864
865    /// Backend-level reachability probe (row 1). Valkey: `PING`;
866    /// Postgres: `SELECT 1`.
867    async fn ping(&self) -> Result<(), EngineError> {
868        Err(EngineError::Unavailable { op: "ping" })
869    }
870
871    // ── RFC-017 Stage A — Scheduling (1) ───────────────────────
872
873    /// Scheduler-routed claim entrypoint (row 18, RFC-017 §7). Valkey
874    /// forwards to its `ff_scheduler::Scheduler` cursor; Postgres
875    /// forwards to `PostgresScheduler`'s `FOR UPDATE SKIP LOCKED`
876    /// path.
877    #[cfg(feature = "core")]
878    async fn claim_for_worker(
879        &self,
880        _args: ClaimForWorkerArgs,
881    ) -> Result<ClaimForWorkerOutcome, EngineError> {
882        Err(EngineError::Unavailable {
883            op: "claim_for_worker",
884        })
885    }
886
887    // ── Cross-cutting (RFC-017 Stage B trait-lift) ──────────────
888
    /// Static observability label identifying the backend family in
    /// logs + metrics (RFC-017 §5.4 + §9 Stage B).
    ///
    /// The default impl returns `"unknown"` so legacy
    /// `impl EngineBackend` blocks that have not upgraded keep
    /// compiling; every in-tree backend overrides — `ValkeyBackend` →
    /// `"valkey"`, `PostgresBackend` → `"postgres"`.
    fn backend_label(&self) -> &'static str {
        "unknown"
    }
898
    /// Drain-before-shutdown hook (RFC-017 §5.4).
    ///
    /// The server calls this before draining its own background tasks
    /// so that backend-scoped primitives (Valkey stream semaphore,
    /// Postgres sqlx pool, …) can close their gates and await
    /// in-flight work for up to `grace`.
    ///
    /// The default impl returns `Ok(())` — a no-op backend has nothing
    /// backend-scoped to drain. Concrete backends whose data plane
    /// owns resources (connection pools, semaphores, listeners)
    /// override with a real body.
    async fn shutdown_prepare(&self, _grace: Duration) -> Result<(), EngineError> {
        Ok(())
    }
912
913    // ── RFC-017 Stage E2 — `Server::client` removal (header + reads) ───
914
915    /// RFC-017 Stage E2: the "header" portion of `cancel_flow` — run the
916    /// atomic flow-state flip (Valkey: `ff_cancel_flow` FCALL; Postgres:
917    /// `cancel_flow_once` tx), decode policy + membership, and surface
918    /// the `flow_already_terminal` idempotency branch as a first-class
919    /// [`CancelFlowHeader::AlreadyTerminal`] so the Server can build
920    /// the wire [`CancelFlowResult`] without reaching for a raw
921    /// `Client`. Separate from the existing
922    /// [`EngineBackend::cancel_flow`] entry point (which takes the
923    /// enum-typed `(policy, wait)` split and returns the wait-collapsed
924    /// `CancelFlowResult`) because the Server owns its own
925    /// wait-dispatch + member-cancel machinery via
926    /// [`EngineBackend::cancel_execution`] + backlog ack.
927    ///
928    /// Default impl returns [`EngineError::Unavailable`] so un-migrated
929    /// backends surface the miss loudly.
930    #[cfg(feature = "core")]
931    async fn cancel_flow_header(
932        &self,
933        _args: CancelFlowArgs,
934    ) -> Result<crate::contracts::CancelFlowHeader, EngineError> {
935        Err(EngineError::Unavailable {
936            op: "cancel_flow_header",
937        })
938    }
939
940    /// RFC-017 Stage E2: best-effort acknowledgement that one member of
941    /// a `cancel_all` flow has completed its per-member cancel. Drains
942    /// the member from the flow's `pending_cancels` set and, if empty,
943    /// removes the flow from the partition-level `cancel_backlog`
944    /// (Valkey: `ff_ack_cancel_member` FCALL; Postgres: table write —
945    /// default `Unavailable` until Wave 9).
946    ///
947    /// Failures are swallowed by the caller — the cancel-backlog
948    /// reconciler is the authoritative drain — but a typed error here
949    /// lets the caller log a backend-scoped context string.
950    #[cfg(feature = "core")]
951    async fn ack_cancel_member(
952        &self,
953        _flow_id: &FlowId,
954        _execution_id: &ExecutionId,
955    ) -> Result<(), EngineError> {
956        Err(EngineError::Unavailable {
957            op: "ack_cancel_member",
958        })
959    }
960
961    /// RFC-017 Stage E2: full-shape execution read used by the
962    /// `GET /v1/executions/{id}` HTTP route. Returns the legacy
963    /// [`ExecutionInfo`] wire shape (not the decoupled
964    /// [`ExecutionSnapshot`]) so the existing HTTP response bytes stay
965    /// identical across the migration.
966    ///
967    /// `Ok(None)` ⇒ no such execution. Default `Unavailable` because
968    /// the Valkey HGETALL-and-parse is backend-specific.
969    #[cfg(feature = "core")]
970    async fn read_execution_info(
971        &self,
972        _id: &ExecutionId,
973    ) -> Result<Option<ExecutionInfo>, EngineError> {
974        Err(EngineError::Unavailable {
975            op: "read_execution_info",
976        })
977    }
978
979    /// RFC-017 Stage E2: narrow `public_state` read used by the
980    /// `GET /v1/executions/{id}/state` HTTP route. Returns `Ok(None)`
981    /// when the execution is missing. Default `Unavailable`.
982    #[cfg(feature = "core")]
983    async fn read_execution_state(
984        &self,
985        _id: &ExecutionId,
986    ) -> Result<Option<PublicState>, EngineError> {
987        Err(EngineError::Unavailable {
988            op: "read_execution_state",
989        })
990    }
991
992}
993
994/// Object-safety assertion: `dyn EngineBackend` compiles iff every
995/// method is dyn-compatible. Kept as a compile-time guard so a future
996/// trait change that accidentally breaks dyn-safety fails the build
997/// at this site rather than at every downstream `Arc<dyn
998/// EngineBackend>` use.
999#[allow(dead_code)]
1000fn _assert_dyn_compatible(_: &dyn EngineBackend) {}