// ff_core/engine_backend.rs
1//! The `EngineBackend` trait — abstracting FlowFabric's write surface.
2//!
3//! **RFC-012 Stage 1a:** this is the trait landing. The
4//! Valkey-backed impl lives in `ff-backend-valkey`; future backends
5//! (Postgres) add a sibling crate with their own impl. ff-sdk's
6//! `FlowFabricWorker` gains `connect_with(backend)` /
7//! `backend(&self)` accessors so consumers that want to bring their
8//! own backend (tests, future non-Valkey deployments) can hand one
9//! in. The hot-path migration of `ClaimedTask` / `FlowFabricWorker`
10//! to forward through the trait lands across Stages 1b-1d.
11//!
12//! # Object safety
13//!
14//! `EngineBackend` is object-safe: all methods are `async fn` behind
15//! `#[async_trait]` and take `&self`. Consumers can hold
//! `Arc<dyn EngineBackend>` for heterogeneous-backend deployments.
17//! The trait is `Send + Sync + 'static` per RFC-012 §4.1; every impl
18//! must honour that bound.
19//!
20//! # Error surface
21//!
22//! Every method returns [`Result<_, EngineError>`]. `EngineError`'s
23//! `Transport` variant carries a boxed `dyn Error + Send + Sync`;
24//! Valkey-backed transport faults box a
25//! `ff_script::error::ScriptError` (downcast via
26//! `ff_script::engine_error_ext::transport_script_ref`). Other
27//! backends box their native error type and set the `backend` tag
28//! accordingly.
29//!
30//! # Atomicity contract
31//!
32//! Per-op state transitions MUST be atomic (RFC-012 §3.4). On Valkey
33//! this is the single-FCALL-per-op property; on Postgres it is the
34//! per-transaction property. A backend that cannot honour atomicity
35//! for a given op either MUST NOT implement `EngineBackend` or MUST
36//! return `EngineError::Unavailable { op }` for the affected method.
37//!
38//! # Replay semantics
39//!
40//! `complete`, `fail`, `cancel`, `suspend`, `delay`, `wait_children`
41//! are idempotent under replay — calling twice with the same handle
42//! and args returns the same outcome (success on first call, typed
43//! `State` / `Contention` on subsequent calls where the fence triple
44//! no longer matches a live lease).
45
46use std::time::Duration;
47
48use async_trait::async_trait;
49
50use crate::backend::{
51 AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy,
52 FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal, PendingWaitpoint,
53 PrepareOutcome, ResumeSignal, ResumeToken,
54};
55// `SummaryDocument` and `TailVisibility` are referenced only inside
56// `#[cfg(feature = "streaming")]` trait methods below, so the imports
57// must be gated to avoid an unused-imports warning on the non-streaming
58// build.
59#[cfg(feature = "streaming")]
60use crate::backend::{SummaryDocument, TailVisibility};
61use crate::contracts::{
62 CancelFlowResult, ExecutionContext, ExecutionSnapshot, FlowSnapshot, IssueReclaimGrantArgs,
63 IssueReclaimGrantOutcome, ReclaimExecutionArgs, ReclaimExecutionOutcome, ReportUsageResult,
64 RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
65 SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
66};
67#[cfg(feature = "core")]
68use crate::contracts::{
69 AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
70 ApplyDependencyToChildResult, BudgetStatus, CancelExecutionArgs, CancelExecutionResult,
71 CancelFlowArgs, ChangePriorityArgs, ChangePriorityResult, ClaimExecutionArgs,
72 ClaimExecutionResult, ClaimForWorkerArgs, ClaimForWorkerOutcome, ClaimResumedExecutionArgs,
73 ClaimResumedExecutionResult,
74 BlockRouteArgs, BlockRouteOutcome, CreateBudgetArgs, CreateBudgetResult,
75 CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
76 CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
77 DeliverSignalArgs, DeliverSignalResult, EdgeDirection, EdgeSnapshot, ExecutionInfo,
78 IssueClaimGrantArgs, IssueClaimGrantOutcome, ScanEligibleArgs,
79 ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
80 ListPendingWaitpointsResult, ListSuspendedPage, ReplayExecutionArgs, ReplayExecutionResult,
81 ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult, RevokeLeaseArgs, RevokeLeaseResult,
82 StageDependencyEdgeArgs, StageDependencyEdgeResult,
83};
84#[cfg(feature = "core")]
85use crate::state::PublicState;
86#[cfg(feature = "core")]
87use crate::partition::PartitionKey;
88#[cfg(feature = "streaming")]
89use crate::contracts::{StreamCursor, StreamFrames};
90use crate::engine_error::EngineError;
91#[cfg(feature = "core")]
92use crate::types::EdgeId;
93use crate::types::{AttemptIndex, BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
94
95/// The engine write surface — a single trait a backend implementation
96/// honours to serve a `FlowFabricWorker`.
97///
/// See RFC-012 §3.1 for the inventory rationale and §3.3 for the
/// type-level shape. The Stage 1a landing counted 16 methods
/// (Round-7 added `create_waitpoint`; `append_frame` return widened;
/// `report_usage` return replaced — RFC-012 §R7); the surface has
/// since grown with the reclaim, read-side, and listing methods
/// below. Issue #150 added the two trigger-surface methods
/// (`deliver_signal` / `claim_resumed_execution`).
103///
104/// # Note on `complete` payload shape
105///
106/// The RFC §3.3 sketch uses `Option<Bytes>`; the Stage 1a trait uses
107/// `Option<Vec<u8>>` to match the existing
108/// `ff_sdk::ClaimedTask::complete` signature and avoid adding a
109/// `bytes` public-type dep for zero consumer benefit. Round-4 §7.17
110/// resolved the payload container debate to `Box<[u8]>` in the
111/// public type (see `HandleOpaque`); `Option<Vec<u8>>` is the
112/// zero-churn choice consistent with today's code. Consumers that
113/// need `&[u8]` can borrow via `.as_deref()` on the Option.
114#[async_trait]
115pub trait EngineBackend: Send + Sync + 'static {
    // ── Claim + lifecycle ──

    /// Fresh-work claim. Returns `Ok(None)` when no work is currently
    /// available; `Err` only on transport or input-validation faults.
    ///
    /// `lane` and `capabilities` are borrowed so callers can reuse them
    /// across repeated polls; `policy` is taken by value per call.
    async fn claim(
        &self,
        lane: &LaneId,
        capabilities: &CapabilitySet,
        policy: ClaimPolicy,
    ) -> Result<Option<Handle>, EngineError>;
126
    /// Renew a held lease. Returns the updated expiry + epoch on
    /// success; typed `State::StaleLease` / `State::LeaseExpired`
    /// when the lease has been stolen or timed out.
    ///
    /// Borrows `handle` so a transport-level retry does not lose the
    /// cookie (same rationale as [`Self::complete`], round-4 M-D2).
    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError>;
131
    /// Numeric-progress heartbeat.
    ///
    /// Writes scalar `progress_percent` / `progress_message` fields on
    /// `exec_core`; each call overwrites the previous value. This does
    /// NOT append to the output stream — stream-frame producers must use
    /// [`append_frame`](Self::append_frame) instead.
    ///
    /// `percent` and `message` are independently optional; either may
    /// be supplied without the other.
    async fn progress(
        &self,
        handle: &Handle,
        percent: Option<u8>,
        message: Option<String>,
    ) -> Result<(), EngineError>;
144
    /// Append one stream frame. Distinct from [`progress`](Self::progress)
    /// per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry
    /// id and post-append frame count (RFC-012 §R7.2.1).
    ///
    /// Stream-frame producers (arbitrary `frame_type` + payload, consumed
    /// via the read/tail surfaces) MUST use this method rather than
    /// [`progress`](Self::progress); the latter updates scalar fields on
    /// `exec_core` and is invisible to stream consumers.
    ///
    /// `frame` is consumed by value — the payload is handed off to the
    /// backend rather than borrowed.
    async fn append_frame(
        &self,
        handle: &Handle,
        frame: Frame,
    ) -> Result<AppendFrameOutcome, EngineError>;
158
    /// Terminal success. Borrows `handle` (round-4 M-D2) so callers
    /// can retry under `EngineError::Transport` without losing the
    /// cookie. Payload is `Option<Vec<u8>>` per the note above.
    ///
    /// Idempotent under replay per the module-level replay contract.
    async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError>;
163
    /// Terminal failure with classification. Returns [`FailOutcome`]
    /// so the caller learns whether a retry was scheduled.
    ///
    /// Idempotent under replay per the module-level replay contract.
    async fn fail(
        &self,
        handle: &Handle,
        reason: FailureReason,
        classification: FailureClass,
    ) -> Result<FailOutcome, EngineError>;
172
    /// Cooperative cancel by the worker holding the lease.
    ///
    /// `reason` is free-form text. Idempotent under replay per the
    /// module-level replay contract.
    async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError>;
175
    /// Suspend the execution awaiting a typed resume condition
    /// (RFC-013 Stage 1d).
    ///
    /// Borrows `handle` (round-4 M-D2). Terminal-looking behaviour is
    /// expressed through [`SuspendOutcome`]:
    ///
    /// * [`SuspendOutcome::Suspended`] — the pre-suspend handle is
    ///   logically invalidated; the fresh `HandleKind::Suspended`
    ///   handle inside the variant supersedes it. Runtime enforcement
    ///   via the fence triple: subsequent ops against the stale handle
    ///   surface as `Contention(LeaseConflict)`.
    /// * [`SuspendOutcome::AlreadySatisfied`] — buffered signals on a
    ///   pending waitpoint already matched the resume condition at
    ///   suspension time. The lease is NOT released; the caller's
    ///   pre-suspend handle remains valid.
    ///
    /// Service-layer callers that hold no [`Handle`] use the sibling
    /// [`Self::suspend_by_triple`] instead.
    ///
    /// See RFC-013 §2 for the type shapes, §3 for the replay /
    /// idempotency contract, §4 for the error taxonomy.
    async fn suspend(
        &self,
        handle: &Handle,
        args: SuspendArgs,
    ) -> Result<SuspendOutcome, EngineError>;
199
200 /// Suspend by execution id + lease fence triple, for service-layer
201 /// callers that hold a run record / lease-claim descriptor but no
202 /// worker [`Handle`] (cairn issue #322).
203 ///
204 /// Semantics mirror [`Self::suspend`] exactly — the same
205 /// [`SuspendArgs`] validation, the same [`SuspendOutcome`]
206 /// lifecycle, the same RFC-013 §3 dedup / replay contract. The
207 /// only difference is the fencing source: instead of the
208 /// `(lease_id, lease_epoch, attempt_id)` fields embedded in a
209 /// `Handle`, the backend fences against the triple passed directly.
210 /// Attempt-index, lane, and worker-instance metadata that
211 /// [`Self::suspend`] reads from the handle payload are recovered
212 /// from the backend's authoritative execution record (Valkey:
213 /// `exec_core` HGETs; Postgres: `ff_attempt` row lookup).
214 ///
215 /// The default impl returns [`EngineError::Unavailable`] so
216 /// existing backend impls remain non-breaking. Production backends
217 /// (Valkey, Postgres) override.
218 async fn suspend_by_triple(
219 &self,
220 exec_id: ExecutionId,
221 triple: LeaseFence,
222 args: SuspendArgs,
223 ) -> Result<SuspendOutcome, EngineError> {
224 let _ = (exec_id, triple, args);
225 Err(EngineError::Unavailable {
226 op: "suspend_by_triple",
227 })
228 }
229
    /// Issue a pending waitpoint for future signal delivery.
    ///
    /// Waitpoints have two states in the Valkey wire contract:
    /// **pending** (token issued, not yet backing a suspension) and
    /// **active** (bound to a suspension). This method creates a
    /// waitpoint in the **pending** state. A later `suspend` call
    /// transitions a pending waitpoint to active (see Lua
    /// `use_pending_waitpoint` ARGV flag at
    /// `flowfabric.lua:3603,3641,3690`) — or, if buffered signals
    /// already satisfy its condition, the suspend call returns
    /// `SuspendOutcome::AlreadySatisfied` and the waitpoint activates
    /// without ever releasing the lease.
    ///
    /// Pending-waitpoint expiry is a first-class terminal error on
    /// the wire (`PendingWaitpointExpired` at
    /// `ff-script/src/error.rs:170,403-408`). The attempt retains its
    /// lease while the waitpoint is pending; signals delivered to
    /// this waitpoint are buffered server-side (RFC-012 §R7.2.2).
    ///
    /// `expires_in` is a relative [`Duration`] rather than an absolute
    /// timestamp.
    async fn create_waitpoint(
        &self,
        handle: &Handle,
        waitpoint_key: &str,
        expires_in: Duration,
    ) -> Result<PendingWaitpoint, EngineError>;
254
    /// Non-mutating observation of signals that satisfied the handle's
    /// resume condition.
    ///
    /// Read-only: repeat calls are safe and do not consume the signals.
    async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError>;
258
    /// Consume a resume grant (via [`ResumeToken`]) to mint a
    /// resumed-kind handle. Routes to `ff_claim_resumed_execution` on
    /// Valkey / the epoch-bump reconciler on PG/SQLite. Returns
    /// `Ok(None)` when the grant's target execution is no longer
    /// resumable (already reclaimed, terminal, etc.).
    ///
    /// Takes `token` by value, underlining the consume-once semantic
    /// described above.
    ///
    /// **Renamed from `claim_from_reclaim` (RFC-024 PR-B+C).** The
    /// pre-rename name advertised "reclaim" but the semantic has
    /// always been resume-after-suspend. The new lease-reclaim path
    /// lives on [`Self::reclaim_execution`].
    async fn claim_from_resume_grant(
        &self,
        token: ResumeToken,
    ) -> Result<Option<Handle>, EngineError>;
273
    /// Issue a lease-reclaim grant (RFC-024 §3.2). Admits executions
    /// in `lease_expired_reclaimable` or `lease_revoked` state to the
    /// reclaim path; the returned [`IssueReclaimGrantOutcome::Granted`]
    /// carries a [`crate::contracts::ReclaimGrant`] which is then fed
    /// to [`Self::reclaim_execution`] to mint a fresh attempt.
    ///
    /// Default impl returns [`EngineError::Unavailable`] — PR-D (PG),
    /// PR-E (SQLite), and PR-F (Valkey) override with real bodies.
    async fn issue_reclaim_grant(
        &self,
        _args: IssueReclaimGrantArgs,
    ) -> Result<IssueReclaimGrantOutcome, EngineError> {
        // Stub body — keeps the trait addition non-breaking for
        // out-of-tree backends until the per-backend PRs land.
        Err(EngineError::Unavailable {
            op: "issue_reclaim_grant",
        })
    }
290
    /// Consume a [`crate::contracts::ReclaimGrant`] to mint a fresh
    /// attempt for a previously lease-expired / lease-revoked
    /// execution (RFC-024 §3.2). Creates a new attempt row, bumps the
    /// execution's `lease_reclaim_count`, and mints a
    /// [`crate::backend::HandleKind::Reclaimed`] handle.
    ///
    /// Default impl returns [`EngineError::Unavailable`] — PR-D (PG),
    /// PR-E (SQLite), and PR-F (Valkey) override with real bodies.
    async fn reclaim_execution(
        &self,
        _args: ReclaimExecutionArgs,
    ) -> Result<ReclaimExecutionOutcome, EngineError> {
        // Stub body — keeps the trait addition non-breaking for
        // out-of-tree backends until the per-backend PRs land.
        Err(EngineError::Unavailable {
            op: "reclaim_execution",
        })
    }
307
    // Round-5 amendment: lease-releasing peers of `suspend`.

    /// Park the execution until `delay_until`, releasing the lease.
    ///
    /// Idempotent under replay per the module-level replay contract.
    async fn delay(&self, handle: &Handle, delay_until: TimestampMs) -> Result<(), EngineError>;

    /// Mark the execution as waiting for its child flow to complete,
    /// releasing the lease.
    ///
    /// Idempotent under replay per the module-level replay contract.
    async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError>;
316
    // ── Read / admin ──

    /// Snapshot an execution by id. `Ok(None)` ⇒ no such execution.
    ///
    /// Read-model projection only — it deliberately omits the raw
    /// payload bytes; use [`Self::read_execution_context`] for those.
    async fn describe_execution(
        &self,
        id: &ExecutionId,
    ) -> Result<Option<ExecutionSnapshot>, EngineError>;
324
    /// Point-read of the execution-scoped `(input_payload,
    /// execution_kind, tags)` bundle used by the SDK worker when
    /// assembling a `ClaimedTask` (see `ff_sdk::ClaimedTask`) after a
    /// successful claim.
    ///
    /// No default impl — every `EngineBackend` must answer this
    /// explicitly, and the method is not gated behind a Cargo
    /// feature. Distinct from [`Self::describe_execution`]
    /// (read-model projection) because the SDK needs the raw payload
    /// bytes + kind + tags immediately post-claim, and the snapshot
    /// projection deliberately omits the payload bytes.
    ///
    /// Per-backend shape:
    ///
    /// * **Valkey** — pipelined `GET :payload` + `HGETALL :core`
    ///   + `HGETALL :tags` on the execution's partition (same pattern
    ///   as [`Self::describe_execution`]).
    /// * **Postgres** — single `SELECT payload, raw_fields` on
    ///   `ff_exec_core` keyed by `(partition_key, execution_id)`;
    ///   `execution_kind` + `tags` live in `raw_fields` JSONB.
    /// * **SQLite** — identical shape to Postgres.
    ///
    /// Returns [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
    /// when the execution does not exist — the SDK worker only calls
    /// this after a successful claim, so a missing row is a loud
    /// storage-tier invariant violation rather than a routine `Ok(None)`.
    async fn read_execution_context(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ExecutionContext, EngineError>;
354
    /// Point-read of the execution's current attempt-index **pointer**
    /// — the index of the currently-leased attempt row.
    ///
    /// Distinct from [`Self::read_total_attempt_count`]: this method
    /// names the attempt that *already exists* (pointer), whereas
    /// `read_total_attempt_count` is the monotonic claim counter used
    /// to compute the next fresh attempt index. See the sibling's
    /// rustdoc for the retry-path scenario that motivates the split.
    ///
    /// Used on the SDK worker's `claim_from_resume_grant` path —
    /// specifically the private `claim_resumed_execution` helper —
    /// immediately before dispatching [`Self::claim_resumed_execution`].
    /// The returned index is fed into
    /// [`ClaimResumedExecutionArgs::current_attempt_index`](crate::contracts::ClaimResumedExecutionArgs)
    /// so the backend's script / transaction targets the correct
    /// existing attempt row (KEYS[6] on Valkey; `ff_attempt` PK tuple
    /// on PG/SQLite).
    ///
    /// Per-backend shape:
    ///
    /// * **Valkey** — `HGET {exec}:core current_attempt_index` on the
    ///   execution's partition. Single command. Both the
    ///   **missing-field** case (`exec_core` present but
    ///   `current_attempt_index` absent or empty-string, i.e. pre-claim
    ///   state) **and** the **missing-row** case (no `exec_core` hash
    ///   at all) read back as `AttemptIndex(0)`. This preserves the
    ///   pre-PR-3 inline-`HGET` semantic and is safe because Valkey's
    ///   happy path requires `exec_core` to exist before this method
    ///   is reached — the SDK only calls `read_current_attempt_index`
    ///   post-grant, and grant issuance is gated on `exec_core`
    ///   presence. A genuinely absent row would surface as the proper
    ///   business-logic error (`NotAResumedExecution` /
    ///   `ExecutionNotLeaseable`) on the downstream FCALL.
    /// * **Postgres** — `SELECT attempt_index FROM ff_exec_core
    ///   WHERE partition_key = $1 AND execution_id = $2`. The column
    ///   is `NOT NULL DEFAULT 0` so a pre-claim row reads back as `0`
    ///   (matching Valkey's missing-field case). **Missing row**
    ///   surfaces as [`EngineError::Validation { kind:
    ///   ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
    ///   — diverges from Valkey's missing-row `→ 0` mapping.
    /// * **SQLite** — `SELECT attempt_index FROM ff_exec_core
    ///   WHERE partition_key = ? AND execution_id = ?`; identical
    ///   semantics to Postgres (missing-row → `InvalidInput`).
    ///
    /// **Cross-backend asymmetry on missing row is intentional.** The
    /// SDK happy path never observes it (grant issuance on Valkey
    /// requires `exec_core`, and PG/SQLite currently return
    /// `Unavailable` from `claim_from_grant` per
    /// `project_claim_from_grant_pg_sqlite_gap.md`). Consumers writing
    /// backend-agnostic tooling against this method directly must
    /// treat the missing-row case as backend-dependent — match on
    /// `InvalidInput` for PG/SQLite, and treat an unexpected `0` as
    /// the Valkey equivalent signal.
    ///
    /// The default impl returns [`EngineError::Unavailable`] so the
    /// trait addition is non-breaking for out-of-tree backends (same
    /// precedent as [`Self::read_execution_context`] landing in v0.12
    /// PR-1).
    async fn read_current_attempt_index(
        &self,
        _execution_id: &ExecutionId,
    ) -> Result<AttemptIndex, EngineError> {
        // Stub body — overridden by the Valkey / PG / SQLite backends.
        Err(EngineError::Unavailable {
            op: "read_current_attempt_index",
        })
    }
421
    /// Point-read of the execution's **total attempt counter** — the
    /// monotonic count of claims that have ever fired against this
    /// execution (including the in-flight one once claimed).
    ///
    /// Used on the SDK worker's `claim_from_grant` / `claim_execution`
    /// path — the next attempt-index for a fresh claim is this
    /// counter's current value (so `1` on the second retry after the
    /// first attempt failed terminally). This is semantically distinct
    /// from [`Self::read_current_attempt_index`], which is a *pointer*
    /// at the currently-leased attempt row and is only meaningful on
    /// the `claim_from_resume_grant` path (where a live attempt already
    /// exists and we want to re-seat its lease rather than mint a new
    /// attempt row).
    ///
    /// Reading the pointer on the `claim_from_grant` path was a live
    /// bug: on the retry-of-a-retry scenario the pointer still named
    /// the *previous* terminal-failed attempt, so the newly-minted
    /// attempt collided with it (Valkey KEYS[6]) or mis-targeted the
    /// PG/SQLite `ff_attempt` PK tuple. This method fixes that by
    /// reading the counter that Lua 5920 / PG `ff_claim_execution` /
    /// SQLite `claim_impl` all already consult when computing the
    /// next attempt index.
    ///
    /// Per-backend shape:
    ///
    /// * **Valkey** — `HGET {exec}:core total_attempt_count` on the
    ///   execution's partition. Single command; pre-claim read (field
    ///   absent or empty) maps to `0`.
    /// * **Postgres** — `SELECT raw_fields->>'total_attempt_count'
    ///   FROM ff_exec_core WHERE (partition_key, execution_id) = ...`.
    ///   The field lives in the JSONB `raw_fields` bag rather than a
    ///   dedicated column (mirrors how `create_execution_impl` seeds
    ///   it on row creation). Missing row → `InvalidInput`; missing
    ///   field → `0`.
    /// * **SQLite** — `SELECT CAST(json_extract(raw_fields,
    ///   '$.total_attempt_count') AS INTEGER) FROM ff_exec_core
    ///   WHERE ...`. Same JSON-in-`raw_fields` shape as PG; uses the
    ///   same `json_extract` idiom already employed in
    ///   `ff-backend-sqlite/src/queries/operator.rs` for replay_count.
    ///
    /// The default impl returns [`EngineError::Unavailable`] so the
    /// trait addition is non-breaking for out-of-tree backends (same
    /// precedent as [`Self::read_current_attempt_index`] landing in
    /// v0.12 PR-3).
    async fn read_total_attempt_count(
        &self,
        _execution_id: &ExecutionId,
    ) -> Result<AttemptIndex, EngineError> {
        // Stub body — overridden by the Valkey / PG / SQLite backends.
        Err(EngineError::Unavailable {
            op: "read_total_attempt_count",
        })
    }
474
    /// Snapshot a flow by id. `Ok(None)` ⇒ no such flow.
    ///
    /// Read-side peer of [`Self::describe_execution`].
    async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError>;
477
    /// List dependency edges adjacent to an execution. Read-only; the
    /// backend resolves the subject execution's flow, reads the
    /// direction-specific adjacency SET, and decodes each member's
    /// flow-scoped `edge:<edge_id>` hash.
    ///
    /// NOTE(review): the paragraph above describes an *execution*-scoped
    /// subject, but the parameter is a `&FlowId` — presumably the
    /// execution→flow pivot happens in the caller (see
    /// [`Self::resolve_execution_flow_id`]); confirm and reword.
    ///
    /// Returns an empty `Vec` when the subject has no edges on the
    /// requested side — including standalone executions (no owning
    /// flow). Ordering is unspecified: the underlying adjacency SET
    /// is an unordered SMEMBERS read. Callers that need deterministic
    /// order should sort by [`EdgeSnapshot::edge_id`] /
    /// [`EdgeSnapshot::created_at`] themselves.
    ///
    /// Parse failures on the edge hash surface as
    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
    /// — unknown fields, missing required fields, endpoint mismatches
    /// against the adjacency SET all fail loud rather than silently
    /// returning partial results.
    ///
    /// Gated on the `core` feature — edge reads are part of the
    /// minimal engine surface a Postgres-style backend must honour.
    ///
    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
    #[cfg(feature = "core")]
    async fn list_edges(
        &self,
        _flow_id: &FlowId,
        _direction: EdgeDirection,
    ) -> Result<Vec<EdgeSnapshot>, EngineError> {
        // Stub body — production backends override.
        Err(EngineError::Unavailable { op: "list_edges" })
    }
508
    /// Snapshot a single dependency edge by its owning flow + edge id.
    ///
    /// `Ok(None)` when the edge hash is absent (never staged, or
    /// staged under a different flow than `flow_id`). Parse failures
    /// on a present edge hash surface as
    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
    /// — the stored `flow_id` field is cross-checked against the
    /// caller's expected `flow_id` so a wrong-key read fails loud
    /// rather than returning an unrelated edge.
    ///
    /// Gated on the `core` feature — single-edge reads are part of
    /// the minimal snapshot surface an alternate backend must honour
    /// alongside [`Self::describe_execution`] / [`Self::describe_flow`]
    /// / [`Self::list_edges`].
    ///
    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
    #[cfg(feature = "core")]
    async fn describe_edge(
        &self,
        _flow_id: &FlowId,
        _edge_id: &EdgeId,
    ) -> Result<Option<EdgeSnapshot>, EngineError> {
        // Stub body — production backends override.
        Err(EngineError::Unavailable {
            op: "describe_edge",
        })
    }
535
    /// Resolve an execution's owning flow id, if any.
    ///
    /// `Ok(None)` when the execution's core record is absent or has
    /// no associated flow (standalone execution). A present-but-
    /// malformed `flow_id` field surfaces as
    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`].
    ///
    /// Gated on the `core` feature. Used by ff-sdk's
    /// `list_outgoing_edges` / `list_incoming_edges` to pivot from a
    /// consumer-supplied `ExecutionId` to the `FlowId` required by
    /// [`Self::list_edges`]. A Valkey backend serves this with a
    /// single `HGET exec_core flow_id`; a Postgres backend serves it
    /// with the equivalent single-column row lookup.
    ///
    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
    #[cfg(feature = "core")]
    async fn resolve_execution_flow_id(
        &self,
        _eid: &ExecutionId,
    ) -> Result<Option<FlowId>, EngineError> {
        // Stub body — production backends override.
        Err(EngineError::Unavailable {
            op: "resolve_execution_flow_id",
        })
    }
560
    /// List flows on a partition with cursor-based pagination (issue
    /// #185).
    ///
    /// Returns a [`ListFlowsPage`] of [`FlowSummary`](crate::contracts::FlowSummary)
    /// rows ordered by `flow_id` (UUID byte-lexicographic). `cursor`
    /// is `None` for the first page; callers forward the returned
    /// `next_cursor` verbatim to continue iteration, and the listing
    /// is exhausted when `next_cursor` is `None`. `limit` is the
    /// maximum number of rows to return on this page — implementations
    /// MAY return fewer (end of partition) but MUST NOT exceed it.
    ///
    /// Ordering rationale: flow ids are UUIDs, and both Valkey
    /// (sort after-the-fact) and Postgres (`ORDER BY flow_id`) can
    /// agree on byte-lexicographic order — the same order
    /// `FlowId::to_string()` produces for canonical hyphenated UUIDs.
    /// Mapping to `flow_id > cursor` keeps the contract backend-
    /// independent.
    ///
    /// # Postgres implementation pattern
    ///
    /// A Postgres-backed implementation serves this directly with
    ///
    /// ```sql
    /// SELECT flow_id, created_at_ms, public_flow_state
    /// FROM ff_flow
    /// WHERE partition_key = $1
    ///   AND ($2::uuid IS NULL OR flow_id > $2)
    /// ORDER BY flow_id
    /// LIMIT $3 + 1;
    /// ```
    ///
    /// — reading one extra row to decide whether `next_cursor` should
    /// be set to the last row's `flow_id`. The Valkey implementation
    /// maintains the `ff:idx:{fp:N}:flow_index` SET and performs the
    /// sort + slice client-side (SMEMBERS then sort-by-UUID-bytes),
    /// pipelining `HGETALL flow_core` for each row on the page.
    ///
    /// Gated on the `core` feature — flow listing is part of the
    /// minimal engine surface a Postgres-style backend must honour.
    #[cfg(feature = "core")]
    async fn list_flows(
        &self,
        _partition: PartitionKey,
        _cursor: Option<FlowId>,
        _limit: usize,
    ) -> Result<ListFlowsPage, EngineError> {
        // Stub body — production backends override.
        Err(EngineError::Unavailable { op: "list_flows" })
    }
609
    /// Enumerate registered lanes with cursor-based pagination.
    ///
    /// Lanes are global (not partition-scoped) — the backend serves
    /// this from its lane registry and does NOT accept a
    /// [`crate::partition::Partition`] argument. Results are sorted
    /// by [`LaneId`] name so the ordering is stable across calls and
    /// cursors address a deterministic position in the sort.
    ///
    /// * `cursor` — exclusive lower bound. `None` starts from the
    ///   first lane. To continue a walk, pass the previous page's
    ///   [`ListLanesPage::next_cursor`].
    /// * `limit` — hard cap on the number of lanes returned in the
    ///   page. Backends MAY round this down when the registry size
    ///   is smaller; they MUST NOT return more than `limit`.
    ///
    /// [`ListLanesPage::next_cursor`] is `Some(last_lane_in_page)`
    /// iff at least one more lane exists after the returned page,
    /// and `None` on the final page. Callers loop until `next_cursor`
    /// is `None` to read the full registry.
    ///
    /// Gated on the `core` feature — lane enumeration is part of the
    /// minimal snapshot surface an alternate backend must honour
    /// alongside [`Self::describe_flow`] / [`Self::list_edges`].
    #[cfg(feature = "core")]
    async fn list_lanes(
        &self,
        _cursor: Option<LaneId>,
        _limit: usize,
    ) -> Result<ListLanesPage, EngineError> {
        // Stub body — production backends override.
        Err(EngineError::Unavailable { op: "list_lanes" })
    }
641
    /// List suspended executions in one partition, cursor-paginated,
    /// with each entry's suspension `reason_code` populated (issue
    /// #183).
    ///
    /// Consumer-facing "what's blocked on what?" panels (ff-board's
    /// suspended-executions view, operator CLIs) need the reason in
    /// the list response so the UI does not round-trip per row to
    /// `describe_execution` for a field it knows it needs. `reason`
    /// on [`SuspendedExecutionEntry`] carries the free-form
    /// `suspension:current.reason_code` field — see the type rustdoc
    /// for the String-not-enum rationale.
    ///
    /// `cursor` is opaque to callers; pass `None` to start a fresh
    /// scan and feed the returned [`ListSuspendedPage::next_cursor`]
    /// back in on subsequent pages until it comes back `None`.
    /// `limit` bounds the `entries` count; backends MAY return fewer
    /// when the partition is exhausted.
    ///
    /// Ordering is by ascending `suspended_at_ms` (the per-lane
    /// suspended ZSET score == `timeout_at` or the no-timeout
    /// sentinel) with execution id as a lex tiebreak, so cursor
    /// continuation is deterministic across calls.
    ///
    /// Gated on the `core` feature — suspended-list enumeration is
    /// part of the minimal engine surface a Postgres-style backend
    /// must honour.
    #[cfg(feature = "core")]
    async fn list_suspended(
        &self,
        _partition: PartitionKey,
        _cursor: Option<ExecutionId>,
        _limit: usize,
    ) -> Result<ListSuspendedPage, EngineError> {
        // Stub body — production backends override.
        Err(EngineError::Unavailable {
            op: "list_suspended",
        })
    }
679
    /// Forward-only paginated listing of the executions indexed under
    /// one partition.
    ///
    /// Reads the partition-wide `ff:idx:{p:N}:all_executions` set,
    /// sorts lexicographically on `ExecutionId`, and returns the page
    /// of ids strictly greater than `cursor` (or starting from the
    /// smallest id when `cursor = None`). The returned
    /// [`ListExecutionsPage::next_cursor`] is the last id on the page
    /// iff at least one more id exists past it; `None` signals
    /// end-of-stream.
    ///
    /// `limit` is the maximum number of ids returned on this page. A
    /// `limit` of `0` returns an empty page with `next_cursor = None`.
    /// Backends MAY cap `limit` internally (Valkey: 1000) and return
    /// fewer ids than requested; callers continue paginating until
    /// `next_cursor == None`.
    ///
    /// Ordering is stable under concurrent inserts for already-emitted
    /// ids (an id less-than-or-equal-to the caller's cursor is never
    /// re-emitted in later pages) but new inserts past the cursor WILL
    /// appear in subsequent pages — consistent with forward-only
    /// cursor semantics.
    ///
    /// Gated on the `core` feature — partition-scoped listing is part
    /// of the minimal engine surface every backend must honour.
    #[cfg(feature = "core")]
    async fn list_executions(
        &self,
        _partition: PartitionKey,
        _cursor: Option<ExecutionId>,
        _limit: usize,
    ) -> Result<ListExecutionsPage, EngineError> {
        // Stub body — production backends override.
        Err(EngineError::Unavailable {
            op: "list_executions",
        })
    }
716
717 // ── Trigger ops (issue #150) ──
718
/// Deliver an external signal to a suspended execution's waitpoint.
///
/// The backend records the signal, evaluates the resume condition,
/// and — once that condition is satisfied — flips the execution
/// from `suspended` to `runnable`, all as one atomic step. A
/// signal arriving while the waitpoint is still `pending` is
/// buffered instead. Re-delivery with the same `idempotency_key` +
/// waitpoint surfaces as [`DeliverSignalResult::Duplicate`]
/// carrying the pre-existing `signal_id`; state is never mutated
/// twice.
///
/// Input validation (HMAC token presence, payload size limits,
/// signal-name shape) is the backend's job: callers hand in a
/// fully populated [`DeliverSignalArgs`] and receive typed
/// outcomes or typed errors (`ScriptError::invalid_token`,
/// `ScriptError::token_expired`, `ScriptError::ExecutionNotFound`
/// surfaced via [`EngineError::Transport`] on the Valkey backend).
///
/// Gated on the `core` feature — signal delivery belongs to the
/// minimal trigger surface every backend must honour, so ff-server
/// / REST handlers can dispatch against `Arc<dyn EngineBackend>`
/// without caring which backend sits underneath.
#[cfg(feature = "core")]
async fn deliver_signal(
    &self,
    _args: DeliverSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
    let op = "deliver_signal";
    Err(EngineError::Unavailable { op })
}
749
/// Claim a resumed execution — an attempt that was suspended, has
/// since cleared its resume condition (e.g. via
/// [`Self::deliver_signal`]), and now needs a worker to pick the
/// same attempt index back up.
///
/// Unlike [`Self::claim`] (fresh work) and
/// [`Self::claim_from_resume_grant`] (grant-based ownership
/// transfer after a crash), this path re-binds an existing attempt
/// rather than minting a new one: the backend issues a fresh
/// `lease_id` and bumps the `lease_epoch` while preserving
/// `attempt_id` / `attempt_index`, so stream frames and progress
/// updates keep flowing on the same attempt.
///
/// Typed failures arrive as `ScriptError` → `EngineError`:
/// `NotAResumedExecution` when the attempt state is not
/// `attempt_interrupted`, `ExecutionNotLeaseable` when the
/// lifecycle phase is not `runnable`, and `InvalidClaimGrant` when
/// the grant key is missing or was already consumed.
///
/// Gated on the `core` feature — resumed-claim is part of the
/// minimal trigger surface every backend must honour.
#[cfg(feature = "core")]
async fn claim_resumed_execution(
    &self,
    _args: ClaimResumedExecutionArgs,
) -> Result<ClaimResumedExecutionResult, EngineError> {
    let op = "claim_resumed_execution";
    Err(EngineError::Unavailable { op })
}
780
/// Scan one partition's per-lane eligible ZSET for the
/// highest-priority executions awaiting a worker (v0.12 PR-5).
///
/// Lifted out of the SDK-side `ZRANGEBYSCORE` inline on
/// `FlowFabricWorker::claim_next` — the scheduler-bypass scanner
/// gated behind `direct-valkey-claim`. The trait method itself is
/// backend-agnostic; consumers driving the scanner loop (bench
/// harnesses, single-tenant dev) compose it with
/// [`Self::issue_claim_grant`] + [`Self::claim_execution`] to
/// reproduce the pre-PR-5 `claim_next` body.
///
/// # Backend coverage
///
/// * **Valkey** — `ZRANGEBYSCORE eligible_zset -inf +inf LIMIT 0 <limit>`
///   against the lane's partition-scoped eligible key. One
///   command, no script round-trip. The wire shape is
///   byte-for-byte identical to the pre-PR SDK inline call, so
///   bench traces match pre-PR without introducing new
///   `#[tracing::instrument]` span names.
/// * **Postgres / SQLite** — keep the `Err(Unavailable)` default.
///   PG/SQLite consumers route work through the scheduler-routed
///   [`Self::claim_for_worker`] path rather than these scanner
///   primitives; lifting the scheduler itself onto the trait is
///   RFC-024 follow-up scope. See
///   `project_claim_from_grant_pg_sqlite_gap.md` for motivation.
///
/// The default impl returns [`EngineError::Unavailable`], so
/// adding this method is non-breaking for out-of-tree backends —
/// the same precedent [`Self::claim_execution`] set in v0.12 PR-4.
#[cfg(feature = "core")]
async fn scan_eligible_executions(
    &self,
    _args: ScanEligibleArgs,
) -> Result<Vec<ExecutionId>, EngineError> {
    let op = "scan_eligible_executions";
    Err(EngineError::Unavailable { op })
}
818
/// Issue a claim grant — the scheduler's admission write — for one
/// execution on one lane (v0.12 PR-5).
///
/// Lifted out of the SDK-side `ff_issue_claim_grant` inline helper
/// on `FlowFabricWorker::claim_next`. In a single atomic step the
/// backend writes the grant hash, appends to the per-worker grant
/// index, and drops the execution from the lane's eligible ZSET.
///
/// Typed rejects arrive as [`EngineError::Validation`]:
/// `CapabilityMismatch` when the worker's capabilities fail to
/// cover the execution's `required_capabilities`, `InvalidInput`
/// for malformed args. Transport faults arrive as
/// [`EngineError::Transport`].
///
/// # Backend coverage
///
/// * **Valkey** — one `ff_issue_claim_grant` FCALL. The KEYS/ARGV
///   shape is byte-for-byte identical to the pre-PR SDK inline
///   call; bench traces match pre-PR.
/// * **Postgres / SQLite** — `Err(Unavailable)` default; use
///   [`Self::claim_for_worker`] instead. See
///   [`Self::scan_eligible_executions`] for the cross-link
///   rationale.
#[cfg(feature = "core")]
async fn issue_claim_grant(
    &self,
    _args: IssueClaimGrantArgs,
) -> Result<IssueClaimGrantOutcome, EngineError> {
    let op = "issue_claim_grant";
    Err(EngineError::Unavailable { op })
}
851
/// Move an execution out of a lane's eligible ZSET and into its
/// blocked_route ZSET (v0.12 PR-5).
///
/// Lifted out of the SDK-side `ff_block_execution_for_admission`
/// inline helper on `FlowFabricWorker::claim_next`. Invoked after
/// a [`Self::issue_claim_grant`] `CapabilityMismatch` reject:
/// without the block step, the inline scanner would re-pick the
/// same top-of-ZSET entry on every tick (parity with
/// `ff-scheduler::Scheduler::block_candidate`).
///
/// The engine's unblock scanner periodically promotes
/// blocked_route entries back to eligible once a worker with
/// matching caps registers.
///
/// # Backend coverage
///
/// * **Valkey** — one `ff_block_execution_for_admission` FCALL.
/// * **Postgres / SQLite** — `Err(Unavailable)` default; the
///   scheduler-routed [`Self::claim_for_worker`] path handles
///   admission rejects server-side.
#[cfg(feature = "core")]
async fn block_route(
    &self,
    _args: BlockRouteArgs,
) -> Result<BlockRouteOutcome, EngineError> {
    let op = "block_route";
    Err(EngineError::Unavailable { op })
}
879
/// Consume a scheduler-issued claim grant to mint a fresh attempt.
///
/// This is the SDK's grant-consumer path, paired with
/// `FlowFabricWorker::claim_from_grant` in `ff-sdk`. By the time
/// this runs, the scheduler has already validated budget / quota /
/// capabilities and written a grant (Valkey `claim_grant` hash);
/// the call atomically consumes that grant, creates the attempt
/// row, mints `lease_id` + `lease_epoch`, and returns a
/// [`ClaimExecutionResult::Claimed`] carrying the minted lease
/// triple.
///
/// Distinct from [`Self::claim`] (the scheduler-bypass scanner
/// used by the `direct-valkey-claim` feature): this method assumes
/// the grant already exists and skips capability / ZSET scanning.
/// The Valkey impl fires exactly one `ff_claim_execution` FCALL.
///
/// Typed failures arrive as `ScriptError` → `EngineError`:
/// `UseClaimResumedExecution` when the attempt is actually
/// `attempt_interrupted` (retry via
/// [`Self::claim_resumed_execution`] — see `ContentionKind` at
/// `ff_core::engine_error`), `InvalidClaimGrant` when the grant is
/// missing / consumed / worker-mismatched, and
/// `CapabilityMismatch` when the execution's
/// `required_capabilities` drifted after grant issuance.
///
/// # Backend coverage
///
/// * **Valkey** — implemented in `ff-backend-valkey` (one
///   `ff_claim_execution` FCALL).
/// * **Postgres / SQLite** — keep the `Err(Unavailable)` default
///   in this PR. Grants on PG / SQLite today flow through
///   `PostgresScheduler::claim_for_worker` (a sibling struct, not
///   an `EngineBackend` method); wiring the default-over-trait
///   behaviour into a PG / SQLite `claim_execution` impl lands
///   with a future RFC-024 grant-consumer extension.
///
/// The default impl returns [`EngineError::Unavailable`], keeping
/// the trait addition non-breaking for out-of-tree backends — the
/// same precedent as [`Self::read_current_attempt_index`] landing
/// in v0.12 PR-3.
#[cfg(feature = "core")]
async fn claim_execution(
    &self,
    _args: ClaimExecutionArgs,
) -> Result<ClaimExecutionResult, EngineError> {
    let op = "claim_execution";
    Err(EngineError::Unavailable { op })
}
930
/// Operator-initiated cancellation of a flow and (optionally) its
/// member executions. See RFC-012 §3.1.1 for the policy / wait
/// matrix.
///
/// Required method — no default impl; every backend must provide a
/// real body. Presumably `policy` governs how member executions
/// are handled alongside the flow header and `wait` controls
/// blocking on member completion (the result here is described
/// elsewhere in this trait as "wait-collapsed") — confirm against
/// the [`CancelFlowPolicy`] / [`CancelFlowWait`] type docs.
async fn cancel_flow(
    &self,
    id: &FlowId,
    policy: CancelFlowPolicy,
    wait: CancelFlowWait,
) -> Result<CancelFlowResult, EngineError>;
940
/// RFC-016 Stage A: set the inbound-edge-group policy for a
/// downstream execution. Callers must invoke this before the first
/// `add_dependency(... -> downstream_execution_id)` — once edges
/// have been staged for this group, the backend rejects with
/// [`EngineError::Conflict`].
///
/// Stage A honours only
/// [`EdgeDependencyPolicy::AllOf`](crate::contracts::EdgeDependencyPolicy::AllOf).
/// Until Stage B's resolver lands, the `AnyOf` / `Quorum` variants
/// return [`EngineError::Validation`] with
/// `detail = "stage A supports AllOf only; AnyOf/Quorum land in stage B"`.
#[cfg(feature = "core")]
async fn set_edge_group_policy(
    &self,
    _flow_id: &FlowId,
    _downstream_execution_id: &ExecutionId,
    _policy: crate::contracts::EdgeDependencyPolicy,
) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
    let op = "set_edge_group_policy";
    Err(EngineError::Unavailable { op })
}
964
965 // ── HMAC secret rotation (v0.7 migration-master Q4) ──
966
967 /// Rotate the waitpoint HMAC signing kid **cluster-wide**.
968 ///
969 /// **v0.7 migration-master Q4 (adjudicated 2026-04-24).**
970 /// Additive trait surface so Valkey and Postgres backends can
971 /// both expose the "rotate everywhere" semantic under one name.
972 ///
973 /// * Valkey impl fans out an `ff_rotate_waitpoint_hmac_secret`
974 /// FCALL per execution partition. `entries.len() == num_flow_partitions`
975 /// and per-partition failures are surfaced as inner `Err`
976 /// entries — the call as a whole does not fail when one
977 /// partition's FCALL fails, matching
978 /// [`ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions`]'s
979 /// partial-success contract.
980 /// * Postgres impl (Wave 4) writes one row to
981 /// `ff_waitpoint_hmac(kid, secret, rotated_at)` and returns a
982 /// single-entry vec with `partition = 0`.
983 ///
984 /// The default impl returns
985 /// [`EngineError::Unavailable`] with
986 /// `op = "rotate_waitpoint_hmac_secret_all"` so backends that
987 /// haven't implemented the method surface the miss loudly rather
988 /// than silently no-op'ing. Both concrete backends override.
989 async fn rotate_waitpoint_hmac_secret_all(
990 &self,
991 _args: RotateWaitpointHmacSecretAllArgs,
992 ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
993 Err(EngineError::Unavailable {
994 op: "rotate_waitpoint_hmac_secret_all",
995 })
996 }
997
998 /// Seed the initial waitpoint HMAC secret for a fresh deployment
999 /// (issue #280).
1000 ///
1001 /// **Idempotent.** If a `current_kid` (Valkey per-partition) or
1002 /// an active kid row (Postgres) already exists with the given
1003 /// `kid`, the method returns
1004 /// [`SeedOutcome::AlreadySeeded`] without overwriting, reporting
1005 /// whether the stored secret matches the caller-supplied one via
1006 /// `same_secret`. Callers (cairn boot, operator tooling) invoke
1007 /// this on every boot and let the backend decide whether to
1008 /// install — removing the client-side "check then HSET" race that
1009 /// cairn's raw-HSET boot path silently tolerated.
1010 ///
1011 /// For rotation of an already-seeded secret, use
1012 /// [`Self::rotate_waitpoint_hmac_secret_all`] instead; seed is
1013 /// install-only.
1014 ///
1015 /// The default impl returns [`EngineError::Unavailable`] with
1016 /// `op = "seed_waitpoint_hmac_secret"` so backends that haven't
1017 /// implemented the method surface the miss loudly.
1018 async fn seed_waitpoint_hmac_secret(
1019 &self,
1020 _args: SeedWaitpointHmacSecretArgs,
1021 ) -> Result<SeedOutcome, EngineError> {
1022 Err(EngineError::Unavailable {
1023 op: "seed_waitpoint_hmac_secret",
1024 })
1025 }
1026
1027 // ── Budget ──
1028
/// Report usage against a budget and check limits. Returns the
/// typed [`ReportUsageResult`] variant; backends enforce
/// idempotency via the caller-supplied
/// [`UsageDimensions::dedup_key`] (RFC-012 §R7.2.3 — replaces
/// the pre-Round-7 `AdmissionDecision` return).
///
/// Required method — no default impl. `handle` is the worker's
/// lease context; contrast [`Self::report_usage_admin`], which
/// reports against a budget with no lease in hand.
async fn report_usage(
    &self,
    handle: &Handle,
    budget: &BudgetId,
    dimensions: crate::backend::UsageDimensions,
) -> Result<ReportUsageResult, EngineError>;
1040
1041 // ── Stream reads (RFC-012 Stage 1c tranche-4; issue #87) ──
1042
/// Read frames from a completed or in-flight attempt's stream.
///
/// `from` / `to` are [`StreamCursor`] values: `StreamCursor::Start`
/// / `StreamCursor::End` correspond to XRANGE `-` / `+`, while
/// `StreamCursor::At("<id>")` reads from a concrete entry id.
///
/// Input validation (count_limit bounds, cursor shape) belongs to
/// the caller — the SDK-side wrappers in
/// [`ff-sdk`](https://docs.rs/ff-sdk) enforce bounds before
/// forwarding. Backends MAY additionally reject out-of-range input
/// via [`EngineError::Validation`].
///
/// Gated on the `streaming` feature — stream reads are part of the
/// stream-subset surface that a backend lacking XREAD-like
/// primitives may omit.
#[cfg(feature = "streaming")]
async fn read_stream(
    &self,
    _execution_id: &ExecutionId,
    _attempt_index: AttemptIndex,
    _from: StreamCursor,
    _to: StreamCursor,
    _count_limit: u64,
) -> Result<StreamFrames, EngineError> {
    let op = "read_stream";
    Err(EngineError::Unavailable { op })
}
1069
/// Tail a live attempt's stream.
///
/// `after` is an exclusive [`StreamCursor`] — only entries with id
/// strictly greater than `after` come back. `StreamCursor::Start`
/// / `StreamCursor::End` are NOT accepted here; callers MUST hand
/// in a concrete id (or `StreamCursor::from_beginning()`). The SDK
/// wrapper rejects the open markers before they reach the backend.
///
/// `block_ms == 0` gives a non-blocking peek; `block_ms > 0`
/// blocks for up to that many ms waiting on a new entry.
///
/// `visibility` (RFC-015 §6.1) filters the returned entries on
/// their stored [`StreamMode`](crate::backend::StreamMode) `mode`
/// field. The default,
/// [`TailVisibility::All`](crate::backend::TailVisibility::All),
/// preserves v1 behaviour.
///
/// Gated on the `streaming` feature — see
/// [`read_stream`](Self::read_stream).
#[cfg(feature = "streaming")]
async fn tail_stream(
    &self,
    _execution_id: &ExecutionId,
    _attempt_index: AttemptIndex,
    _after: StreamCursor,
    _block_ms: u64,
    _count_limit: u64,
    _visibility: TailVisibility,
) -> Result<StreamFrames, EngineError> {
    let op = "tail_stream";
    Err(EngineError::Unavailable { op })
}
1100
/// Read an attempt's rolling summary document (RFC-015 §6.3).
///
/// Returns `Ok(None)` when no
/// [`StreamMode::DurableSummary`](crate::backend::StreamMode::DurableSummary)
/// frame has ever been appended for the attempt. This is a
/// non-blocking Hash read, safe for any consumer to call without
/// holding the lease.
///
/// Gated on the `streaming` feature — summary reads belong to the
/// stream-subset surface.
#[cfg(feature = "streaming")]
async fn read_summary(
    &self,
    _execution_id: &ExecutionId,
    _attempt_index: AttemptIndex,
) -> Result<Option<SummaryDocument>, EngineError> {
    let op = "read_summary";
    Err(EngineError::Unavailable { op })
}
1119
1120 // ── RFC-017 Stage A — Ingress (5) ──────────────────────────
1121 //
1122 // Every method in this block has a default impl returning
1123 // `EngineError::Unavailable { op }` per RFC-017 §5.3. Concrete
1124 // backends override each method with a real body. A missing
1125 // override surfaces as a loud typed error at the call site rather
1126 // than a silent no-op.
1127
/// Create an execution. Ingress row 6 (RFC-017 §4). Wraps
/// `ff_create_execution` on Valkey and
/// `INSERT INTO ff_execution ...` on Postgres. Duplicate
/// submissions are idempotent via the `idempotency_key` plus the
/// backend-side default `dedup_ttl_ms = 86400000`.
#[cfg(feature = "core")]
async fn create_execution(
    &self,
    _args: CreateExecutionArgs,
) -> Result<CreateExecutionResult, EngineError> {
    let op = "create_execution";
    Err(EngineError::Unavailable { op })
}
1141
/// Create a flow header. Ingress row 5 (RFC-017 §4).
#[cfg(feature = "core")]
async fn create_flow(
    &self,
    _args: CreateFlowArgs,
) -> Result<CreateFlowResult, EngineError> {
    let op = "create_flow";
    Err(EngineError::Unavailable { op })
}
1150
/// Atomically add an execution to a flow — a single-FCALL
/// co-located commit on Valkey, a single-transaction UPSERT on
/// Postgres.
#[cfg(feature = "core")]
async fn add_execution_to_flow(
    &self,
    _args: AddExecutionToFlowArgs,
) -> Result<AddExecutionToFlowResult, EngineError> {
    let op = "add_execution_to_flow";
    Err(EngineError::Unavailable { op })
}
1162
/// Stage a dependency edge between flow members. CAS-guarded on
/// `graph_revision` — a stale rev comes back as
/// `Contention(StaleGraphRevision)`.
#[cfg(feature = "core")]
async fn stage_dependency_edge(
    &self,
    _args: StageDependencyEdgeArgs,
) -> Result<StageDependencyEdgeResult, EngineError> {
    let op = "stage_dependency_edge";
    Err(EngineError::Unavailable { op })
}
1174
/// Apply a previously staged dependency edge to its downstream
/// child.
#[cfg(feature = "core")]
async fn apply_dependency_to_child(
    &self,
    _args: ApplyDependencyToChildArgs,
) -> Result<ApplyDependencyToChildResult, EngineError> {
    let op = "apply_dependency_to_child";
    Err(EngineError::Unavailable { op })
}
1185
1186 // ── RFC-017 Stage A — Operator control (4) ─────────────────
1187
/// Operator-initiated cancel of a single execution (row 2).
#[cfg(feature = "core")]
async fn cancel_execution(
    &self,
    _args: CancelExecutionArgs,
) -> Result<CancelExecutionResult, EngineError> {
    let op = "cancel_execution";
    Err(EngineError::Unavailable { op })
}
1198
/// Re-score an execution's eligibility priority (row 17).
#[cfg(feature = "core")]
async fn change_priority(
    &self,
    _args: ChangePriorityArgs,
) -> Result<ChangePriorityResult, EngineError> {
    let op = "change_priority";
    Err(EngineError::Unavailable { op })
}
1209
/// Replay a terminal execution (row 22). The variadic KEYS
/// handling (inbound-edge pre-read) stays hidden inside the Valkey
/// impl per RFC-017 §4 row 3.
#[cfg(feature = "core")]
async fn replay_execution(
    &self,
    _args: ReplayExecutionArgs,
) -> Result<ReplayExecutionResult, EngineError> {
    let op = "replay_execution";
    Err(EngineError::Unavailable { op })
}
1222
/// Operator-initiated lease revoke (row 19).
#[cfg(feature = "core")]
async fn revoke_lease(
    &self,
    _args: RevokeLeaseArgs,
) -> Result<RevokeLeaseResult, EngineError> {
    let op = "revoke_lease";
    Err(EngineError::Unavailable { op })
}
1231
1232 // ── RFC-017 Stage A — Budget + quota admin (5) ─────────────
1233
/// Create a budget definition (row 6).
#[cfg(feature = "core")]
async fn create_budget(
    &self,
    _args: CreateBudgetArgs,
) -> Result<CreateBudgetResult, EngineError> {
    let op = "create_budget";
    Err(EngineError::Unavailable { op })
}
1244
/// Reset a budget's usage counters (row 10).
#[cfg(feature = "core")]
async fn reset_budget(
    &self,
    _args: ResetBudgetArgs,
) -> Result<ResetBudgetResult, EngineError> {
    let op = "reset_budget";
    Err(EngineError::Unavailable { op })
}
1253
/// Create a quota policy (row 7).
#[cfg(feature = "core")]
async fn create_quota_policy(
    &self,
    _args: CreateQuotaPolicyArgs,
) -> Result<CreateQuotaPolicyResult, EngineError> {
    let op = "create_quota_policy";
    Err(EngineError::Unavailable { op })
}
1264
/// Read-only budget status for operator visibility (row 8).
#[cfg(feature = "core")]
async fn get_budget_status(
    &self,
    _id: &BudgetId,
) -> Result<BudgetStatus, EngineError> {
    let op = "get_budget_status";
    Err(EngineError::Unavailable { op })
}
1275
/// Admin-path `report_usage` (row 9 + RFC-017 §5 round-1 F4).
/// Unlike the existing [`Self::report_usage`], which takes a
/// worker handle, the admin path carries no lease context.
#[cfg(feature = "core")]
async fn report_usage_admin(
    &self,
    _budget: &BudgetId,
    _args: ReportUsageAdminArgs,
) -> Result<ReportUsageResult, EngineError> {
    let op = "report_usage_admin";
    Err(EngineError::Unavailable { op })
}
1289
1290 // ── RFC-017 Stage A — Read + diagnostics (3) ───────────────
1291
1292 /// Fetch the stored result payload for a completed execution
1293 /// (row 4). Returns `Ok(None)` when the execution is missing, not
1294 /// yet complete, or its payload was trimmed by retention policy.
1295 async fn get_execution_result(
1296 &self,
1297 _id: &ExecutionId,
1298 ) -> Result<Option<Vec<u8>>, EngineError> {
1299 Err(EngineError::Unavailable {
1300 op: "get_execution_result",
1301 })
1302 }
1303
/// Cursor-paginated listing of an execution's pending-or-active
/// waitpoints (row 5 / §8). Stage A keeps the existing
/// `PendingWaitpointInfo` shape; Stage D ships the §8 HMAC
/// sanitisation + `(token_kid, token_fingerprint)` schema.
#[cfg(feature = "core")]
async fn list_pending_waitpoints(
    &self,
    _args: ListPendingWaitpointsArgs,
) -> Result<ListPendingWaitpointsResult, EngineError> {
    let op = "list_pending_waitpoints";
    Err(EngineError::Unavailable { op })
}
1317
1318 /// Backend-level reachability probe (row 1). Valkey: `PING`;
1319 /// Postgres: `SELECT 1`.
1320 async fn ping(&self) -> Result<(), EngineError> {
1321 Err(EngineError::Unavailable { op: "ping" })
1322 }
1323
1324 // ── RFC-017 Stage A — Scheduling (1) ───────────────────────
1325
/// Scheduler-routed claim entrypoint (row 18, RFC-017 §7). The
/// Valkey impl forwards to its `ff_scheduler::Scheduler` cursor;
/// the Postgres impl forwards to `PostgresScheduler`'s
/// `FOR UPDATE SKIP LOCKED` path.
///
/// Backends carrying an embedded scheduler (e.g. `ValkeyBackend`
/// built via `with_embedded_scheduler`, or `PostgresBackend` with
/// its `with_scanners` sibling) route the claim through it;
/// backends without a wired scheduler return
/// [`EngineError::Unavailable`]. HTTP consumers use
/// `FlowFabricWorker::claim_via_server` instead.
#[cfg(feature = "core")]
async fn claim_for_worker(
    &self,
    _args: ClaimForWorkerArgs,
) -> Result<ClaimForWorkerOutcome, EngineError> {
    let op = "claim_for_worker";
    Err(EngineError::Unavailable { op })
}
1346
1347 // ── Cross-cutting (RFC-017 Stage B trait-lift) ──────────────
1348
1349 /// Static observability label identifying the backend family in
1350 /// logs + metrics (RFC-017 §5.4 + §9 Stage B). Default impl
1351 /// returns `"unknown"` so legacy `impl EngineBackend` blocks that
1352 /// have not upgraded keep compiling; every in-tree backend
1353 /// overrides — `ValkeyBackend` → `"valkey"`, `PostgresBackend` →
1354 /// `"postgres"`.
1355 fn backend_label(&self) -> &'static str {
1356 "unknown"
1357 }
1358
1359 /// Backend downcast escape hatch (v0.12 PR-7a transitional).
1360 ///
1361 /// Scanner supervisors in `ff-engine` still dispatch through a
1362 /// concrete `ferriskey::Client`; to keep the engine's public
1363 /// boundary backend-agnostic (`Arc<dyn EngineBackend>`) while the
1364 /// scanner internals remain Valkey-shaped, the engine downcasts
1365 /// via this method and reaches in for the embedded client. Every
1366 /// backend that wants to be consumed by `Engine::start_with_completions`
1367 /// overrides this to return `self` as `&dyn Any`; the default
1368 /// returns a placeholder so a stray `downcast_ref` fails cleanly
1369 /// rather than risking unsound behaviour.
1370 ///
1371 /// v0.13 (PR-7b) will trait-ify individual scanners onto
1372 /// `EngineBackend` and retire `ff-engine`'s dependence on this
1373 /// downcast path. The method itself will remain on the trait
1374 /// (likely deprecated) rather than be removed — removing a
1375 /// public trait method is a breaking change for external
1376 /// `impl EngineBackend` blocks.
1377 fn as_any(&self) -> &(dyn std::any::Any + 'static) {
1378 // Placeholder so the default does not expose `Self` for
1379 // downcast. Backends override to return `self`.
1380 &()
1381 }
1382
1383 /// RFC-018 Stage A: snapshot of this backend's identity + the
1384 /// flat `Supports` surface it can actually service. Consumers use
1385 /// this at startup to gate UI features / choose between alternative
1386 /// code paths before dispatching. See
1387 /// `rfcs/RFC-018-backend-capability-discovery.md` for the full
1388 /// discovery contract and the four owner-adjudicated open
1389 /// questions (granularity: coarse; version: struct; sync; no
1390 /// event stream).
1391 ///
1392 /// Default: returns a value tagged `family = "unknown"` with every
1393 /// `supports.*` bool `false`, so pre-RFC-018 out-of-tree backends
1394 /// keep compiling and consumers treat "all false" as "dispatch
1395 /// and catch [`EngineError::Unavailable`]" (pre-RFC-018 behaviour).
1396 /// Concrete in-tree backends (`ValkeyBackend`, `PostgresBackend`)
1397 /// override to populate a real value.
1398 ///
1399 /// Sync (no `.await`): backend-static info should not require a
1400 /// probe on every query. Dynamic probes happen once at
1401 /// `connect*` time and cache the result.
1402 fn capabilities(&self) -> crate::capability::Capabilities {
1403 crate::capability::Capabilities::new(
1404 crate::capability::BackendIdentity::new(
1405 "unknown",
1406 crate::capability::Version::new(0, 0, 0),
1407 "unknown",
1408 ),
1409 crate::capability::Supports::none(),
1410 )
1411 }
1412
1413 /// Issue #281: run one-time backend-specific boot preparation.
1414 ///
1415 /// Intended to run ONCE per deployment startup — NOT per request.
1416 /// Idempotent and safe for consumers to call on every application
1417 /// boot; backends that have nothing to do return
1418 /// [`PrepareOutcome::NoOp`] without side effects.
1419 ///
1420 /// Per-backend behaviour:
1421 ///
1422 /// * **Valkey** — issues `FUNCTION LOAD REPLACE` for the
1423 /// `flowfabric` Lua library (with bounded retry on transient
1424 /// transport faults; permanent compile errors surface as
1425 /// [`EngineError::Transport`] without retry). Returns
1426 /// [`PrepareOutcome::Applied`] carrying
1427 /// `"FUNCTION LOAD (flowfabric lib v<N>)"`.
1428 /// * **Postgres** — returns [`PrepareOutcome::NoOp`]. Schema
1429 /// migrations are applied out-of-band per
1430 /// `rfcs/drafts/v0.7-migration-master.md §Q12`; the backend
1431 /// runs a schema-version check at connect time and refuses to
1432 /// start on mismatch, so no boot-side prepare work remains.
1433 /// * **Default impl** — returns [`PrepareOutcome::NoOp`] so
1434 /// out-of-tree backends without preparation work compile
1435 /// without boilerplate.
1436 ///
1437 /// # Relationship to the in-tree boot path
1438 ///
1439 /// `ValkeyBackend::initialize_deployment` (called from
1440 /// `Server::start_with_metrics`) already invokes
1441 /// [`ensure_library`](ff_script::loader::ensure_library) inline as
1442 /// its step 4; that path is unchanged. `prepare()` exists as a
1443 /// **trait-surface entry point** so consumers that construct an
1444 /// `Arc<dyn EngineBackend>` outside of `Server` (e.g.
1445 /// cairn-fabric's boot path at `cairn-fabric/src/boot.rs`) can
1446 /// run the same preparation without reaching into
1447 /// backend-specific modules. The overlap is intentional: calling
1448 /// both `prepare()` and `initialize_deployment` is safe because
1449 /// `FUNCTION LOAD REPLACE` is idempotent under the version
1450 /// check.
1451 ///
1452 /// # Layer forwarding
1453 ///
1454 /// Layer impls (`HookedBackend`, ff-sdk layers) do NOT forward
1455 /// `prepare` today — consistent with `backend_label` / `ping` /
1456 /// `shutdown_prepare`. Consumers that wrap a backend in layers
1457 /// MUST call `prepare()` on the raw backend before wrapping, or
1458 /// accept the default [`PrepareOutcome::NoOp`].
1459 async fn prepare(&self) -> Result<PrepareOutcome, EngineError> {
1460 Ok(PrepareOutcome::NoOp)
1461 }
1462
1463 /// Drain-before-shutdown hook (RFC-017 §5.4). The server calls
1464 /// this before draining its own background tasks so backend-
1465 /// scoped primitives (Valkey stream semaphore, Postgres sqlx
1466 /// pool, …) can close their gates and await in-flight work up to
1467 /// `grace`.
1468 ///
1469 /// Default impl returns `Ok(())` — a no-op backend has nothing
1470 /// backend-scoped to drain. Concrete backends whose data plane
1471 /// owns resources (connection pools, semaphores, listeners)
1472 /// override with a real body.
1473 async fn shutdown_prepare(&self, _grace: Duration) -> Result<(), EngineError> {
1474 Ok(())
1475 }
1476
1477 // ── RFC-017 Stage E2 — `Server::client` removal (header + reads) ───
1478
1479 /// RFC-017 Stage E2: the "header" portion of `cancel_flow` — run the
1480 /// atomic flow-state flip (Valkey: `ff_cancel_flow` FCALL; Postgres:
1481 /// `cancel_flow_once` tx), decode policy + membership, and surface
1482 /// the `flow_already_terminal` idempotency branch as a first-class
1483 /// [`CancelFlowHeader::AlreadyTerminal`] so the Server can build
1484 /// the wire [`CancelFlowResult`] without reaching for a raw
1485 /// `Client`. Separate from the existing
1486 /// [`EngineBackend::cancel_flow`] entry point (which takes the
1487 /// enum-typed `(policy, wait)` split and returns the wait-collapsed
1488 /// `CancelFlowResult`) because the Server owns its own
1489 /// wait-dispatch + member-cancel machinery via
1490 /// [`EngineBackend::cancel_execution`] + backlog ack.
1491 ///
1492 /// Default impl returns [`EngineError::Unavailable`] so un-migrated
1493 /// backends surface the miss loudly.
1494 #[cfg(feature = "core")]
1495 async fn cancel_flow_header(
1496 &self,
1497 _args: CancelFlowArgs,
1498 ) -> Result<crate::contracts::CancelFlowHeader, EngineError> {
1499 Err(EngineError::Unavailable {
1500 op: "cancel_flow_header",
1501 })
1502 }
1503
1504 /// RFC-017 Stage E2: best-effort acknowledgement that one member of
1505 /// a `cancel_all` flow has completed its per-member cancel. Drains
1506 /// the member from the flow's `pending_cancels` set and, if empty,
1507 /// removes the flow from the partition-level `cancel_backlog`
1508 /// (Valkey: `ff_ack_cancel_member` FCALL; Postgres: table write —
1509 /// default `Unavailable` until Wave 9).
1510 ///
1511 /// Failures are swallowed by the caller — the cancel-backlog
1512 /// reconciler is the authoritative drain — but a typed error here
1513 /// lets the caller log a backend-scoped context string.
1514 #[cfg(feature = "core")]
1515 async fn ack_cancel_member(
1516 &self,
1517 _flow_id: &FlowId,
1518 _execution_id: &ExecutionId,
1519 ) -> Result<(), EngineError> {
1520 Err(EngineError::Unavailable {
1521 op: "ack_cancel_member",
1522 })
1523 }
1524
1525 /// RFC-017 Stage E2: full-shape execution read used by the
1526 /// `GET /v1/executions/{id}` HTTP route. Returns the legacy
1527 /// [`ExecutionInfo`] wire shape (not the decoupled
1528 /// [`ExecutionSnapshot`]) so the existing HTTP response bytes stay
1529 /// identical across the migration.
1530 ///
1531 /// `Ok(None)` ⇒ no such execution. Default `Unavailable` because
1532 /// the Valkey HGETALL-and-parse is backend-specific.
1533 #[cfg(feature = "core")]
1534 async fn read_execution_info(
1535 &self,
1536 _id: &ExecutionId,
1537 ) -> Result<Option<ExecutionInfo>, EngineError> {
1538 Err(EngineError::Unavailable {
1539 op: "read_execution_info",
1540 })
1541 }
1542
1543 /// RFC-017 Stage E2: narrow `public_state` read used by the
1544 /// `GET /v1/executions/{id}/state` HTTP route. Returns `Ok(None)`
1545 /// when the execution is missing. Default `Unavailable`.
1546 #[cfg(feature = "core")]
1547 async fn read_execution_state(
1548 &self,
1549 _id: &ExecutionId,
1550 ) -> Result<Option<PublicState>, EngineError> {
1551 Err(EngineError::Unavailable {
1552 op: "read_execution_state",
1553 })
1554 }
1555
1556 // ── RFC-019 Stage A/B/C — Stream-cursor subscriptions ─────────
1557 //
1558 // Four owner-adjudicated families (RFC-019 §Open Questions #5):
1559 // `lease_history`, `completion`, `signal_delivery`,
1560 // `instance_tags`. Stage C (this crate) promotes each family to
1561 // a typed event enum; consumers `match` on variants instead of
1562 // parsing a backend-shaped byte blob.
1563 //
1564 // Each method returns a family-specific subscription alias (see
1565 // [`crate::stream_events`]). All defaults return
1566 // `EngineError::Unavailable` per RFC-017 trait-growth conventions.
1567
1568 /// Subscribe to lease lifecycle events (acquired / renewed /
1569 /// expired / reclaimed / revoked) for the partition this backend
1570 /// is configured with.
1571 ///
1572 /// Cross-partition fan-out is consumer-side merge: subscribe
1573 /// per-partition backend instance and interleave on the read
1574 /// side. Yields
1575 /// `Err(EngineError::StreamDisconnected { cursor })` on backend
1576 /// disconnect; resume by calling this method again with the
1577 /// returned cursor.
1578 ///
1579 /// `filter` (#282): when `filter.instance_tag` is `Some((k, v))`,
1580 /// only events whose execution carries tag `k = v` are yielded
1581 /// (matching the [`crate::backend::ScannerFilter`] surface from
1582 /// #122). Pass `&ScannerFilter::default()` for unfiltered
1583 /// behaviour. Filtering happens inside the backend stream; the
1584 /// [`crate::stream_events::LeaseHistorySubscription`] return type
1585 /// is unchanged.
1586 async fn subscribe_lease_history(
1587 &self,
1588 _cursor: crate::stream_subscribe::StreamCursor,
1589 _filter: &crate::backend::ScannerFilter,
1590 ) -> Result<crate::stream_events::LeaseHistorySubscription, EngineError> {
1591 Err(EngineError::Unavailable {
1592 op: "subscribe_lease_history",
1593 })
1594 }
1595
1596 /// Subscribe to completion events (terminal state transitions).
1597 ///
1598 /// - **Postgres**: wraps the `ff_completion_event` outbox +
1599 /// LISTEN/NOTIFY machinery. Durable via event-id cursor.
1600 /// - **Valkey**: wraps the RESP3 `ff:dag:completions` pubsub
1601 /// subscriber. Pubsub is at-most-once over the live
1602 /// subscription window; the cursor is always the empty
1603 /// sentinel. If you need at-least-once replay with durable
1604 /// cursor resume, use the Postgres backend (see
1605 /// `docs/POSTGRES_PARITY_MATRIX.md` row `subscribe_completion`).
1606 ///
1607 /// `filter` (#282): see [`Self::subscribe_lease_history`]. Valkey
1608 /// reuses the `subscribe_completions_filtered` per-event HGET
1609 /// gate; Postgres filters inline against the outbox's denormalised
1610 /// `instance_tag` column.
1611 async fn subscribe_completion(
1612 &self,
1613 _cursor: crate::stream_subscribe::StreamCursor,
1614 _filter: &crate::backend::ScannerFilter,
1615 ) -> Result<crate::stream_events::CompletionSubscription, EngineError> {
1616 Err(EngineError::Unavailable {
1617 op: "subscribe_completion",
1618 })
1619 }
1620
1621 /// Subscribe to signal-delivery events (satisfied / buffered /
1622 /// deduped).
1623 ///
1624 /// `filter` (#282): see [`Self::subscribe_lease_history`].
1625 async fn subscribe_signal_delivery(
1626 &self,
1627 _cursor: crate::stream_subscribe::StreamCursor,
1628 _filter: &crate::backend::ScannerFilter,
1629 ) -> Result<crate::stream_events::SignalDeliverySubscription, EngineError> {
1630 Err(EngineError::Unavailable {
1631 op: "subscribe_signal_delivery",
1632 })
1633 }
1634
1635 /// Subscribe to instance-tag events (tag attached / cleared).
1636 ///
1637 /// Producer wiring is deferred per #311 audit ("no concrete
1638 /// demand"); the trait method exists for API uniformity across
1639 /// the four families. Backends currently return
1640 /// `EngineError::Unavailable`.
1641 async fn subscribe_instance_tags(
1642 &self,
1643 _cursor: crate::stream_subscribe::StreamCursor,
1644 ) -> Result<crate::stream_events::InstanceTagSubscription, EngineError> {
1645 Err(EngineError::Unavailable {
1646 op: "subscribe_instance_tags",
1647 })
1648 }
1649}
1650
1651/// Object-safety assertion: `dyn EngineBackend` compiles iff every
1652/// method is dyn-compatible. Kept as a compile-time guard so a future
1653/// trait change that accidentally breaks dyn-safety fails the build
1654/// at this site rather than at every downstream `Arc<dyn
1655/// EngineBackend>` use.
1656#[allow(dead_code)]
1657fn _assert_dyn_compatible(_: &dyn EngineBackend) {}
1658
/// How often [`wait_for_flow_cancellation`] polls `describe_flow`.
/// 100 ms is tight enough that a local single-node cancel cascade is
/// observed within a poll or two, yet slack enough that a
/// `WaitIndefinite` caller does not hammer `describe_flow` on a live
/// cluster.
const CANCEL_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(100);

