ff_core/engine_backend.rs
1//! The `EngineBackend` trait — abstracting FlowFabric's write surface.
2//!
3//! **RFC-012 Stage 1a:** this is the trait landing. The
4//! Valkey-backed impl lives in `ff-backend-valkey`; future backends
5//! (Postgres) add a sibling crate with their own impl. ff-sdk's
6//! `FlowFabricWorker` gains `connect_with(backend)` /
7//! `backend(&self)` accessors so consumers that want to bring their
8//! own backend (tests, future non-Valkey deployments) can hand one
9//! in. The hot-path migration of `ClaimedTask` / `FlowFabricWorker`
10//! to forward through the trait lands across Stages 1b-1d.
11//!
12//! # Object safety
13//!
14//! `EngineBackend` is object-safe: all methods are `async fn` behind
15//! `#[async_trait]` and take `&self`. Consumers can hold
16//! `Arc<dyn EngineBackend>` for heterogenous-backend deployments.
17//! The trait is `Send + Sync + 'static` per RFC-012 §4.1; every impl
18//! must honour that bound.
19//!
20//! # Error surface
21//!
22//! Every method returns [`Result<_, EngineError>`]. `EngineError`'s
23//! `Transport` variant carries a boxed `dyn Error + Send + Sync`;
24//! Valkey-backed transport faults box a
25//! `ff_script::error::ScriptError` (downcast via
26//! `ff_script::engine_error_ext::transport_script_ref`). Other
27//! backends box their native error type and set the `backend` tag
28//! accordingly.
29//!
30//! # Atomicity contract
31//!
32//! Per-op state transitions MUST be atomic (RFC-012 §3.4). On Valkey
33//! this is the single-FCALL-per-op property; on Postgres it is the
34//! per-transaction property. A backend that cannot honour atomicity
35//! for a given op either MUST NOT implement `EngineBackend` or MUST
36//! return `EngineError::Unavailable { op }` for the affected method.
37//!
38//! # Replay semantics
39//!
40//! `complete`, `fail`, `cancel`, `suspend`, `delay`, `wait_children`
41//! are idempotent under replay — calling twice with the same handle
42//! and args returns the same outcome (success on first call, typed
43//! `State` / `Contention` on subsequent calls where the fence triple
44//! no longer matches a live lease).
45
46use std::time::Duration;
47
48use async_trait::async_trait;
49
50use crate::backend::{
51 AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy,
52 FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal, PendingWaitpoint,
53 ReclaimToken, ResumeSignal, WaitpointSpec,
54};
55use crate::contracts::{CancelFlowResult, ExecutionSnapshot, FlowSnapshot, ReportUsageResult};
56#[cfg(feature = "core")]
57use crate::contracts::{
58 ClaimResumedExecutionArgs, ClaimResumedExecutionResult, DeliverSignalArgs, DeliverSignalResult,
59 EdgeDirection, EdgeSnapshot, ListExecutionsPage, ListFlowsPage, ListLanesPage,
60 ListSuspendedPage,
61};
62#[cfg(feature = "core")]
63use crate::partition::PartitionKey;
64#[cfg(feature = "streaming")]
65use crate::contracts::{StreamCursor, StreamFrames};
66use crate::engine_error::EngineError;
67#[cfg(feature = "streaming")]
68use crate::types::AttemptIndex;
69#[cfg(feature = "core")]
70use crate::types::EdgeId;
71use crate::types::{BudgetId, ExecutionId, FlowId, LaneId, TimestampMs};
72
73/// The engine write surface — a single trait a backend implementation
74/// honours to serve a `FlowFabricWorker`.
75///
76/// See RFC-012 §3.1 for the inventory rationale and §3.3 for the
77/// type-level shape. 16 methods (Round-7 added `create_waitpoint`;
78/// `append_frame` return widened; `report_usage` return replaced —
79/// RFC-012 §R7). Issue #150 added the two trigger-surface methods
80/// (`deliver_signal` / `claim_resumed_execution`).
81///
82/// # Note on `complete` payload shape
83///
84/// The RFC §3.3 sketch uses `Option<Bytes>`; the Stage 1a trait uses
85/// `Option<Vec<u8>>` to match the existing
86/// `ff_sdk::ClaimedTask::complete` signature and avoid adding a
87/// `bytes` public-type dep for zero consumer benefit. Round-4 §7.17
88/// resolved the payload container debate to `Box<[u8]>` in the
89/// public type (see `HandleOpaque`); `Option<Vec<u8>>` is the
90/// zero-churn choice consistent with today's code. Consumers that
91/// need `&[u8]` can borrow via `.as_deref()` on the Option.
92#[async_trait]
93pub trait EngineBackend: Send + Sync + 'static {
94 // ── Claim + lifecycle ──
95
96 /// Fresh-work claim. Returns `Ok(None)` when no work is currently
97 /// available; `Err` only on transport or input-validation faults.
98 async fn claim(
99 &self,
100 lane: &LaneId,
101 capabilities: &CapabilitySet,
102 policy: ClaimPolicy,
103 ) -> Result<Option<Handle>, EngineError>;
104
105 /// Renew a held lease. Returns the updated expiry + epoch on
106 /// success; typed `State::StaleLease` / `State::LeaseExpired`
107 /// when the lease has been stolen or timed out.
108 async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError>;
109
110 /// Numeric-progress heartbeat.
111 ///
112 /// Writes scalar `progress_percent` / `progress_message` fields on
113 /// `exec_core`; each call overwrites the previous value. This does
114 /// NOT append to the output stream — stream-frame producers must use
115 /// [`append_frame`](Self::append_frame) instead.
116 async fn progress(
117 &self,
118 handle: &Handle,
119 percent: Option<u8>,
120 message: Option<String>,
121 ) -> Result<(), EngineError>;
122
123 /// Append one stream frame. Distinct from [`progress`](Self::progress)
124 /// per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry
125 /// id and post-append frame count (RFC-012 §R7.2.1).
126 ///
127 /// Stream-frame producers (arbitrary `frame_type` + payload, consumed
128 /// via the read/tail surfaces) MUST use this method rather than
129 /// [`progress`](Self::progress); the latter updates scalar fields on
130 /// `exec_core` and is invisible to stream consumers.
131 async fn append_frame(
132 &self,
133 handle: &Handle,
134 frame: Frame,
135 ) -> Result<AppendFrameOutcome, EngineError>;
136
137 /// Terminal success. Borrows `handle` (round-4 M-D2) so callers
138 /// can retry under `EngineError::Transport` without losing the
139 /// cookie. Payload is `Option<Vec<u8>>` per the note above.
140 async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError>;
141
142 /// Terminal failure with classification. Returns [`FailOutcome`]
143 /// so the caller learns whether a retry was scheduled.
144 async fn fail(
145 &self,
146 handle: &Handle,
147 reason: FailureReason,
148 classification: FailureClass,
149 ) -> Result<FailOutcome, EngineError>;
150
151 /// Cooperative cancel by the worker holding the lease.
152 async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError>;
153
154 /// Suspend the execution awaiting one or more waitpoints. Returns
155 /// a fresh `Handle` whose `HandleKind::Suspended` supersedes the
156 /// caller's pre-suspend handle.
157 async fn suspend(
158 &self,
159 handle: &Handle,
160 waitpoints: Vec<WaitpointSpec>,
161 timeout: Option<Duration>,
162 ) -> Result<Handle, EngineError>;
163
164 /// Issue a pending waitpoint for future signal delivery.
165 ///
166 /// Waitpoints have two states in the Valkey wire contract:
167 /// **pending** (token issued, not yet backing a suspension) and
168 /// **active** (bound to a suspension). This method creates a
169 /// waitpoint in the **pending** state. A later `suspend` call
170 /// transitions a pending waitpoint to active (see Lua
171 /// `use_pending_waitpoint` ARGV flag at
172 /// `flowfabric.lua:3603,3641,3690`) — or, if buffered signals
173 /// already satisfy its condition, the suspend call returns
174 /// `SuspendOutcome::AlreadySatisfied` and the waitpoint activates
175 /// without ever releasing the lease.
176 ///
177 /// Pending-waitpoint expiry is a first-class terminal error on
178 /// the wire (`PendingWaitpointExpired` at
179 /// `ff-script/src/error.rs:170,403-408`). The attempt retains its
180 /// lease while the waitpoint is pending; signals delivered to
181 /// this waitpoint are buffered server-side (RFC-012 §R7.2.2).
182 async fn create_waitpoint(
183 &self,
184 handle: &Handle,
185 waitpoint_key: &str,
186 expires_in: Duration,
187 ) -> Result<PendingWaitpoint, EngineError>;
188
189 /// Non-mutating observation of signals that satisfied the handle's
190 /// resume condition.
191 async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError>;
192
193 /// Consume a reclaim grant to mint a resumed-kind handle. Returns
194 /// `Ok(None)` when the grant's target execution is no longer
195 /// resumable (already reclaimed, terminal, etc.).
196 async fn claim_from_reclaim(&self, token: ReclaimToken) -> Result<Option<Handle>, EngineError>;
197
198 // Round-5 amendment: lease-releasing peers of `suspend`.
199
200 /// Park the execution until `delay_until`, releasing the lease.
201 async fn delay(&self, handle: &Handle, delay_until: TimestampMs) -> Result<(), EngineError>;
202
203 /// Mark the execution as waiting for its child flow to complete,
204 /// releasing the lease.
205 async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError>;
206
207 // ── Read / admin ──
208
209 /// Snapshot an execution by id. `Ok(None)` ⇒ no such execution.
210 async fn describe_execution(
211 &self,
212 id: &ExecutionId,
213 ) -> Result<Option<ExecutionSnapshot>, EngineError>;
214
215 /// Snapshot a flow by id. `Ok(None)` ⇒ no such flow.
216 async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError>;
217
218 /// List dependency edges adjacent to an execution. Read-only; the
219 /// backend resolves the subject execution's flow, reads the
220 /// direction-specific adjacency SET, and decodes each member's
221 /// flow-scoped `edge:<edge_id>` hash.
222 ///
223 /// Returns an empty `Vec` when the subject has no edges on the
224 /// requested side — including standalone executions (no owning
225 /// flow). Ordering is unspecified: the underlying adjacency SET
226 /// is an unordered SMEMBERS read. Callers that need deterministic
227 /// order should sort by [`EdgeSnapshot::edge_id`] /
228 /// [`EdgeSnapshot::created_at`] themselves.
229 ///
230 /// Parse failures on the edge hash surface as
231 /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
232 /// — unknown fields, missing required fields, endpoint mismatches
233 /// against the adjacency SET all fail loud rather than silently
234 /// returning partial results.
235 ///
236 /// Gated on the `core` feature — edge reads are part of the
237 /// minimal engine surface a Postgres-style backend must honour.
238 ///
239 /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
240 #[cfg(feature = "core")]
241 async fn list_edges(
242 &self,
243 flow_id: &FlowId,
244 direction: EdgeDirection,
245 ) -> Result<Vec<EdgeSnapshot>, EngineError>;
246
247 /// Snapshot a single dependency edge by its owning flow + edge id.
248 ///
249 /// `Ok(None)` when the edge hash is absent (never staged, or
250 /// staged under a different flow than `flow_id`). Parse failures
251 /// on a present edge hash surface as
252 /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
253 /// — the stored `flow_id` field is cross-checked against the
254 /// caller's expected `flow_id` so a wrong-key read fails loud
255 /// rather than returning an unrelated edge.
256 ///
257 /// Gated on the `core` feature — single-edge reads are part of
258 /// the minimal snapshot surface an alternate backend must honour
259 /// alongside [`Self::describe_execution`] / [`Self::describe_flow`]
260 /// / [`Self::list_edges`].
261 ///
262 /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
263 #[cfg(feature = "core")]
264 async fn describe_edge(
265 &self,
266 flow_id: &FlowId,
267 edge_id: &EdgeId,
268 ) -> Result<Option<EdgeSnapshot>, EngineError>;
269
270 /// Resolve an execution's owning flow id, if any.
271 ///
272 /// `Ok(None)` when the execution's core record is absent or has
273 /// no associated flow (standalone execution). A present-but-
274 /// malformed `flow_id` field surfaces as
275 /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`].
276 ///
277 /// Gated on the `core` feature. Used by ff-sdk's
278 /// `list_outgoing_edges` / `list_incoming_edges` to pivot from a
279 /// consumer-supplied `ExecutionId` to the `FlowId` required by
280 /// [`Self::list_edges`]. A Valkey backend serves this with a
281 /// single `HGET exec_core flow_id`; a Postgres backend serves it
282 /// with the equivalent single-column row lookup.
283 ///
284 /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
285 #[cfg(feature = "core")]
286 async fn resolve_execution_flow_id(
287 &self,
288 eid: &ExecutionId,
289 ) -> Result<Option<FlowId>, EngineError>;
290
291 /// List flows on a partition with cursor-based pagination (issue
292 /// #185).
293 ///
294 /// Returns a [`ListFlowsPage`] of [`FlowSummary`](crate::contracts::FlowSummary)
295 /// rows ordered by `flow_id` (UUID byte-lexicographic). `cursor`
296 /// is `None` for the first page; callers forward the returned
297 /// `next_cursor` verbatim to continue iteration, and the listing
298 /// is exhausted when `next_cursor` is `None`. `limit` is the
299 /// maximum number of rows to return on this page — implementations
300 /// MAY return fewer (end of partition) but MUST NOT exceed it.
301 ///
302 /// Ordering rationale: flow ids are UUIDs, and both Valkey
303 /// (sort after-the-fact) and Postgres (`ORDER BY flow_id`) can
304 /// agree on byte-lexicographic order — the same order
305 /// `FlowId::to_string()` produces for canonical hyphenated UUIDs.
306 /// Mapping to `cursor > flow_id` keeps the contract backend-
307 /// independent.
308 ///
309 /// # Postgres implementation pattern
310 ///
311 /// A Postgres-backed implementation serves this directly with
312 ///
313 /// ```sql
314 /// SELECT flow_id, created_at_ms, public_flow_state
315 /// FROM ff_flow
316 /// WHERE partition_key = $1
317 /// AND ($2::uuid IS NULL OR flow_id > $2)
318 /// ORDER BY flow_id
319 /// LIMIT $3 + 1;
320 /// ```
321 ///
322 /// — reading one extra row to decide whether `next_cursor` should
323 /// be set to the last row's `flow_id`. The Valkey implementation
324 /// maintains the `ff:idx:{fp:N}:flow_index` SET and performs the
325 /// sort + slice client-side (SMEMBERS then sort-by-UUID-bytes),
326 /// pipelining `HGETALL flow_core` for each row on the page.
327 ///
328 /// Gated on the `core` feature — flow listing is part of the
329 /// minimal engine surface a Postgres-style backend must honour.
330 #[cfg(feature = "core")]
331 async fn list_flows(
332 &self,
333 partition: PartitionKey,
334 cursor: Option<FlowId>,
335 limit: usize,
336 ) -> Result<ListFlowsPage, EngineError>;
337
338 /// Enumerate registered lanes with cursor-based pagination.
339 ///
340 /// Lanes are global (not partition-scoped) — the backend serves
341 /// this from its lane registry and does NOT accept a
342 /// [`crate::partition::Partition`] argument. Results are sorted
343 /// by [`LaneId`] name so the ordering is stable across calls and
344 /// cursors address a deterministic position in the sort.
345 ///
346 /// * `cursor` — exclusive lower bound. `None` starts from the
347 /// first lane. To continue a walk, pass the previous page's
348 /// [`ListLanesPage::next_cursor`].
349 /// * `limit` — hard cap on the number of lanes returned in the
350 /// page. Backends MAY round this down when the registry size
351 /// is smaller; they MUST NOT return more than `limit`.
352 ///
353 /// [`ListLanesPage::next_cursor`] is `Some(last_lane_in_page)`
354 /// iff at least one more lane exists after the returned page,
355 /// and `None` on the final page. Callers loop until `next_cursor`
356 /// is `None` to read the full registry.
357 ///
358 /// Gated on the `core` feature — lane enumeration is part of the
359 /// minimal snapshot surface an alternate backend must honour
360 /// alongside [`Self::describe_flow`] / [`Self::list_edges`].
361 #[cfg(feature = "core")]
362 async fn list_lanes(
363 &self,
364 cursor: Option<LaneId>,
365 limit: usize,
366 ) -> Result<ListLanesPage, EngineError>;
367
368 /// List suspended executions in one partition, cursor-paginated,
369 /// with each entry's suspension `reason_code` populated (issue
370 /// #183).
371 ///
372 /// Consumer-facing "what's blocked on what?" panels (ff-board's
373 /// suspended-executions view, operator CLIs) need the reason in
374 /// the list response so the UI does not round-trip per row to
375 /// `describe_execution` for a field it knows it needs. `reason`
376 /// on [`SuspendedExecutionEntry`] carries the free-form
377 /// `suspension:current.reason_code` field — see the type rustdoc
378 /// for the String-not-enum rationale.
379 ///
380 /// `cursor` is opaque to callers; pass `None` to start a fresh
381 /// scan and feed the returned [`ListSuspendedPage::next_cursor`]
382 /// back in on subsequent pages until it comes back `None`.
383 /// `limit` bounds the `entries` count; backends MAY return fewer
384 /// when the partition is exhausted.
385 ///
386 /// Ordering is by ascending `suspended_at_ms` (the per-lane
387 /// suspended ZSET score == `timeout_at` or the no-timeout
388 /// sentinel) with execution id as a lex tiebreak, so cursor
389 /// continuation is deterministic across calls.
390 ///
391 /// Gated on the `core` feature — suspended-list enumeration is
392 /// part of the minimal engine surface a Postgres-style backend
393 /// must honour.
394 #[cfg(feature = "core")]
395 async fn list_suspended(
396 &self,
397 partition: PartitionKey,
398 cursor: Option<ExecutionId>,
399 limit: usize,
400 ) -> Result<ListSuspendedPage, EngineError>;
401
402 /// Forward-only paginated listing of the executions indexed under
403 /// one partition.
404 ///
405 /// Reads the partition-wide `ff:idx:{p:N}:all_executions` set,
406 /// sorts lexicographically on `ExecutionId`, and returns the page
407 /// of ids strictly greater than `cursor` (or starting from the
408 /// smallest id when `cursor = None`). The returned
409 /// [`ListExecutionsPage::next_cursor`] is the last id on the page
410 /// iff at least one more id exists past it; `None` signals
411 /// end-of-stream.
412 ///
413 /// `limit` is the maximum number of ids returned on this page. A
414 /// `limit` of `0` returns an empty page with `next_cursor = None`.
415 /// Backends MAY cap `limit` internally (Valkey: 1000) and return
416 /// fewer ids than requested; callers continue paginating until
417 /// `next_cursor == None`.
418 ///
419 /// Ordering is stable under concurrent inserts for already-emitted
420 /// ids (an id less-than-or-equal-to the caller's cursor is never
421 /// re-emitted in later pages) but new inserts past the cursor WILL
422 /// appear in subsequent pages — consistent with forward-only
423 /// cursor semantics.
424 ///
425 /// Gated on the `core` feature — partition-scoped listing is part
426 /// of the minimal engine surface every backend must honour.
427 #[cfg(feature = "core")]
428 async fn list_executions(
429 &self,
430 partition: PartitionKey,
431 cursor: Option<ExecutionId>,
432 limit: usize,
433 ) -> Result<ListExecutionsPage, EngineError>;
434
435 // ── Trigger ops (issue #150) ──
436
437 /// Deliver an external signal to a suspended execution's waitpoint.
438 ///
439 /// The backend atomically records the signal, evaluates the resume
440 /// condition, and — when satisfied — transitions the execution
441 /// from `suspended` to `runnable` (or buffers the signal when the
442 /// waitpoint is still `pending`). Duplicate delivery — same
443 /// `idempotency_key` + waitpoint — surfaces as
444 /// [`DeliverSignalResult::Duplicate`] with the pre-existing
445 /// `signal_id` rather than mutating state twice.
446 ///
447 /// Input validation (HMAC token presence, payload size limits,
448 /// signal-name shape) is the backend's responsibility; callers
449 /// pass a fully populated [`DeliverSignalArgs`] and receive typed
450 /// outcomes or typed errors (`ScriptError::invalid_token`,
451 /// `ScriptError::token_expired`, `ScriptError::ExecutionNotFound`
452 /// surfaced via [`EngineError::Transport`] on the Valkey backend).
453 ///
454 /// Gated on the `core` feature — signal delivery is part of the
455 /// minimal trigger surface every backend must honour so ff-server
456 /// / REST handlers can dispatch against `Arc<dyn EngineBackend>`
457 /// without knowing which backend is running underneath.
458 #[cfg(feature = "core")]
459 async fn deliver_signal(
460 &self,
461 args: DeliverSignalArgs,
462 ) -> Result<DeliverSignalResult, EngineError>;
463
464 /// Claim a resumed execution — a previously-suspended attempt that
465 /// has cleared its resume condition (e.g. via
466 /// [`Self::deliver_signal`]) and now needs a worker to pick up the
467 /// same attempt index.
468 ///
469 /// Distinct from [`Self::claim`] (fresh work) and
470 /// [`Self::claim_from_reclaim`] (grant-based ownership transfer
471 /// after a crash): the resumed-claim path re-binds an existing
472 /// attempt rather than minting a new one. The backend issues a
473 /// fresh `lease_id` + bumps the `lease_epoch`, preserving
474 /// `attempt_id` / `attempt_index` so stream frames and progress
475 /// updates continue on the same attempt.
476 ///
477 /// Typed failures surface via `ScriptError` → `EngineError`:
478 /// `NotAResumedExecution` when the attempt state is not
479 /// `attempt_interrupted`, `ExecutionNotLeaseable` when the
480 /// lifecycle phase is not `runnable`, and `InvalidClaimGrant`
481 /// when the grant key is missing or was already consumed.
482 ///
483 /// Gated on the `core` feature — resumed-claim is part of the
484 /// minimal trigger surface every backend must honour.
485 #[cfg(feature = "core")]
486 async fn claim_resumed_execution(
487 &self,
488 args: ClaimResumedExecutionArgs,
489 ) -> Result<ClaimResumedExecutionResult, EngineError>;
490
491 /// Operator-initiated cancellation of a flow and (optionally) its
492 /// member executions. See RFC-012 §3.1.1 for the policy /wait
493 /// matrix.
494 async fn cancel_flow(
495 &self,
496 id: &FlowId,
497 policy: CancelFlowPolicy,
498 wait: CancelFlowWait,
499 ) -> Result<CancelFlowResult, EngineError>;
500
501 // ── Budget ──
502
503 /// Report usage against a budget and check limits. Returns the
504 /// typed [`ReportUsageResult`] variant; backends enforce
505 /// idempotency via the caller-supplied
506 /// [`UsageDimensions::dedup_key`] (RFC-012 §R7.2.3 — replaces
507 /// the pre-Round-7 `AdmissionDecision` return).
508 async fn report_usage(
509 &self,
510 handle: &Handle,
511 budget: &BudgetId,
512 dimensions: crate::backend::UsageDimensions,
513 ) -> Result<ReportUsageResult, EngineError>;
514
515 // ── Stream reads (RFC-012 Stage 1c tranche-4; issue #87) ──
516
517 /// Read frames from a completed or in-flight attempt's stream.
518 ///
519 /// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start`
520 /// / `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
521 /// `StreamCursor::At("<id>")` reads from a concrete entry id.
522 ///
523 /// Input validation (count_limit bounds, cursor shape) is the
524 /// caller's responsibility — SDK-side wrappers in
525 /// [`ff-sdk`](https://docs.rs/ff-sdk) enforce bounds before
526 /// forwarding. Backends MAY additionally reject out-of-range
527 /// input via [`EngineError::Validation`].
528 ///
529 /// Gated on the `streaming` feature — stream reads are part of
530 /// the stream-subset surface a backend without XREAD-like
531 /// primitives may omit.
532 #[cfg(feature = "streaming")]
533 async fn read_stream(
534 &self,
535 execution_id: &ExecutionId,
536 attempt_index: AttemptIndex,
537 from: StreamCursor,
538 to: StreamCursor,
539 count_limit: u64,
540 ) -> Result<StreamFrames, EngineError>;
541
542 /// Tail a live attempt's stream.
543 ///
544 /// `after` is an exclusive [`StreamCursor`] — entries with id
545 /// strictly greater than `after` are returned. `StreamCursor::Start`
546 /// / `StreamCursor::End` are NOT accepted here; callers MUST pass
547 /// a concrete id (or `StreamCursor::from_beginning()`). The SDK
548 /// wrapper rejects the open markers before reaching the backend.
549 ///
550 /// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up
551 /// to that many ms for a new entry.
552 ///
553 /// Gated on the `streaming` feature — see [`read_stream`](Self::read_stream).
554 #[cfg(feature = "streaming")]
555 async fn tail_stream(
556 &self,
557 execution_id: &ExecutionId,
558 attempt_index: AttemptIndex,
559 after: StreamCursor,
560 block_ms: u64,
561 count_limit: u64,
562 ) -> Result<StreamFrames, EngineError>;
563}
564
565/// Object-safety assertion: `dyn EngineBackend` compiles iff every
566/// method is dyn-compatible. Kept as a compile-time guard so a future
567/// trait change that accidentally breaks dyn-safety fails the build
568/// at this site rather than at every downstream `Arc<dyn
569/// EngineBackend>` use.
570#[allow(dead_code)]
571fn _assert_dyn_compatible(_: &dyn EngineBackend) {}