Skip to main content

ff_core/
engine_backend.rs

//! The `EngineBackend` trait — abstracting FlowFabric's write surface.
//!
//! **RFC-012 Stage 1a:** this is the trait landing. The
//! Valkey-backed impl lives in `ff-backend-valkey`; future backends
//! (Postgres) add a sibling crate with their own impl. ff-sdk's
//! `FlowFabricWorker` gains `connect_with(backend)` /
//! `backend(&self)` accessors so consumers that want to bring their
//! own backend (tests, future non-Valkey deployments) can hand one
//! in. The hot-path migration of `ClaimedTask` / `FlowFabricWorker`
//! to forward through the trait lands across Stages 1b-1d.
//!
//! # Object safety
//!
//! `EngineBackend` is object-safe: all methods are `async fn` behind
//! `#[async_trait]` and take `&self`. Consumers can hold
//! `Arc<dyn EngineBackend>` for heterogeneous-backend deployments.
//! The trait is `Send + Sync + 'static` per RFC-012 §4.1; every impl
//! must honour that bound.
//!
//! # Error surface
//!
//! Every method returns [`Result<_, EngineError>`]. `EngineError`'s
//! `Transport` variant carries a boxed `dyn Error + Send + Sync`;
//! Valkey-backed transport faults box a
//! `ff_script::error::ScriptError` (downcast via
//! `ff_script::engine_error_ext::transport_script_ref`). Other
//! backends box their native error type and set the `backend` tag
//! accordingly.
//!
//! # Atomicity contract
//!
//! Per-op state transitions MUST be atomic (RFC-012 §3.4). On Valkey
//! this is the single-FCALL-per-op property; on Postgres it is the
//! per-transaction property. A backend that cannot honour atomicity
//! for a given op either MUST NOT implement `EngineBackend` or MUST
//! return `EngineError::Unavailable { op }` for the affected method.
//!
//! # Replay semantics
//!
//! `complete`, `fail`, `cancel`, `suspend`, `delay`, `wait_children`
//! are idempotent under replay — calling twice with the same handle
//! and args returns the same outcome (success on first call, typed
//! `State` / `Contention` on subsequent calls where the fence triple
//! no longer matches a live lease).
45
46use std::time::Duration;
47
48use async_trait::async_trait;
49
50use crate::backend::{
51    AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy,
52    FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal, PendingWaitpoint,
53    ReclaimToken, ResumeSignal, SummaryDocument, TailVisibility,
54};
55use crate::contracts::{
56    CancelFlowResult, ExecutionSnapshot, FlowSnapshot, ReportUsageResult, SuspendArgs,
57    SuspendOutcome,
58};
59#[cfg(feature = "core")]
60use crate::contracts::{
61    ClaimResumedExecutionArgs, ClaimResumedExecutionResult, DeliverSignalArgs, DeliverSignalResult,
62    EdgeDirection, EdgeSnapshot, ListExecutionsPage, ListFlowsPage, ListLanesPage,
63    ListSuspendedPage,
64};
65#[cfg(feature = "core")]
66use crate::partition::PartitionKey;
67#[cfg(feature = "streaming")]
68use crate::contracts::{StreamCursor, StreamFrames};
69use crate::engine_error::EngineError;
70#[cfg(feature = "streaming")]
71use crate::types::AttemptIndex;
72#[cfg(feature = "core")]
73use crate::types::EdgeId;
74use crate::types::{BudgetId, ExecutionId, FlowId, LaneId, TimestampMs};
75
76/// The engine write surface — a single trait a backend implementation
77/// honours to serve a `FlowFabricWorker`.
78///
79/// See RFC-012 §3.1 for the inventory rationale and §3.3 for the
80/// type-level shape. 16 methods (Round-7 added `create_waitpoint`;
81/// `append_frame` return widened; `report_usage` return replaced —
82/// RFC-012 §R7). Issue #150 added the two trigger-surface methods
83/// (`deliver_signal` / `claim_resumed_execution`).
84///
85/// # Note on `complete` payload shape
86///
87/// The RFC §3.3 sketch uses `Option<Bytes>`; the Stage 1a trait uses
88/// `Option<Vec<u8>>` to match the existing
89/// `ff_sdk::ClaimedTask::complete` signature and avoid adding a
90/// `bytes` public-type dep for zero consumer benefit. Round-4 §7.17
91/// resolved the payload container debate to `Box<[u8]>` in the
92/// public type (see `HandleOpaque`); `Option<Vec<u8>>` is the
93/// zero-churn choice consistent with today's code. Consumers that
94/// need `&[u8]` can borrow via `.as_deref()` on the Option.
95#[async_trait]
96pub trait EngineBackend: Send + Sync + 'static {
97    // ── Claim + lifecycle ──
98
99    /// Fresh-work claim. Returns `Ok(None)` when no work is currently
100    /// available; `Err` only on transport or input-validation faults.
101    async fn claim(
102        &self,
103        lane: &LaneId,
104        capabilities: &CapabilitySet,
105        policy: ClaimPolicy,
106    ) -> Result<Option<Handle>, EngineError>;
107
108    /// Renew a held lease. Returns the updated expiry + epoch on
109    /// success; typed `State::StaleLease` / `State::LeaseExpired`
110    /// when the lease has been stolen or timed out.
111    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError>;
112
113    /// Numeric-progress heartbeat.
114    ///
115    /// Writes scalar `progress_percent` / `progress_message` fields on
116    /// `exec_core`; each call overwrites the previous value. This does
117    /// NOT append to the output stream — stream-frame producers must use
118    /// [`append_frame`](Self::append_frame) instead.
119    async fn progress(
120        &self,
121        handle: &Handle,
122        percent: Option<u8>,
123        message: Option<String>,
124    ) -> Result<(), EngineError>;
125
126    /// Append one stream frame. Distinct from [`progress`](Self::progress)
127    /// per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry
128    /// id and post-append frame count (RFC-012 §R7.2.1).
129    ///
130    /// Stream-frame producers (arbitrary `frame_type` + payload, consumed
131    /// via the read/tail surfaces) MUST use this method rather than
132    /// [`progress`](Self::progress); the latter updates scalar fields on
133    /// `exec_core` and is invisible to stream consumers.
134    async fn append_frame(
135        &self,
136        handle: &Handle,
137        frame: Frame,
138    ) -> Result<AppendFrameOutcome, EngineError>;
139
140    /// Terminal success. Borrows `handle` (round-4 M-D2) so callers
141    /// can retry under `EngineError::Transport` without losing the
142    /// cookie. Payload is `Option<Vec<u8>>` per the note above.
143    async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError>;
144
145    /// Terminal failure with classification. Returns [`FailOutcome`]
146    /// so the caller learns whether a retry was scheduled.
147    async fn fail(
148        &self,
149        handle: &Handle,
150        reason: FailureReason,
151        classification: FailureClass,
152    ) -> Result<FailOutcome, EngineError>;
153
154    /// Cooperative cancel by the worker holding the lease.
155    async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError>;
156
157    /// Suspend the execution awaiting a typed resume condition
158    /// (RFC-013 Stage 1d).
159    ///
160    /// Borrows `handle` (round-4 M-D2). Terminal-looking behaviour is
161    /// expressed through [`SuspendOutcome`]:
162    ///
163    /// * [`SuspendOutcome::Suspended`] — the pre-suspend handle is
164    ///   logically invalidated; the fresh `HandleKind::Suspended`
165    ///   handle inside the variant supersedes it. Runtime enforcement
166    ///   via the fence triple: subsequent ops against the stale handle
167    ///   surface as `Contention(LeaseConflict)`.
168    /// * [`SuspendOutcome::AlreadySatisfied`] — buffered signals on a
169    ///   pending waitpoint already matched the resume condition at
170    ///   suspension time. The lease is NOT released; the caller's
171    ///   pre-suspend handle remains valid.
172    ///
173    /// See RFC-013 §2 for the type shapes, §3 for the replay /
174    /// idempotency contract, §4 for the error taxonomy.
175    async fn suspend(
176        &self,
177        handle: &Handle,
178        args: SuspendArgs,
179    ) -> Result<SuspendOutcome, EngineError>;
180
181    /// Issue a pending waitpoint for future signal delivery.
182    ///
183    /// Waitpoints have two states in the Valkey wire contract:
184    /// **pending** (token issued, not yet backing a suspension) and
185    /// **active** (bound to a suspension). This method creates a
186    /// waitpoint in the **pending** state. A later `suspend` call
187    /// transitions a pending waitpoint to active (see Lua
188    /// `use_pending_waitpoint` ARGV flag at
189    /// `flowfabric.lua:3603,3641,3690`) — or, if buffered signals
190    /// already satisfy its condition, the suspend call returns
191    /// `SuspendOutcome::AlreadySatisfied` and the waitpoint activates
192    /// without ever releasing the lease.
193    ///
194    /// Pending-waitpoint expiry is a first-class terminal error on
195    /// the wire (`PendingWaitpointExpired` at
196    /// `ff-script/src/error.rs:170,403-408`). The attempt retains its
197    /// lease while the waitpoint is pending; signals delivered to
198    /// this waitpoint are buffered server-side (RFC-012 §R7.2.2).
199    async fn create_waitpoint(
200        &self,
201        handle: &Handle,
202        waitpoint_key: &str,
203        expires_in: Duration,
204    ) -> Result<PendingWaitpoint, EngineError>;
205
206    /// Non-mutating observation of signals that satisfied the handle's
207    /// resume condition.
208    async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError>;
209
210    /// Consume a reclaim grant to mint a resumed-kind handle. Returns
211    /// `Ok(None)` when the grant's target execution is no longer
212    /// resumable (already reclaimed, terminal, etc.).
213    async fn claim_from_reclaim(&self, token: ReclaimToken) -> Result<Option<Handle>, EngineError>;
214
215    // Round-5 amendment: lease-releasing peers of `suspend`.
216
217    /// Park the execution until `delay_until`, releasing the lease.
218    async fn delay(&self, handle: &Handle, delay_until: TimestampMs) -> Result<(), EngineError>;
219
220    /// Mark the execution as waiting for its child flow to complete,
221    /// releasing the lease.
222    async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError>;
223
224    // ── Read / admin ──
225
226    /// Snapshot an execution by id. `Ok(None)` ⇒ no such execution.
227    async fn describe_execution(
228        &self,
229        id: &ExecutionId,
230    ) -> Result<Option<ExecutionSnapshot>, EngineError>;
231
232    /// Snapshot a flow by id. `Ok(None)` ⇒ no such flow.
233    async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError>;
234
235    /// List dependency edges adjacent to an execution. Read-only; the
236    /// backend resolves the subject execution's flow, reads the
237    /// direction-specific adjacency SET, and decodes each member's
238    /// flow-scoped `edge:<edge_id>` hash.
239    ///
240    /// Returns an empty `Vec` when the subject has no edges on the
241    /// requested side — including standalone executions (no owning
242    /// flow). Ordering is unspecified: the underlying adjacency SET
243    /// is an unordered SMEMBERS read. Callers that need deterministic
244    /// order should sort by [`EdgeSnapshot::edge_id`] /
245    /// [`EdgeSnapshot::created_at`] themselves.
246    ///
247    /// Parse failures on the edge hash surface as
248    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
249    /// — unknown fields, missing required fields, endpoint mismatches
250    /// against the adjacency SET all fail loud rather than silently
251    /// returning partial results.
252    ///
253    /// Gated on the `core` feature — edge reads are part of the
254    /// minimal engine surface a Postgres-style backend must honour.
255    ///
256    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
257    #[cfg(feature = "core")]
258    async fn list_edges(
259        &self,
260        flow_id: &FlowId,
261        direction: EdgeDirection,
262    ) -> Result<Vec<EdgeSnapshot>, EngineError>;
263
264    /// Snapshot a single dependency edge by its owning flow + edge id.
265    ///
266    /// `Ok(None)` when the edge hash is absent (never staged, or
267    /// staged under a different flow than `flow_id`). Parse failures
268    /// on a present edge hash surface as
269    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
270    /// — the stored `flow_id` field is cross-checked against the
271    /// caller's expected `flow_id` so a wrong-key read fails loud
272    /// rather than returning an unrelated edge.
273    ///
274    /// Gated on the `core` feature — single-edge reads are part of
275    /// the minimal snapshot surface an alternate backend must honour
276    /// alongside [`Self::describe_execution`] / [`Self::describe_flow`]
277    /// / [`Self::list_edges`].
278    ///
279    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
280    #[cfg(feature = "core")]
281    async fn describe_edge(
282        &self,
283        flow_id: &FlowId,
284        edge_id: &EdgeId,
285    ) -> Result<Option<EdgeSnapshot>, EngineError>;
286
287    /// Resolve an execution's owning flow id, if any.
288    ///
289    /// `Ok(None)` when the execution's core record is absent or has
290    /// no associated flow (standalone execution). A present-but-
291    /// malformed `flow_id` field surfaces as
292    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`].
293    ///
294    /// Gated on the `core` feature. Used by ff-sdk's
295    /// `list_outgoing_edges` / `list_incoming_edges` to pivot from a
296    /// consumer-supplied `ExecutionId` to the `FlowId` required by
297    /// [`Self::list_edges`]. A Valkey backend serves this with a
298    /// single `HGET exec_core flow_id`; a Postgres backend serves it
299    /// with the equivalent single-column row lookup.
300    ///
301    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
302    #[cfg(feature = "core")]
303    async fn resolve_execution_flow_id(
304        &self,
305        eid: &ExecutionId,
306    ) -> Result<Option<FlowId>, EngineError>;
307
308    /// List flows on a partition with cursor-based pagination (issue
309    /// #185).
310    ///
311    /// Returns a [`ListFlowsPage`] of [`FlowSummary`](crate::contracts::FlowSummary)
312    /// rows ordered by `flow_id` (UUID byte-lexicographic). `cursor`
313    /// is `None` for the first page; callers forward the returned
314    /// `next_cursor` verbatim to continue iteration, and the listing
315    /// is exhausted when `next_cursor` is `None`. `limit` is the
316    /// maximum number of rows to return on this page — implementations
317    /// MAY return fewer (end of partition) but MUST NOT exceed it.
318    ///
319    /// Ordering rationale: flow ids are UUIDs, and both Valkey
320    /// (sort after-the-fact) and Postgres (`ORDER BY flow_id`) can
321    /// agree on byte-lexicographic order — the same order
322    /// `FlowId::to_string()` produces for canonical hyphenated UUIDs.
323    /// Mapping to `cursor > flow_id` keeps the contract backend-
324    /// independent.
325    ///
326    /// # Postgres implementation pattern
327    ///
328    /// A Postgres-backed implementation serves this directly with
329    ///
330    /// ```sql
331    /// SELECT flow_id, created_at_ms, public_flow_state
332    ///   FROM ff_flow
333    ///  WHERE partition_key = $1
334    ///    AND ($2::uuid IS NULL OR flow_id > $2)
335    ///  ORDER BY flow_id
336    ///  LIMIT $3 + 1;
337    /// ```
338    ///
339    /// — reading one extra row to decide whether `next_cursor` should
340    /// be set to the last row's `flow_id`. The Valkey implementation
341    /// maintains the `ff:idx:{fp:N}:flow_index` SET and performs the
342    /// sort + slice client-side (SMEMBERS then sort-by-UUID-bytes),
343    /// pipelining `HGETALL flow_core` for each row on the page.
344    ///
345    /// Gated on the `core` feature — flow listing is part of the
346    /// minimal engine surface a Postgres-style backend must honour.
347    #[cfg(feature = "core")]
348    async fn list_flows(
349        &self,
350        partition: PartitionKey,
351        cursor: Option<FlowId>,
352        limit: usize,
353    ) -> Result<ListFlowsPage, EngineError>;
354
355    /// Enumerate registered lanes with cursor-based pagination.
356    ///
357    /// Lanes are global (not partition-scoped) — the backend serves
358    /// this from its lane registry and does NOT accept a
359    /// [`crate::partition::Partition`] argument. Results are sorted
360    /// by [`LaneId`] name so the ordering is stable across calls and
361    /// cursors address a deterministic position in the sort.
362    ///
363    /// * `cursor` — exclusive lower bound. `None` starts from the
364    ///   first lane. To continue a walk, pass the previous page's
365    ///   [`ListLanesPage::next_cursor`].
366    /// * `limit` — hard cap on the number of lanes returned in the
367    ///   page. Backends MAY round this down when the registry size
368    ///   is smaller; they MUST NOT return more than `limit`.
369    ///
370    /// [`ListLanesPage::next_cursor`] is `Some(last_lane_in_page)`
371    /// iff at least one more lane exists after the returned page,
372    /// and `None` on the final page. Callers loop until `next_cursor`
373    /// is `None` to read the full registry.
374    ///
375    /// Gated on the `core` feature — lane enumeration is part of the
376    /// minimal snapshot surface an alternate backend must honour
377    /// alongside [`Self::describe_flow`] / [`Self::list_edges`].
378    #[cfg(feature = "core")]
379    async fn list_lanes(
380        &self,
381        cursor: Option<LaneId>,
382        limit: usize,
383    ) -> Result<ListLanesPage, EngineError>;
384
385    /// List suspended executions in one partition, cursor-paginated,
386    /// with each entry's suspension `reason_code` populated (issue
387    /// #183).
388    ///
389    /// Consumer-facing "what's blocked on what?" panels (ff-board's
390    /// suspended-executions view, operator CLIs) need the reason in
391    /// the list response so the UI does not round-trip per row to
392    /// `describe_execution` for a field it knows it needs. `reason`
393    /// on [`SuspendedExecutionEntry`] carries the free-form
394    /// `suspension:current.reason_code` field — see the type rustdoc
395    /// for the String-not-enum rationale.
396    ///
397    /// `cursor` is opaque to callers; pass `None` to start a fresh
398    /// scan and feed the returned [`ListSuspendedPage::next_cursor`]
399    /// back in on subsequent pages until it comes back `None`.
400    /// `limit` bounds the `entries` count; backends MAY return fewer
401    /// when the partition is exhausted.
402    ///
403    /// Ordering is by ascending `suspended_at_ms` (the per-lane
404    /// suspended ZSET score == `timeout_at` or the no-timeout
405    /// sentinel) with execution id as a lex tiebreak, so cursor
406    /// continuation is deterministic across calls.
407    ///
408    /// Gated on the `core` feature — suspended-list enumeration is
409    /// part of the minimal engine surface a Postgres-style backend
410    /// must honour.
411    #[cfg(feature = "core")]
412    async fn list_suspended(
413        &self,
414        partition: PartitionKey,
415        cursor: Option<ExecutionId>,
416        limit: usize,
417    ) -> Result<ListSuspendedPage, EngineError>;
418
419    /// Forward-only paginated listing of the executions indexed under
420    /// one partition.
421    ///
422    /// Reads the partition-wide `ff:idx:{p:N}:all_executions` set,
423    /// sorts lexicographically on `ExecutionId`, and returns the page
424    /// of ids strictly greater than `cursor` (or starting from the
425    /// smallest id when `cursor = None`). The returned
426    /// [`ListExecutionsPage::next_cursor`] is the last id on the page
427    /// iff at least one more id exists past it; `None` signals
428    /// end-of-stream.
429    ///
430    /// `limit` is the maximum number of ids returned on this page. A
431    /// `limit` of `0` returns an empty page with `next_cursor = None`.
432    /// Backends MAY cap `limit` internally (Valkey: 1000) and return
433    /// fewer ids than requested; callers continue paginating until
434    /// `next_cursor == None`.
435    ///
436    /// Ordering is stable under concurrent inserts for already-emitted
437    /// ids (an id less-than-or-equal-to the caller's cursor is never
438    /// re-emitted in later pages) but new inserts past the cursor WILL
439    /// appear in subsequent pages — consistent with forward-only
440    /// cursor semantics.
441    ///
442    /// Gated on the `core` feature — partition-scoped listing is part
443    /// of the minimal engine surface every backend must honour.
444    #[cfg(feature = "core")]
445    async fn list_executions(
446        &self,
447        partition: PartitionKey,
448        cursor: Option<ExecutionId>,
449        limit: usize,
450    ) -> Result<ListExecutionsPage, EngineError>;
451
452    // ── Trigger ops (issue #150) ──
453
454    /// Deliver an external signal to a suspended execution's waitpoint.
455    ///
456    /// The backend atomically records the signal, evaluates the resume
457    /// condition, and — when satisfied — transitions the execution
458    /// from `suspended` to `runnable` (or buffers the signal when the
459    /// waitpoint is still `pending`). Duplicate delivery — same
460    /// `idempotency_key` + waitpoint — surfaces as
461    /// [`DeliverSignalResult::Duplicate`] with the pre-existing
462    /// `signal_id` rather than mutating state twice.
463    ///
464    /// Input validation (HMAC token presence, payload size limits,
465    /// signal-name shape) is the backend's responsibility; callers
466    /// pass a fully populated [`DeliverSignalArgs`] and receive typed
467    /// outcomes or typed errors (`ScriptError::invalid_token`,
468    /// `ScriptError::token_expired`, `ScriptError::ExecutionNotFound`
469    /// surfaced via [`EngineError::Transport`] on the Valkey backend).
470    ///
471    /// Gated on the `core` feature — signal delivery is part of the
472    /// minimal trigger surface every backend must honour so ff-server
473    /// / REST handlers can dispatch against `Arc<dyn EngineBackend>`
474    /// without knowing which backend is running underneath.
475    #[cfg(feature = "core")]
476    async fn deliver_signal(
477        &self,
478        args: DeliverSignalArgs,
479    ) -> Result<DeliverSignalResult, EngineError>;
480
481    /// Claim a resumed execution — a previously-suspended attempt that
482    /// has cleared its resume condition (e.g. via
483    /// [`Self::deliver_signal`]) and now needs a worker to pick up the
484    /// same attempt index.
485    ///
486    /// Distinct from [`Self::claim`] (fresh work) and
487    /// [`Self::claim_from_reclaim`] (grant-based ownership transfer
488    /// after a crash): the resumed-claim path re-binds an existing
489    /// attempt rather than minting a new one. The backend issues a
490    /// fresh `lease_id` + bumps the `lease_epoch`, preserving
491    /// `attempt_id` / `attempt_index` so stream frames and progress
492    /// updates continue on the same attempt.
493    ///
494    /// Typed failures surface via `ScriptError` → `EngineError`:
495    /// `NotAResumedExecution` when the attempt state is not
496    /// `attempt_interrupted`, `ExecutionNotLeaseable` when the
497    /// lifecycle phase is not `runnable`, and `InvalidClaimGrant`
498    /// when the grant key is missing or was already consumed.
499    ///
500    /// Gated on the `core` feature — resumed-claim is part of the
501    /// minimal trigger surface every backend must honour.
502    #[cfg(feature = "core")]
503    async fn claim_resumed_execution(
504        &self,
505        args: ClaimResumedExecutionArgs,
506    ) -> Result<ClaimResumedExecutionResult, EngineError>;
507
508    /// Operator-initiated cancellation of a flow and (optionally) its
509    /// member executions. See RFC-012 §3.1.1 for the policy /wait
510    /// matrix.
511    async fn cancel_flow(
512        &self,
513        id: &FlowId,
514        policy: CancelFlowPolicy,
515        wait: CancelFlowWait,
516    ) -> Result<CancelFlowResult, EngineError>;
517
518    /// RFC-016 Stage A: set the inbound-edge-group policy for a
519    /// downstream execution. Must be called before the first
520    /// `add_dependency(... -> downstream_execution_id)` — the backend
521    /// rejects with [`EngineError::Conflict`] if edges have already
522    /// been staged for this group.
523    ///
524    /// Stage A honours only
525    /// [`EdgeDependencyPolicy::AllOf`](crate::contracts::EdgeDependencyPolicy::AllOf);
526    /// the `AnyOf` / `Quorum` variants return
527    /// [`EngineError::Validation`] with
528    /// `detail = "stage A supports AllOf only; AnyOf/Quorum land in stage B"`
529    /// until Stage B's resolver lands.
530    #[cfg(feature = "core")]
531    async fn set_edge_group_policy(
532        &self,
533        flow_id: &FlowId,
534        downstream_execution_id: &ExecutionId,
535        policy: crate::contracts::EdgeDependencyPolicy,
536    ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError>;
537
538    // ── Budget ──
539
540    /// Report usage against a budget and check limits. Returns the
541    /// typed [`ReportUsageResult`] variant; backends enforce
542    /// idempotency via the caller-supplied
543    /// [`UsageDimensions::dedup_key`] (RFC-012 §R7.2.3 — replaces
544    /// the pre-Round-7 `AdmissionDecision` return).
545    async fn report_usage(
546        &self,
547        handle: &Handle,
548        budget: &BudgetId,
549        dimensions: crate::backend::UsageDimensions,
550    ) -> Result<ReportUsageResult, EngineError>;
551
552    // ── Stream reads (RFC-012 Stage 1c tranche-4; issue #87) ──
553
554    /// Read frames from a completed or in-flight attempt's stream.
555    ///
556    /// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start`
557    /// / `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
558    /// `StreamCursor::At("<id>")` reads from a concrete entry id.
559    ///
560    /// Input validation (count_limit bounds, cursor shape) is the
561    /// caller's responsibility — SDK-side wrappers in
562    /// [`ff-sdk`](https://docs.rs/ff-sdk) enforce bounds before
563    /// forwarding. Backends MAY additionally reject out-of-range
564    /// input via [`EngineError::Validation`].
565    ///
566    /// Gated on the `streaming` feature — stream reads are part of
567    /// the stream-subset surface a backend without XREAD-like
568    /// primitives may omit.
569    #[cfg(feature = "streaming")]
570    async fn read_stream(
571        &self,
572        execution_id: &ExecutionId,
573        attempt_index: AttemptIndex,
574        from: StreamCursor,
575        to: StreamCursor,
576        count_limit: u64,
577    ) -> Result<StreamFrames, EngineError>;
578
579    /// Tail a live attempt's stream.
580    ///
581    /// `after` is an exclusive [`StreamCursor`] — entries with id
582    /// strictly greater than `after` are returned. `StreamCursor::Start`
583    /// / `StreamCursor::End` are NOT accepted here; callers MUST pass
584    /// a concrete id (or `StreamCursor::from_beginning()`). The SDK
585    /// wrapper rejects the open markers before reaching the backend.
586    ///
587    /// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up
588    /// to that many ms for a new entry.
589    ///
590    /// `visibility` (RFC-015 §6.1) filters the returned entries by
591    /// their stored [`StreamMode`](crate::backend::StreamMode)
592    /// `mode` field. Default
593    /// [`TailVisibility::All`](crate::backend::TailVisibility::All)
594    /// preserves v1 behaviour.
595    ///
596    /// Gated on the `streaming` feature — see [`read_stream`](Self::read_stream).
597    #[cfg(feature = "streaming")]
598    async fn tail_stream(
599        &self,
600        execution_id: &ExecutionId,
601        attempt_index: AttemptIndex,
602        after: StreamCursor,
603        block_ms: u64,
604        count_limit: u64,
605        visibility: TailVisibility,
606    ) -> Result<StreamFrames, EngineError>;
607
608    /// Read the rolling summary document for an attempt (RFC-015 §6.3).
609    ///
610    /// Returns `Ok(None)` when no [`StreamMode::DurableSummary`](crate::backend::StreamMode::DurableSummary)
611    /// frame has ever been appended for the attempt. Non-blocking Hash
612    /// read; safe to call from any consumer without holding the lease.
613    ///
614    /// Gated on the `streaming` feature — summary reads are part of
615    /// the stream-subset surface.
616    #[cfg(feature = "streaming")]
617    async fn read_summary(
618        &self,
619        execution_id: &ExecutionId,
620        attempt_index: AttemptIndex,
621    ) -> Result<Option<SummaryDocument>, EngineError>;
622}
623
624/// Object-safety assertion: `dyn EngineBackend` compiles iff every
625/// method is dyn-compatible. Kept as a compile-time guard so a future
626/// trait change that accidentally breaks dyn-safety fails the build
627/// at this site rather than at every downstream `Arc<dyn
628/// EngineBackend>` use.
629#[allow(dead_code)]
630fn _assert_dyn_compatible(_: &dyn EngineBackend) {}