/// Defensive hard ceiling applied to
/// [`CancelFlowWait::WaitIndefinite`]: if the reconciler cascade has
/// not converged after five minutes something is wedged, and a typed
/// `Timeout` is strictly more useful to the caller than blocking
/// forever. RFC-012 §3.1.1 expects real-world cascades to finish
/// within `reconciler_interval + grace` — orders of magnitude below
/// this bound.
const CANCEL_WAIT_INDEFINITE_CEILING: Duration = Duration::from_secs(5 * 60);
1672
1673/// Poll `backend.describe_flow(flow_id)` until `public_flow_state` is
1674/// `"cancelled"` or `deadline` elapses.
1675///
1676/// Shared by every backend's `cancel_flow` trait impl that honours
1677/// [`CancelFlowWait::WaitTimeout`] / [`CancelFlowWait::WaitIndefinite`].
1678/// The underlying `cancel_flow` FCALL / SQL transaction flips the
1679/// flow-level state synchronously; member cancellations dispatch
1680/// asynchronously via the reconciler, which also flips
1681/// `public_flow_state` to `cancelled` once the cascade completes. This
1682/// helper waits for that terminal flip.
1683///
1684/// Returns:
1685/// * `Ok(())` once `public_flow_state = "cancelled"` is observed.
1686/// * `Err(EngineError::Timeout { op: "cancel_flow", elapsed })` when
1687/// `deadline` elapses first. `elapsed` is the wait budget (the
1688/// requested timeout), not wall-clock precision.
1689/// * `Err(e)` if `describe_flow` itself errors (propagated).
1690pub async fn wait_for_flow_cancellation<B: EngineBackend + ?Sized>(
1691 backend: &B,
1692 flow_id: &crate::types::FlowId,
1693 deadline: Duration,
1694) -> Result<(), EngineError> {
1695 let start = std::time::Instant::now();
1696 loop {
1697 match backend.describe_flow(flow_id).await? {
1698 Some(snap) if snap.public_flow_state == "cancelled" => return Ok(()),
1699 // `None` (flow removed) is also terminal from the caller's
1700 // perspective — nothing left to wait on.
1701 None => return Ok(()),
1702 Some(_) => {}
1703 }
1704 if start.elapsed() >= deadline {
1705 return Err(EngineError::Timeout {
1706 op: "cancel_flow",
1707 elapsed: deadline,
1708 });
1709 }
1710 tokio::time::sleep(CANCEL_WAIT_POLL_INTERVAL).await;
1711 }
1712}
1713
1714/// Convert a [`CancelFlowWait`] into the deadline passed to
1715/// [`wait_for_flow_cancellation`]. `NoWait` returns `None` — the caller
1716/// must skip the wait entirely.
1717pub fn cancel_flow_wait_deadline(wait: CancelFlowWait) -> Option<Duration> {
1718 // `CancelFlowWait` is `#[non_exhaustive]`; this match lives in the
1719 // defining crate so the exhaustiveness check keeps the compiler
1720 // honest. Future variants must be wired here explicitly.
1721 match wait {
1722 CancelFlowWait::NoWait => None,
1723 CancelFlowWait::WaitTimeout(d) => Some(d),
1724 CancelFlowWait::WaitIndefinite => Some(CANCEL_WAIT_INDEFINITE_CEILING),
1725 }
1726}
1727
#[cfg(test)]
mod tests {
    use super::*;

