Skip to main content

ff_core/
engine_backend.rs

1//! The `EngineBackend` trait — abstracting FlowFabric's write surface.
2//!
3//! **RFC-012 Stage 1a:** this is the trait landing. The
4//! Valkey-backed impl lives in `ff-backend-valkey`; future backends
5//! (Postgres) add a sibling crate with their own impl. ff-sdk's
6//! `FlowFabricWorker` gains `connect_with(backend)` /
7//! `backend(&self)` accessors so consumers that want to bring their
8//! own backend (tests, future non-Valkey deployments) can hand one
9//! in. The hot-path migration of `ClaimedTask` / `FlowFabricWorker`
10//! to forward through the trait lands across Stages 1b-1d.
11//!
12//! # Object safety
13//!
14//! `EngineBackend` is object-safe: all methods are `async fn` behind
15//! `#[async_trait]` and take `&self`. Consumers can hold
//! `Arc<dyn EngineBackend>` for heterogeneous-backend deployments.
17//! The trait is `Send + Sync + 'static` per RFC-012 §4.1; every impl
18//! must honour that bound.
19//!
20//! # Error surface
21//!
22//! Every method returns [`Result<_, EngineError>`]. `EngineError`'s
23//! `Transport` variant carries a boxed `dyn Error + Send + Sync`;
24//! Valkey-backed transport faults box a
25//! `ff_script::error::ScriptError` (downcast via
26//! `ff_script::engine_error_ext::transport_script_ref`). Other
27//! backends box their native error type and set the `backend` tag
28//! accordingly.
29//!
30//! # Atomicity contract
31//!
32//! Per-op state transitions MUST be atomic (RFC-012 §3.4). On Valkey
33//! this is the single-FCALL-per-op property; on Postgres it is the
34//! per-transaction property. A backend that cannot honour atomicity
35//! for a given op either MUST NOT implement `EngineBackend` or MUST
36//! return `EngineError::Unavailable { op }` for the affected method.
37//!
38//! # Replay semantics
39//!
//! `complete`, `fail`, `cancel`, `suspend`, `delay`, `wait_children`
//! are idempotent under replay in the sense that the state transition
//! is applied at most once. Calling twice with the same handle and
//! args succeeds on the first call. Subsequent calls return a typed
//! `State` / `Contention` error once the fence triple no longer
//! matches a live lease.
45
46use std::time::Duration;
47
48use async_trait::async_trait;
49
50use crate::backend::{
51    AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy,
52    FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal, PendingWaitpoint,
53    ReclaimToken, ResumeSignal, WaitpointSpec,
54};
55use crate::contracts::{CancelFlowResult, ExecutionSnapshot, FlowSnapshot, ReportUsageResult};
56#[cfg(feature = "core")]
57use crate::contracts::{EdgeDirection, EdgeSnapshot};
58#[cfg(feature = "streaming")]
59use crate::contracts::{StreamCursor, StreamFrames};
60use crate::engine_error::EngineError;
61#[cfg(feature = "streaming")]
62use crate::types::AttemptIndex;
63use crate::types::{BudgetId, ExecutionId, FlowId, LaneId, TimestampMs};
64
65/// The engine write surface — a single trait a backend implementation
66/// honours to serve a `FlowFabricWorker`.
67///
68/// See RFC-012 §3.1 for the inventory rationale and §3.3 for the
69/// type-level shape. 16 methods (Round-7 added `create_waitpoint`;
70/// `append_frame` return widened; `report_usage` return replaced —
71/// RFC-012 §R7).
72///
73/// # Note on `complete` payload shape
74///
75/// The RFC §3.3 sketch uses `Option<Bytes>`; the Stage 1a trait uses
76/// `Option<Vec<u8>>` to match the existing
77/// `ff_sdk::ClaimedTask::complete` signature and avoid adding a
78/// `bytes` public-type dep for zero consumer benefit. Round-4 §7.17
79/// resolved the payload container debate to `Box<[u8]>` in the
80/// public type (see `HandleOpaque`); `Option<Vec<u8>>` is the
81/// zero-churn choice consistent with today's code. Consumers that
82/// need `&[u8]` can borrow via `.as_deref()` on the Option.
83#[async_trait]
84pub trait EngineBackend: Send + Sync + 'static {
85    // ── Claim + lifecycle ──
86
87    /// Fresh-work claim. Returns `Ok(None)` when no work is currently
88    /// available; `Err` only on transport or input-validation faults.
89    async fn claim(
90        &self,
91        lane: &LaneId,
92        capabilities: &CapabilitySet,
93        policy: ClaimPolicy,
94    ) -> Result<Option<Handle>, EngineError>;
95
96    /// Renew a held lease. Returns the updated expiry + epoch on
97    /// success; typed `State::StaleLease` / `State::LeaseExpired`
98    /// when the lease has been stolen or timed out.
99    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError>;
100
101    /// Numeric-progress heartbeat.
102    async fn progress(
103        &self,
104        handle: &Handle,
105        percent: Option<u8>,
106        message: Option<String>,
107    ) -> Result<(), EngineError>;
108
109    /// Append one stream frame. Distinct from [`progress`](Self::progress)
110    /// per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry
111    /// id and post-append frame count (RFC-012 §R7.2.1).
112    async fn append_frame(
113        &self,
114        handle: &Handle,
115        frame: Frame,
116    ) -> Result<AppendFrameOutcome, EngineError>;
117
118    /// Terminal success. Borrows `handle` (round-4 M-D2) so callers
119    /// can retry under `EngineError::Transport` without losing the
120    /// cookie. Payload is `Option<Vec<u8>>` per the note above.
121    async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError>;
122
123    /// Terminal failure with classification. Returns [`FailOutcome`]
124    /// so the caller learns whether a retry was scheduled.
125    async fn fail(
126        &self,
127        handle: &Handle,
128        reason: FailureReason,
129        classification: FailureClass,
130    ) -> Result<FailOutcome, EngineError>;
131
132    /// Cooperative cancel by the worker holding the lease.
133    async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError>;
134
135    /// Suspend the execution awaiting one or more waitpoints. Returns
136    /// a fresh `Handle` whose `HandleKind::Suspended` supersedes the
137    /// caller's pre-suspend handle.
138    async fn suspend(
139        &self,
140        handle: &Handle,
141        waitpoints: Vec<WaitpointSpec>,
142        timeout: Option<Duration>,
143    ) -> Result<Handle, EngineError>;
144
145    /// Issue a pending waitpoint for future signal delivery.
146    ///
147    /// Waitpoints have two states in the Valkey wire contract:
148    /// **pending** (token issued, not yet backing a suspension) and
149    /// **active** (bound to a suspension). This method creates a
150    /// waitpoint in the **pending** state. A later `suspend` call
151    /// transitions a pending waitpoint to active (see Lua
152    /// `use_pending_waitpoint` ARGV flag at
153    /// `flowfabric.lua:3603,3641,3690`) — or, if buffered signals
154    /// already satisfy its condition, the suspend call returns
155    /// `SuspendOutcome::AlreadySatisfied` and the waitpoint activates
156    /// without ever releasing the lease.
157    ///
158    /// Pending-waitpoint expiry is a first-class terminal error on
159    /// the wire (`PendingWaitpointExpired` at
160    /// `ff-script/src/error.rs:170,403-408`). The attempt retains its
161    /// lease while the waitpoint is pending; signals delivered to
162    /// this waitpoint are buffered server-side (RFC-012 §R7.2.2).
163    async fn create_waitpoint(
164        &self,
165        handle: &Handle,
166        waitpoint_key: &str,
167        expires_in: Duration,
168    ) -> Result<PendingWaitpoint, EngineError>;
169
170    /// Non-mutating observation of signals that satisfied the handle's
171    /// resume condition.
172    async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError>;
173
174    /// Consume a reclaim grant to mint a resumed-kind handle. Returns
175    /// `Ok(None)` when the grant's target execution is no longer
176    /// resumable (already reclaimed, terminal, etc.).
177    async fn claim_from_reclaim(&self, token: ReclaimToken) -> Result<Option<Handle>, EngineError>;
178
179    // Round-5 amendment: lease-releasing peers of `suspend`.
180
181    /// Park the execution until `delay_until`, releasing the lease.
182    async fn delay(&self, handle: &Handle, delay_until: TimestampMs) -> Result<(), EngineError>;
183
184    /// Mark the execution as waiting for its child flow to complete,
185    /// releasing the lease.
186    async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError>;
187
188    // ── Read / admin ──
189
190    /// Snapshot an execution by id. `Ok(None)` ⇒ no such execution.
191    async fn describe_execution(
192        &self,
193        id: &ExecutionId,
194    ) -> Result<Option<ExecutionSnapshot>, EngineError>;
195
196    /// Snapshot a flow by id. `Ok(None)` ⇒ no such flow.
197    async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError>;
198
199    /// List dependency edges adjacent to an execution. Read-only; the
200    /// backend resolves the subject execution's flow, reads the
201    /// direction-specific adjacency SET, and decodes each member's
202    /// flow-scoped `edge:<edge_id>` hash.
203    ///
204    /// Returns an empty `Vec` when the subject has no edges on the
205    /// requested side — including standalone executions (no owning
206    /// flow). Ordering is unspecified: the underlying adjacency SET
207    /// is an unordered SMEMBERS read. Callers that need deterministic
208    /// order should sort by [`EdgeSnapshot::edge_id`] /
209    /// [`EdgeSnapshot::created_at`] themselves.
210    ///
211    /// Parse failures on the edge hash surface as
212    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
213    /// — unknown fields, missing required fields, endpoint mismatches
214    /// against the adjacency SET all fail loud rather than silently
215    /// returning partial results.
216    ///
217    /// Gated on the `core` feature — edge reads are part of the
218    /// minimal engine surface a Postgres-style backend must honour.
219    ///
220    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
221    #[cfg(feature = "core")]
222    async fn list_edges(
223        &self,
224        flow_id: &FlowId,
225        direction: EdgeDirection,
226    ) -> Result<Vec<EdgeSnapshot>, EngineError>;
227
228    /// Operator-initiated cancellation of a flow and (optionally) its
229    /// member executions. See RFC-012 §3.1.1 for the policy /wait
230    /// matrix.
231    async fn cancel_flow(
232        &self,
233        id: &FlowId,
234        policy: CancelFlowPolicy,
235        wait: CancelFlowWait,
236    ) -> Result<CancelFlowResult, EngineError>;
237
238    // ── Budget ──
239
240    /// Report usage against a budget and check limits. Returns the
241    /// typed [`ReportUsageResult`] variant; backends enforce
242    /// idempotency via the caller-supplied
243    /// [`UsageDimensions::dedup_key`] (RFC-012 §R7.2.3 — replaces
244    /// the pre-Round-7 `AdmissionDecision` return).
245    async fn report_usage(
246        &self,
247        handle: &Handle,
248        budget: &BudgetId,
249        dimensions: crate::backend::UsageDimensions,
250    ) -> Result<ReportUsageResult, EngineError>;
251
252    // ── Stream reads (RFC-012 Stage 1c tranche-4; issue #87) ──
253
254    /// Read frames from a completed or in-flight attempt's stream.
255    ///
256    /// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start`
257    /// / `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
258    /// `StreamCursor::At("<id>")` reads from a concrete entry id.
259    ///
260    /// Input validation (count_limit bounds, cursor shape) is the
261    /// caller's responsibility — SDK-side wrappers in
262    /// [`ff-sdk`](https://docs.rs/ff-sdk) enforce bounds before
263    /// forwarding. Backends MAY additionally reject out-of-range
264    /// input via [`EngineError::Validation`].
265    ///
266    /// Gated on the `streaming` feature — stream reads are part of
267    /// the stream-subset surface a backend without XREAD-like
268    /// primitives may omit.
269    #[cfg(feature = "streaming")]
270    async fn read_stream(
271        &self,
272        execution_id: &ExecutionId,
273        attempt_index: AttemptIndex,
274        from: StreamCursor,
275        to: StreamCursor,
276        count_limit: u64,
277    ) -> Result<StreamFrames, EngineError>;
278
279    /// Tail a live attempt's stream.
280    ///
281    /// `after` is an exclusive [`StreamCursor`] — entries with id
282    /// strictly greater than `after` are returned. `StreamCursor::Start`
283    /// / `StreamCursor::End` are NOT accepted here; callers MUST pass
284    /// a concrete id (or `StreamCursor::from_beginning()`). The SDK
285    /// wrapper rejects the open markers before reaching the backend.
286    ///
287    /// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up
288    /// to that many ms for a new entry.
289    ///
290    /// Gated on the `streaming` feature — see [`read_stream`](Self::read_stream).
291    #[cfg(feature = "streaming")]
292    async fn tail_stream(
293        &self,
294        execution_id: &ExecutionId,
295        attempt_index: AttemptIndex,
296        after: StreamCursor,
297        block_ms: u64,
298        count_limit: u64,
299    ) -> Result<StreamFrames, EngineError>;
300}
301
302/// Object-safety assertion: `dyn EngineBackend` compiles iff every
303/// method is dyn-compatible. Kept as a compile-time guard so a future
304/// trait change that accidentally breaks dyn-safety fails the build
305/// at this site rather than at every downstream `Arc<dyn
306/// EngineBackend>` use.
307#[allow(dead_code)]
308fn _assert_dyn_compatible(_: &dyn EngineBackend) {}