    /// A zero-state backend stub used to exercise the default
    /// `capabilities()` impl without pulling in a real
    /// transport. Only the default method is under test here; every
    /// other method is unreachable on this type.
    struct DefaultBackend;

    // Every required method stubs to `unreachable!()`: the single
    // test below only exercises the trait's default `capabilities()`
    // body, so hitting any stub panics and flags a test bug. The
    // `#[cfg(feature = "core")]` / `#[cfg(feature = "streaming")]`
    // gates mirror the trait's own feature gating so the impl stays
    // complete under every feature combination.
    #[async_trait]
    impl EngineBackend for DefaultBackend {
        async fn claim(
            &self,
            _lane: &LaneId,
            _capabilities: &CapabilitySet,
            _policy: ClaimPolicy,
        ) -> Result<Option<Handle>, EngineError> {
            unreachable!()
        }
        async fn renew(&self, _handle: &Handle) -> Result<LeaseRenewal, EngineError> {
            unreachable!()
        }
        async fn progress(
            &self,
            _handle: &Handle,
            _percent: Option<u8>,
            _message: Option<String>,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn append_frame(
            &self,
            _handle: &Handle,
            _frame: Frame,
        ) -> Result<AppendFrameOutcome, EngineError> {
            unreachable!()
        }
        async fn complete(
            &self,
            _handle: &Handle,
            _payload: Option<Vec<u8>>,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn fail(
            &self,
            _handle: &Handle,
            _reason: FailureReason,
            _classification: FailureClass,
        ) -> Result<FailOutcome, EngineError> {
            unreachable!()
        }
        async fn cancel(&self, _handle: &Handle, _reason: &str) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn suspend(
            &self,
            _handle: &Handle,
            _args: SuspendArgs,
        ) -> Result<SuspendOutcome, EngineError> {
            unreachable!()
        }
        async fn create_waitpoint(
            &self,
            _handle: &Handle,
            _waitpoint_key: &str,
            _expires_in: Duration,
        ) -> Result<PendingWaitpoint, EngineError> {
            unreachable!()
        }
        async fn observe_signals(
            &self,
            _handle: &Handle,
        ) -> Result<Vec<ResumeSignal>, EngineError> {
            unreachable!()
        }
        async fn claim_from_resume_grant(
            &self,
            _token: ResumeToken,
        ) -> Result<Option<Handle>, EngineError> {
            unreachable!()
        }
        async fn delay(
            &self,
            _handle: &Handle,
            _delay_until: TimestampMs,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn wait_children(&self, _handle: &Handle) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn describe_execution(
            &self,
            _id: &ExecutionId,
        ) -> Result<Option<ExecutionSnapshot>, EngineError> {
            unreachable!()
        }
        // NOTE(review): unlike the panicking stubs above, the three
        // reads below answer benign empty/zero values — presumably so
        // code paths reached from default trait methods stay callable
        // without a transport; TODO confirm which default method (if
        // any) forwards here.
        async fn read_execution_context(
            &self,
            _execution_id: &ExecutionId,
        ) -> Result<ExecutionContext, EngineError> {
            Ok(ExecutionContext::new(
                Vec::new(),
                String::new(),
                std::collections::HashMap::new(),
            ))
        }
        async fn read_current_attempt_index(
            &self,
            _execution_id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError> {
            Ok(AttemptIndex::new(0))
        }
        async fn read_total_attempt_count(
            &self,
            _execution_id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError> {
            Ok(AttemptIndex::new(0))
        }
        async fn describe_flow(
            &self,
            _id: &FlowId,
        ) -> Result<Option<FlowSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_edges(
            &self,
            _flow_id: &FlowId,
            _direction: EdgeDirection,
        ) -> Result<Vec<EdgeSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn describe_edge(
            &self,
            _flow_id: &FlowId,
            _edge_id: &EdgeId,
        ) -> Result<Option<EdgeSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn resolve_execution_flow_id(
            &self,
            _eid: &ExecutionId,
        ) -> Result<Option<FlowId>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_flows(
            &self,
            _partition: PartitionKey,
            _cursor: Option<FlowId>,
            _limit: usize,
        ) -> Result<ListFlowsPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_lanes(
            &self,
            _cursor: Option<LaneId>,
            _limit: usize,
        ) -> Result<ListLanesPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_suspended(
            &self,
            _partition: PartitionKey,
            _cursor: Option<ExecutionId>,
            _limit: usize,
        ) -> Result<ListSuspendedPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_executions(
            &self,
            _partition: PartitionKey,
            _cursor: Option<ExecutionId>,
            _limit: usize,
        ) -> Result<ListExecutionsPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn deliver_signal(
            &self,
            _args: DeliverSignalArgs,
        ) -> Result<DeliverSignalResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn claim_resumed_execution(
            &self,
            _args: ClaimResumedExecutionArgs,
        ) -> Result<ClaimResumedExecutionResult, EngineError> {
            unreachable!()
        }
        async fn cancel_flow(
            &self,
            _id: &FlowId,
            _policy: CancelFlowPolicy,
            _wait: CancelFlowWait,
        ) -> Result<CancelFlowResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn set_edge_group_policy(
            &self,
            _flow_id: &FlowId,
            _downstream_execution_id: &ExecutionId,
            _policy: crate::contracts::EdgeDependencyPolicy,
        ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
            unreachable!()
        }
        async fn report_usage(
            &self,
            _handle: &Handle,
            _budget: &BudgetId,
            _dimensions: crate::backend::UsageDimensions,
        ) -> Result<ReportUsageResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn read_stream(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
            _from: StreamCursor,
            _to: StreamCursor,
            _count_limit: u64,
        ) -> Result<StreamFrames, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn tail_stream(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
            _after: StreamCursor,
            _block_ms: u64,
            _count_limit: u64,
            _visibility: TailVisibility,
        ) -> Result<StreamFrames, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn read_summary(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
        ) -> Result<Option<SummaryDocument>, EngineError> {
            unreachable!()
        }
    }

    /// The default `capabilities()` impl returns a value tagged
    /// `family = "unknown"` with every `supports.*` bool false, so
    /// pre-RFC-018 out-of-tree backends keep compiling and consumers
    /// can distinguish "backend predates RFC-018" from "backend
    /// reports concrete bools." Every concrete in-tree backend
    /// overrides.
    #[test]
    fn default_capabilities_is_unknown_family_all_false() {
        let b = DefaultBackend;
        // `capabilities()` is the trait default — DefaultBackend does
        // not override it, so this pins the trait-level fallback.
        let caps = b.capabilities();
        assert_eq!(caps.identity.family, "unknown");
        assert_eq!(
            caps.identity.version,
            crate::capability::Version::new(0, 0, 0)
        );
        assert_eq!(caps.identity.rfc017_stage, "unknown");
        // Every field false on the default (matches `Supports::none()`).
        assert_eq!(caps.supports, crate::capability::Supports::none());
    }
}