// ff_core/engine_backend.rs

//! The `EngineBackend` trait — abstracting FlowFabric's write surface.
//!
//! **RFC-012 Stage 1a:** this is the trait landing. The
//! Valkey-backed impl lives in `ff-backend-valkey`; future backends
//! (Postgres) add a sibling crate with their own impl. ff-sdk's
//! `FlowFabricWorker` gains `connect_with(backend)` /
//! `backend(&self)` accessors so consumers that want to bring their
//! own backend (tests, future non-Valkey deployments) can hand one
//! in. The hot-path migration of `ClaimedTask` / `FlowFabricWorker`
//! to forward through the trait lands across Stages 1b-1d.
//!
//! # Object safety
//!
//! `EngineBackend` is object-safe: all methods are `async fn` behind
//! `#[async_trait]` and take `&self`. Consumers can hold
//! `Arc<dyn EngineBackend>` for heterogeneous-backend deployments.
//! The trait is `Send + Sync + 'static` per RFC-012 §4.1; every impl
//! must honour that bound.
//!
//! # Error surface
//!
//! Every method returns [`Result<_, EngineError>`]. `EngineError`'s
//! `Transport` variant carries a boxed `dyn Error + Send + Sync`;
//! Valkey-backed transport faults box a
//! `ff_script::error::ScriptError` (downcast via
//! `ff_script::engine_error_ext::transport_script_ref`). Other
//! backends box their native error type and set the `backend` tag
//! accordingly.
//!
//! # Atomicity contract
//!
//! Per-op state transitions MUST be atomic (RFC-012 §3.4). On Valkey
//! this is the single-FCALL-per-op property; on Postgres it is the
//! per-transaction property. A backend that cannot honour atomicity
//! for a given op either MUST NOT implement `EngineBackend` or MUST
//! return `EngineError::Unavailable { op }` for the affected method.
//!
//! # Replay semantics
//!
//! `complete`, `fail`, `cancel`, `suspend`, `delay`, `wait_children`
//! are idempotent under replay — calling twice with the same handle
//! and args returns the same outcome (success on first call, typed
//! `State` / `Contention` on subsequent calls where the fence triple
//! no longer matches a live lease).
46use std::time::Duration;
47
48use async_trait::async_trait;
49
50use crate::backend::{
51 AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy,
52 FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal, PendingWaitpoint,
53 PrepareOutcome, ReclaimToken, ResumeSignal, SummaryDocument, TailVisibility,
54};
55use crate::contracts::{
56 CancelFlowResult, ExecutionSnapshot, FlowSnapshot, ReportUsageResult,
57 RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
58 SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
59};
60#[cfg(feature = "core")]
61use crate::contracts::{
62 AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
63 ApplyDependencyToChildResult, BudgetStatus, CancelExecutionArgs, CancelExecutionResult,
64 CancelFlowArgs, ChangePriorityArgs, ChangePriorityResult, ClaimForWorkerArgs,
65 ClaimForWorkerOutcome, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
66 CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
67 CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
68 DeliverSignalArgs, DeliverSignalResult, EdgeDirection, EdgeSnapshot, ExecutionInfo,
69 ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
70 ListPendingWaitpointsResult, ListSuspendedPage, ReplayExecutionArgs, ReplayExecutionResult,
71 ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult, RevokeLeaseArgs, RevokeLeaseResult,
72 StageDependencyEdgeArgs, StageDependencyEdgeResult,
73};
74#[cfg(feature = "core")]
75use crate::state::PublicState;
76#[cfg(feature = "core")]
77use crate::partition::PartitionKey;
78#[cfg(feature = "streaming")]
79use crate::contracts::{StreamCursor, StreamFrames};
80use crate::engine_error::EngineError;
81#[cfg(feature = "streaming")]
82use crate::types::AttemptIndex;
83#[cfg(feature = "core")]
84use crate::types::EdgeId;
85use crate::types::{BudgetId, ExecutionId, FlowId, LaneId, TimestampMs};
86
87/// The engine write surface — a single trait a backend implementation
88/// honours to serve a `FlowFabricWorker`.
89///
/// See RFC-012 §3.1 for the inventory rationale and §3.3 for the
/// type-level shape. The inventory has grown well past the RFC's
/// original 16-method sketch (Round-7 added `create_waitpoint`;
/// `append_frame` return widened; `report_usage` return replaced —
/// RFC-012 §R7). Issue #150 added the two trigger-surface methods
/// (`deliver_signal` / `claim_resumed_execution`).
95///
96/// # Note on `complete` payload shape
97///
98/// The RFC §3.3 sketch uses `Option<Bytes>`; the Stage 1a trait uses
99/// `Option<Vec<u8>>` to match the existing
100/// `ff_sdk::ClaimedTask::complete` signature and avoid adding a
101/// `bytes` public-type dep for zero consumer benefit. Round-4 §7.17
102/// resolved the payload container debate to `Box<[u8]>` in the
103/// public type (see `HandleOpaque`); `Option<Vec<u8>>` is the
104/// zero-churn choice consistent with today's code. Consumers that
105/// need `&[u8]` can borrow via `.as_deref()` on the Option.
106#[async_trait]
107pub trait EngineBackend: Send + Sync + 'static {
108 // ── Claim + lifecycle ──
109
    /// Fresh-work claim. Returns `Ok(None)` when no work is currently
    /// available; `Err` only on transport or input-validation faults.
    ///
    /// `lane` and `capabilities` are borrowed so a polling loop can
    /// reuse them across iterations; `policy` is consumed per call.
    async fn claim(
        &self,
        lane: &LaneId,
        capabilities: &CapabilitySet,
        policy: ClaimPolicy,
    ) -> Result<Option<Handle>, EngineError>;
118
    /// Renew a held lease. Returns the updated expiry + epoch on
    /// success; typed `State::StaleLease` / `State::LeaseExpired`
    /// when the lease has been stolen or timed out.
    ///
    /// Borrows `handle`, so the caller keeps the cookie and can retry
    /// under `EngineError::Transport` (cf. [`complete`](Self::complete)).
    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError>;
123
    /// Numeric-progress heartbeat.
    ///
    /// Writes scalar `progress_percent` / `progress_message` fields on
    /// `exec_core`; each call overwrites the previous value. This does
    /// NOT append to the output stream — stream-frame producers must use
    /// [`append_frame`](Self::append_frame) instead.
    ///
    /// NOTE(review): `percent` is `Option<u8>` so 101-255 are
    /// representable — presumably a 0-100 scale; confirm whether
    /// backends clamp or reject out-of-range values.
    async fn progress(
        &self,
        handle: &Handle,
        percent: Option<u8>,
        message: Option<String>,
    ) -> Result<(), EngineError>;
136
    /// Append one stream frame. Distinct from [`progress`](Self::progress)
    /// per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry
    /// id and post-append frame count (RFC-012 §R7.2.1).
    ///
    /// Stream-frame producers (arbitrary `frame_type` + payload, consumed
    /// via the read/tail surfaces) MUST use this method rather than
    /// [`progress`](Self::progress); the latter updates scalar fields on
    /// `exec_core` and is invisible to stream consumers.
    ///
    /// NOTE(review): this op is not in the module-level *Replay
    /// semantics* list — a retry after a transport fault presumably
    /// appends a duplicate frame; confirm and document.
    async fn append_frame(
        &self,
        handle: &Handle,
        frame: Frame,
    ) -> Result<AppendFrameOutcome, EngineError>;
150
    /// Terminal success. Borrows `handle` (round-4 M-D2) so callers
    /// can retry under `EngineError::Transport` without losing the
    /// cookie. Payload is `Option<Vec<u8>>` per the note above.
    ///
    /// Idempotent under replay — see the module-level *Replay
    /// semantics* section.
    async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError>;
155
    /// Terminal failure with classification. Returns [`FailOutcome`]
    /// so the caller learns whether a retry was scheduled.
    ///
    /// Idempotent under replay — see the module-level *Replay
    /// semantics* section.
    async fn fail(
        &self,
        handle: &Handle,
        reason: FailureReason,
        classification: FailureClass,
    ) -> Result<FailOutcome, EngineError>;
164
    /// Cooperative cancel by the worker holding the lease.
    ///
    /// Idempotent under replay — see the module-level *Replay
    /// semantics* section. `reason` is free-form operator/worker text.
    async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError>;
167
    /// Suspend the execution awaiting a typed resume condition
    /// (RFC-013 Stage 1d).
    ///
    /// Borrows `handle` (round-4 M-D2). Terminal-looking behaviour is
    /// expressed through [`SuspendOutcome`]:
    ///
    /// * [`SuspendOutcome::Suspended`] — the pre-suspend handle is
    ///   logically invalidated; the fresh `HandleKind::Suspended`
    ///   handle inside the variant supersedes it. Runtime enforcement
    ///   via the fence triple: subsequent ops against the stale handle
    ///   surface as `Contention(LeaseConflict)`.
    /// * [`SuspendOutcome::AlreadySatisfied`] — buffered signals on a
    ///   pending waitpoint already matched the resume condition at
    ///   suspension time. The lease is NOT released; the caller's
    ///   pre-suspend handle remains valid.
    ///
    /// Idempotent under replay — see the module-level *Replay
    /// semantics* section.
    ///
    /// See RFC-013 §2 for the type shapes, §3 for the replay /
    /// idempotency contract, §4 for the error taxonomy.
    async fn suspend(
        &self,
        handle: &Handle,
        args: SuspendArgs,
    ) -> Result<SuspendOutcome, EngineError>;
191
    /// Issue a pending waitpoint for future signal delivery.
    ///
    /// Waitpoints have two states in the Valkey wire contract:
    /// **pending** (token issued, not yet backing a suspension) and
    /// **active** (bound to a suspension). This method creates a
    /// waitpoint in the **pending** state. A later `suspend` call
    /// transitions a pending waitpoint to active (see Lua
    /// `use_pending_waitpoint` ARGV flag at
    /// `flowfabric.lua:3603,3641,3690`) — or, if buffered signals
    /// already satisfy its condition, the suspend call returns
    /// `SuspendOutcome::AlreadySatisfied` and the waitpoint activates
    /// without ever releasing the lease.
    ///
    /// Pending-waitpoint expiry is a first-class terminal error on
    /// the wire (`PendingWaitpointExpired` at
    /// `ff-script/src/error.rs:170,403-408`). The attempt retains its
    /// lease while the waitpoint is pending; signals delivered to
    /// this waitpoint are buffered server-side (RFC-012 §R7.2.2).
    ///
    /// NOTE(review): the `flowfabric.lua` / `error.rs` line-number
    /// references above are brittle — they drift as those files
    /// change; prefer anchoring to function/variant names.
    async fn create_waitpoint(
        &self,
        handle: &Handle,
        waitpoint_key: &str,
        expires_in: Duration,
    ) -> Result<PendingWaitpoint, EngineError>;
216
    /// Non-mutating observation of signals that satisfied the handle's
    /// resume condition.
    ///
    /// Read-only: repeated calls return the same signals rather than
    /// consuming them (hence "non-mutating").
    async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError>;
220
    /// Consume a reclaim grant to mint a resumed-kind handle. Returns
    /// `Ok(None)` when the grant's target execution is no longer
    /// resumable (already reclaimed, terminal, etc.).
    ///
    /// `token` is taken by value: a grant is single-use and consumed
    /// by the call (cf. the "already consumed" `InvalidClaimGrant`
    /// failure on [`Self::claim_resumed_execution`]).
    async fn claim_from_reclaim(&self, token: ReclaimToken) -> Result<Option<Handle>, EngineError>;
225
226 // Round-5 amendment: lease-releasing peers of `suspend`.
227
    /// Park the execution until `delay_until`, releasing the lease.
    ///
    /// Idempotent under replay — see the module-level *Replay
    /// semantics* section.
    async fn delay(&self, handle: &Handle, delay_until: TimestampMs) -> Result<(), EngineError>;
230
    /// Mark the execution as waiting for its child flow to complete,
    /// releasing the lease.
    ///
    /// Idempotent under replay — see the module-level *Replay
    /// semantics* section.
    async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError>;
234
235 // ── Read / admin ──
236
    /// Snapshot an execution by id. `Ok(None)` ⇒ no such execution.
    ///
    /// NOTE(review): unlike [`Self::list_edges`], this doc does not
    /// specify how a present-but-malformed record surfaces — confirm
    /// whether it is `Validation(Corruption)` or `Ok(None)`.
    async fn describe_execution(
        &self,
        id: &ExecutionId,
    ) -> Result<Option<ExecutionSnapshot>, EngineError>;
242
    /// Snapshot a flow by id. `Ok(None)` ⇒ no such flow.
    ///
    /// Read-only peer of [`Self::describe_execution`]; same
    /// malformed-record caveat applies.
    async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError>;
245
    /// List dependency edges adjacent to an execution. Read-only; the
    /// backend resolves the subject execution's flow, reads the
    /// direction-specific adjacency SET, and decodes each member's
    /// flow-scoped `edge:<edge_id>` hash.
    ///
    /// NOTE(review): the prose above predates the current signature —
    /// the method takes a `flow_id`, not an execution id, and
    /// [`Self::resolve_execution_flow_id`]'s doc says the
    /// execution→flow pivot happens SDK-side. Confirm and rewrite
    /// this paragraph in flow terms.
    ///
    /// Returns an empty `Vec` when the subject has no edges on the
    /// requested side — including standalone executions (no owning
    /// flow). Ordering is unspecified: the underlying adjacency SET
    /// is an unordered SMEMBERS read. Callers that need deterministic
    /// order should sort by [`EdgeSnapshot::edge_id`] /
    /// [`EdgeSnapshot::created_at`] themselves.
    ///
    /// Parse failures on the edge hash surface as
    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
    /// — unknown fields, missing required fields, endpoint mismatches
    /// against the adjacency SET all fail loud rather than silently
    /// returning partial results.
    ///
    /// Gated on the `core` feature — edge reads are part of the
    /// minimal engine surface a Postgres-style backend must honour.
    ///
    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
    #[cfg(feature = "core")]
    async fn list_edges(
        &self,
        flow_id: &FlowId,
        direction: EdgeDirection,
    ) -> Result<Vec<EdgeSnapshot>, EngineError>;
274
    /// Snapshot a single dependency edge by its owning flow + edge id.
    ///
    /// `Ok(None)` when the edge hash is absent (never staged, or
    /// staged under a different flow than `flow_id`). Parse failures
    /// on a present edge hash surface as
    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
    /// — the stored `flow_id` field is cross-checked against the
    /// caller's expected `flow_id` so a wrong-key read fails loud
    /// rather than returning an unrelated edge.
    ///
    /// (The two cases are distinct: an edge keyed under another flow
    /// is simply absent here ⇒ `Ok(None)`; a hash present at this key
    /// whose stored `flow_id` field disagrees ⇒ corruption.)
    ///
    /// Gated on the `core` feature — single-edge reads are part of
    /// the minimal snapshot surface an alternate backend must honour
    /// alongside [`Self::describe_execution`] / [`Self::describe_flow`]
    /// / [`Self::list_edges`].
    ///
    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
    #[cfg(feature = "core")]
    async fn describe_edge(
        &self,
        flow_id: &FlowId,
        edge_id: &EdgeId,
    ) -> Result<Option<EdgeSnapshot>, EngineError>;
297
    /// Resolve an execution's owning flow id, if any.
    ///
    /// `Ok(None)` when the execution's core record is absent or has
    /// no associated flow (standalone execution). A present-but-
    /// malformed `flow_id` field surfaces as
    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`].
    ///
    /// Gated on the `core` feature. Used by ff-sdk's
    /// `list_outgoing_edges` / `list_incoming_edges` to pivot from a
    /// consumer-supplied `ExecutionId` to the `FlowId` required by
    /// [`Self::list_edges`]. A Valkey backend serves this with a
    /// single `HGET exec_core flow_id`; a Postgres backend serves it
    /// with the equivalent single-column row lookup.
    ///
    /// NOTE(review): the parameter is named `eid` while sibling reads
    /// use `id` ([`Self::describe_execution`]) — consider unifying.
    ///
    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
    #[cfg(feature = "core")]
    async fn resolve_execution_flow_id(
        &self,
        eid: &ExecutionId,
    ) -> Result<Option<FlowId>, EngineError>;
318
    /// List flows on a partition with cursor-based pagination (issue
    /// #185).
    ///
    /// Returns a [`ListFlowsPage`] of [`FlowSummary`](crate::contracts::FlowSummary)
    /// rows ordered by `flow_id` (UUID byte-lexicographic). `cursor`
    /// is `None` for the first page; callers forward the returned
    /// `next_cursor` verbatim to continue iteration, and the listing
    /// is exhausted when `next_cursor` is `None`. `limit` is the
    /// maximum number of rows to return on this page — implementations
    /// MAY return fewer (end of partition) but MUST NOT exceed it.
    ///
    /// NOTE(review): `limit == 0` behaviour is unspecified here,
    /// unlike [`Self::list_executions`] (which pins empty page +
    /// `next_cursor = None`) — specify it for parity.
    ///
    /// Ordering rationale: flow ids are UUIDs, and both Valkey
    /// (sort after-the-fact) and Postgres (`ORDER BY flow_id`) can
    /// agree on byte-lexicographic order — the same order
    /// `FlowId::to_string()` produces for canonical hyphenated UUIDs.
    /// Mapping to `cursor > flow_id` keeps the contract backend-
    /// independent.
    ///
    /// # Postgres implementation pattern
    ///
    /// A Postgres-backed implementation serves this directly with
    ///
    /// ```sql
    /// SELECT flow_id, created_at_ms, public_flow_state
    /// FROM ff_flow
    /// WHERE partition_key = $1
    ///   AND ($2::uuid IS NULL OR flow_id > $2)
    /// ORDER BY flow_id
    /// LIMIT $3 + 1;
    /// ```
    ///
    /// — reading one extra row to decide whether `next_cursor` should
    /// be set to the last row's `flow_id`. The Valkey implementation
    /// maintains the `ff:idx:{fp:N}:flow_index` SET and performs the
    /// sort + slice client-side (SMEMBERS then sort-by-UUID-bytes),
    /// pipelining `HGETALL flow_core` for each row on the page.
    ///
    /// Gated on the `core` feature — flow listing is part of the
    /// minimal engine surface a Postgres-style backend must honour.
    #[cfg(feature = "core")]
    async fn list_flows(
        &self,
        partition: PartitionKey,
        cursor: Option<FlowId>,
        limit: usize,
    ) -> Result<ListFlowsPage, EngineError>;
365
    /// Enumerate registered lanes with cursor-based pagination.
    ///
    /// Lanes are global (not partition-scoped) — the backend serves
    /// this from its lane registry and does NOT accept a
    /// [`crate::partition::Partition`] argument. Results are sorted
    /// by [`LaneId`] name so the ordering is stable across calls and
    /// cursors address a deterministic position in the sort.
    ///
    /// * `cursor` — exclusive lower bound. `None` starts from the
    ///   first lane. To continue a walk, pass the previous page's
    ///   [`ListLanesPage::next_cursor`].
    /// * `limit` — hard cap on the number of lanes returned in the
    ///   page. Backends MAY round this down when the registry size
    ///   is smaller; they MUST NOT return more than `limit`.
    ///
    /// [`ListLanesPage::next_cursor`] is `Some(last_lane_in_page)`
    /// iff at least one more lane exists after the returned page,
    /// and `None` on the final page. Callers loop until `next_cursor`
    /// is `None` to read the full registry.
    ///
    /// Same cursor/limit contract as [`Self::list_flows`], modulo the
    /// missing partition argument.
    ///
    /// Gated on the `core` feature — lane enumeration is part of the
    /// minimal snapshot surface an alternate backend must honour
    /// alongside [`Self::describe_flow`] / [`Self::list_edges`].
    #[cfg(feature = "core")]
    async fn list_lanes(
        &self,
        cursor: Option<LaneId>,
        limit: usize,
    ) -> Result<ListLanesPage, EngineError>;
395
    /// List suspended executions in one partition, cursor-paginated,
    /// with each entry's suspension `reason_code` populated (issue
    /// #183).
    ///
    /// Consumer-facing "what's blocked on what?" panels (ff-board's
    /// suspended-executions view, operator CLIs) need the reason in
    /// the list response so the UI does not round-trip per row to
    /// `describe_execution` for a field it knows it needs. `reason`
    /// on [`SuspendedExecutionEntry`] carries the free-form
    /// `suspension:current.reason_code` field — see the type rustdoc
    /// for the String-not-enum rationale.
    ///
    /// `cursor` is opaque to callers; pass `None` to start a fresh
    /// scan and feed the returned [`ListSuspendedPage::next_cursor`]
    /// back in on subsequent pages until it comes back `None`.
    /// `limit` bounds the `entries` count; backends MAY return fewer
    /// when the partition is exhausted.
    ///
    /// Ordering is by ascending `suspended_at_ms` (the per-lane
    /// suspended ZSET score == `timeout_at` or the no-timeout
    /// sentinel) with execution id as a lex tiebreak, so cursor
    /// continuation is deterministic across calls.
    ///
    /// NOTE(review): the cursor is a bare `ExecutionId` while the
    /// ordering key is (`suspended_at_ms`, id) — confirm a backend
    /// can deterministically resume from an id-only cursor; the doc
    /// calls the cursor "opaque" but the type is concrete.
    ///
    /// Gated on the `core` feature — suspended-list enumeration is
    /// part of the minimal engine surface a Postgres-style backend
    /// must honour.
    #[cfg(feature = "core")]
    async fn list_suspended(
        &self,
        partition: PartitionKey,
        cursor: Option<ExecutionId>,
        limit: usize,
    ) -> Result<ListSuspendedPage, EngineError>;
429
    /// Forward-only paginated listing of the executions indexed under
    /// one partition.
    ///
    /// Reads the partition-wide `ff:idx:{p:N}:all_executions` set,
    /// sorts lexicographically on `ExecutionId`, and returns the page
    /// of ids strictly greater than `cursor` (or starting from the
    /// smallest id when `cursor = None`). The returned
    /// [`ListExecutionsPage::next_cursor`] is the last id on the page
    /// iff at least one more id exists past it; `None` signals
    /// end-of-stream.
    ///
    /// `limit` is the maximum number of ids returned on this page. A
    /// `limit` of `0` returns an empty page with `next_cursor = None`.
    /// Backends MAY cap `limit` internally (Valkey: 1000) and return
    /// fewer ids than requested; callers continue paginating until
    /// `next_cursor == None`.
    ///
    /// Ordering is stable under concurrent inserts for already-emitted
    /// ids (an id less-than-or-equal-to the caller's cursor is never
    /// re-emitted in later pages) but new inserts past the cursor WILL
    /// appear in subsequent pages — consistent with forward-only
    /// cursor semantics.
    ///
    /// Same byte-lexicographic ordering contract as
    /// [`Self::list_flows`].
    ///
    /// Gated on the `core` feature — partition-scoped listing is part
    /// of the minimal engine surface every backend must honour.
    #[cfg(feature = "core")]
    async fn list_executions(
        &self,
        partition: PartitionKey,
        cursor: Option<ExecutionId>,
        limit: usize,
    ) -> Result<ListExecutionsPage, EngineError>;
462
463 // ── Trigger ops (issue #150) ──
464
    /// Deliver an external signal to a suspended execution's waitpoint.
    ///
    /// The backend atomically records the signal, evaluates the resume
    /// condition, and — when satisfied — transitions the execution
    /// from `suspended` to `runnable` (or buffers the signal when the
    /// waitpoint is still `pending`). Duplicate delivery — same
    /// `idempotency_key` + waitpoint — surfaces as
    /// [`DeliverSignalResult::Duplicate`] with the pre-existing
    /// `signal_id` rather than mutating state twice.
    ///
    /// Input validation (HMAC token presence, payload size limits,
    /// signal-name shape) is the backend's responsibility; callers
    /// pass a fully populated [`DeliverSignalArgs`] and receive typed
    /// outcomes or typed errors (`ScriptError::invalid_token`,
    /// `ScriptError::token_expired`, `ScriptError::ExecutionNotFound`
    /// surfaced via [`EngineError::Transport`] on the Valkey backend).
    ///
    /// `args` is consumed: delivery hands the payload to the backend.
    ///
    /// Gated on the `core` feature — signal delivery is part of the
    /// minimal trigger surface every backend must honour so ff-server
    /// / REST handlers can dispatch against `Arc<dyn EngineBackend>`
    /// without knowing which backend is running underneath.
    #[cfg(feature = "core")]
    async fn deliver_signal(
        &self,
        args: DeliverSignalArgs,
    ) -> Result<DeliverSignalResult, EngineError>;
491
    /// Claim a resumed execution — a previously-suspended attempt that
    /// has cleared its resume condition (e.g. via
    /// [`Self::deliver_signal`]) and now needs a worker to pick up the
    /// same attempt index.
    ///
    /// Distinct from [`Self::claim`] (fresh work) and
    /// [`Self::claim_from_reclaim`] (grant-based ownership transfer
    /// after a crash): the resumed-claim path re-binds an existing
    /// attempt rather than minting a new one. The backend issues a
    /// fresh `lease_id` + bumps the `lease_epoch`, preserving
    /// `attempt_id` / `attempt_index` so stream frames and progress
    /// updates continue on the same attempt.
    ///
    /// Typed failures surface via `ScriptError` → `EngineError`:
    /// `NotAResumedExecution` when the attempt state is not
    /// `attempt_interrupted`, `ExecutionNotLeaseable` when the
    /// lifecycle phase is not `runnable`, and `InvalidClaimGrant`
    /// when the grant key is missing or was already consumed.
    ///
    /// Gated on the `core` feature — resumed-claim is part of the
    /// minimal trigger surface every backend must honour.
    #[cfg(feature = "core")]
    async fn claim_resumed_execution(
        &self,
        args: ClaimResumedExecutionArgs,
    ) -> Result<ClaimResumedExecutionResult, EngineError>;
518
    /// Operator-initiated cancellation of a flow and (optionally) its
    /// member executions. See RFC-012 §3.1.1 for the policy /wait
    /// matrix.
    ///
    /// NOTE(review): not listed in the module-level replay-idempotence
    /// set — confirm what a second call against an already-cancelled
    /// flow returns.
    async fn cancel_flow(
        &self,
        id: &FlowId,
        policy: CancelFlowPolicy,
        wait: CancelFlowWait,
    ) -> Result<CancelFlowResult, EngineError>;
528
    /// RFC-016 Stage A: set the inbound-edge-group policy for a
    /// downstream execution. Must be called before the first
    /// `add_dependency(... -> downstream_execution_id)` — the backend
    /// rejects with [`EngineError::Conflict`] if edges have already
    /// been staged for this group.
    ///
    /// Stage A honours only
    /// [`EdgeDependencyPolicy::AllOf`](crate::contracts::EdgeDependencyPolicy::AllOf);
    /// the `AnyOf` / `Quorum` variants return
    /// [`EngineError::Validation`] with
    /// `detail = "stage A supports AllOf only; AnyOf/Quorum land in stage B"`
    /// until Stage B's resolver lands.
    ///
    /// NOTE(review): `EdgeDependencyPolicy` / `SetEdgeGroupPolicyResult`
    /// are path-qualified below while sibling contract types are
    /// imported at the top — consider adding them to the import list.
    #[cfg(feature = "core")]
    async fn set_edge_group_policy(
        &self,
        flow_id: &FlowId,
        downstream_execution_id: &ExecutionId,
        policy: crate::contracts::EdgeDependencyPolicy,
    ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError>;
548
549 // ── HMAC secret rotation (v0.7 migration-master Q4) ──
550
551 /// Rotate the waitpoint HMAC signing kid **cluster-wide**.
552 ///
553 /// **v0.7 migration-master Q4 (adjudicated 2026-04-24).**
554 /// Additive trait surface so Valkey and Postgres backends can
555 /// both expose the "rotate everywhere" semantic under one name.
556 ///
557 /// * Valkey impl fans out an `ff_rotate_waitpoint_hmac_secret`
558 /// FCALL per execution partition. `entries.len() == num_flow_partitions`
559 /// and per-partition failures are surfaced as inner `Err`
560 /// entries — the call as a whole does not fail when one
561 /// partition's FCALL fails, matching
562 /// [`ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions`]'s
563 /// partial-success contract.
564 /// * Postgres impl (Wave 4) writes one row to
565 /// `ff_waitpoint_hmac(kid, secret, rotated_at)` and returns a
566 /// single-entry vec with `partition = 0`.
567 ///
568 /// The default impl returns
569 /// [`EngineError::Unavailable`] with
570 /// `op = "rotate_waitpoint_hmac_secret_all"` so backends that
571 /// haven't implemented the method surface the miss loudly rather
572 /// than silently no-op'ing. Both concrete backends override.
573 async fn rotate_waitpoint_hmac_secret_all(
574 &self,
575 _args: RotateWaitpointHmacSecretAllArgs,
576 ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
577 Err(EngineError::Unavailable {
578 op: "rotate_waitpoint_hmac_secret_all",
579 })
580 }
581
582 /// Seed the initial waitpoint HMAC secret for a fresh deployment
583 /// (issue #280).
584 ///
585 /// **Idempotent.** If a `current_kid` (Valkey per-partition) or
586 /// an active kid row (Postgres) already exists with the given
587 /// `kid`, the method returns
588 /// [`SeedOutcome::AlreadySeeded`] without overwriting, reporting
589 /// whether the stored secret matches the caller-supplied one via
590 /// `same_secret`. Callers (cairn boot, operator tooling) invoke
591 /// this on every boot and let the backend decide whether to
592 /// install — removing the client-side "check then HSET" race that
593 /// cairn's raw-HSET boot path silently tolerated.
594 ///
595 /// For rotation of an already-seeded secret, use
596 /// [`Self::rotate_waitpoint_hmac_secret_all`] instead; seed is
597 /// install-only.
598 ///
599 /// The default impl returns [`EngineError::Unavailable`] with
600 /// `op = "seed_waitpoint_hmac_secret"` so backends that haven't
601 /// implemented the method surface the miss loudly.
602 async fn seed_waitpoint_hmac_secret(
603 &self,
604 _args: SeedWaitpointHmacSecretArgs,
605 ) -> Result<SeedOutcome, EngineError> {
606 Err(EngineError::Unavailable {
607 op: "seed_waitpoint_hmac_secret",
608 })
609 }
610
611 // ── Budget ──
612
    /// Report usage against a budget and check limits. Returns the
    /// typed [`ReportUsageResult`] variant; backends enforce
    /// idempotency via the caller-supplied
    /// [`UsageDimensions::dedup_key`] (RFC-012 §R7.2.3 — replaces
    /// the pre-Round-7 `AdmissionDecision` return).
    ///
    /// NOTE(review): `UsageDimensions` is path-qualified below while
    /// its siblings come from the `crate::backend` import at the top
    /// of the file — consider adding it to that import list.
    async fn report_usage(
        &self,
        handle: &Handle,
        budget: &BudgetId,
        dimensions: crate::backend::UsageDimensions,
    ) -> Result<ReportUsageResult, EngineError>;
624
625 // ── Stream reads (RFC-012 Stage 1c tranche-4; issue #87) ──
626
    /// Read frames from a completed or in-flight attempt's stream.
    ///
    /// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start`
    /// / `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
    /// `StreamCursor::At("<id>")` reads from a concrete entry id.
    ///
    /// Input validation (count_limit bounds, cursor shape) is the
    /// caller's responsibility — SDK-side wrappers in
    /// [`ff-sdk`](https://docs.rs/ff-sdk) enforce bounds before
    /// forwarding. Backends MAY additionally reject out-of-range
    /// input via [`EngineError::Validation`].
    ///
    /// Addressed by (`execution_id`, `attempt_index`) rather than a
    /// `Handle`: stream reads need no lease.
    ///
    /// Gated on the `streaming` feature — stream reads are part of
    /// the stream-subset surface a backend without XREAD-like
    /// primitives may omit.
    #[cfg(feature = "streaming")]
    async fn read_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        from: StreamCursor,
        to: StreamCursor,
        count_limit: u64,
    ) -> Result<StreamFrames, EngineError>;
651
    /// Tail a live attempt's stream.
    ///
    /// `after` is an exclusive [`StreamCursor`] — entries with id
    /// strictly greater than `after` are returned. `StreamCursor::Start`
    /// / `StreamCursor::End` are NOT accepted here; callers MUST pass
    /// a concrete id (or `StreamCursor::from_beginning()`). The SDK
    /// wrapper rejects the open markers before reaching the backend.
    ///
    /// NOTE(review): clarify the `Start` vs `from_beginning()`
    /// distinction — presumably `from_beginning()` yields a concrete
    /// minimum entry id rather than the open `Start` marker; confirm.
    ///
    /// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up
    /// to that many ms for a new entry.
    ///
    /// `visibility` (RFC-015 §6.1) filters the returned entries by
    /// their stored [`StreamMode`](crate::backend::StreamMode)
    /// `mode` field. Default
    /// [`TailVisibility::All`](crate::backend::TailVisibility::All)
    /// preserves v1 behaviour.
    ///
    /// Gated on the `streaming` feature — see [`read_stream`](Self::read_stream).
    #[cfg(feature = "streaming")]
    async fn tail_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        after: StreamCursor,
        block_ms: u64,
        count_limit: u64,
        visibility: TailVisibility,
    ) -> Result<StreamFrames, EngineError>;
680
    /// Read the rolling summary document for an attempt (RFC-015 §6.3).
    ///
    /// Returns `Ok(None)` when no [`StreamMode::DurableSummary`](crate::backend::StreamMode::DurableSummary)
    /// frame has ever been appended for the attempt. Non-blocking Hash
    /// read; safe to call from any consumer without holding the lease.
    ///
    /// Gated on the `streaming` feature — summary reads are part of
    /// the stream-subset surface (see [`read_stream`](Self::read_stream)).
    #[cfg(feature = "streaming")]
    async fn read_summary(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
    ) -> Result<Option<SummaryDocument>, EngineError>;
695
696 // ── RFC-017 Stage A — Ingress (5) ──────────────────────────
697 //
698 // Every method in this block has a default impl returning
699 // `EngineError::Unavailable { op }` per RFC-017 §5.3. Concrete
700 // backends override each method with a real body. A missing
701 // override surfaces as a loud typed error at the call site rather
702 // than a silent no-op.
703
704 /// Create an execution. Ingress row 6 (RFC-017 §4). Wraps
705 /// `ff_create_execution` on Valkey; `INSERT INTO ff_execution ...`
706 /// on Postgres. The `idempotency_key` + backend-side default
707 /// `dedup_ttl_ms = 86400000` make duplicate submissions idempotent.
708 #[cfg(feature = "core")]
709 async fn create_execution(
710 &self,
711 _args: CreateExecutionArgs,
712 ) -> Result<CreateExecutionResult, EngineError> {
713 Err(EngineError::Unavailable {
714 op: "create_execution",
715 })
716 }
717
718 /// Create a flow header. Ingress row 5.
719 #[cfg(feature = "core")]
720 async fn create_flow(
721 &self,
722 _args: CreateFlowArgs,
723 ) -> Result<CreateFlowResult, EngineError> {
724 Err(EngineError::Unavailable { op: "create_flow" })
725 }
726
727 /// Atomically add an execution to a flow (single-FCALL co-located
728 /// commit on Valkey; single-transaction UPSERT on Postgres).
729 #[cfg(feature = "core")]
730 async fn add_execution_to_flow(
731 &self,
732 _args: AddExecutionToFlowArgs,
733 ) -> Result<AddExecutionToFlowResult, EngineError> {
734 Err(EngineError::Unavailable {
735 op: "add_execution_to_flow",
736 })
737 }
738
739 /// Stage a dependency edge between flow members. CAS-guarded on
740 /// `graph_revision` — stale rev returns `Contention(StaleGraphRevision)`.
741 #[cfg(feature = "core")]
742 async fn stage_dependency_edge(
743 &self,
744 _args: StageDependencyEdgeArgs,
745 ) -> Result<StageDependencyEdgeResult, EngineError> {
746 Err(EngineError::Unavailable {
747 op: "stage_dependency_edge",
748 })
749 }
750
751 /// Apply a staged dependency edge to its downstream child.
752 #[cfg(feature = "core")]
753 async fn apply_dependency_to_child(
754 &self,
755 _args: ApplyDependencyToChildArgs,
756 ) -> Result<ApplyDependencyToChildResult, EngineError> {
757 Err(EngineError::Unavailable {
758 op: "apply_dependency_to_child",
759 })
760 }
761
762 // ── RFC-017 Stage A — Operator control (4) ─────────────────
763
764 /// Operator-initiated execution cancel (row 2).
765 #[cfg(feature = "core")]
766 async fn cancel_execution(
767 &self,
768 _args: CancelExecutionArgs,
769 ) -> Result<CancelExecutionResult, EngineError> {
770 Err(EngineError::Unavailable {
771 op: "cancel_execution",
772 })
773 }
774
775 /// Re-score an execution's eligibility priority (row 17).
776 #[cfg(feature = "core")]
777 async fn change_priority(
778 &self,
779 _args: ChangePriorityArgs,
780 ) -> Result<ChangePriorityResult, EngineError> {
781 Err(EngineError::Unavailable {
782 op: "change_priority",
783 })
784 }
785
786 /// Replay a terminal execution (row 22). Variadic KEYS handling
787 /// (inbound-edge pre-read) is hidden inside the Valkey impl per
788 /// RFC-017 §4 row 3.
789 #[cfg(feature = "core")]
790 async fn replay_execution(
791 &self,
792 _args: ReplayExecutionArgs,
793 ) -> Result<ReplayExecutionResult, EngineError> {
794 Err(EngineError::Unavailable {
795 op: "replay_execution",
796 })
797 }
798
799 /// Operator-initiated lease revoke (row 19).
800 #[cfg(feature = "core")]
801 async fn revoke_lease(
802 &self,
803 _args: RevokeLeaseArgs,
804 ) -> Result<RevokeLeaseResult, EngineError> {
805 Err(EngineError::Unavailable { op: "revoke_lease" })
806 }
807
808 // ── RFC-017 Stage A — Budget + quota admin (5) ─────────────
809
810 /// Create a budget definition (row 6).
811 #[cfg(feature = "core")]
812 async fn create_budget(
813 &self,
814 _args: CreateBudgetArgs,
815 ) -> Result<CreateBudgetResult, EngineError> {
816 Err(EngineError::Unavailable {
817 op: "create_budget",
818 })
819 }
820
821 /// Reset a budget's usage counters (row 10).
822 #[cfg(feature = "core")]
823 async fn reset_budget(
824 &self,
825 _args: ResetBudgetArgs,
826 ) -> Result<ResetBudgetResult, EngineError> {
827 Err(EngineError::Unavailable { op: "reset_budget" })
828 }
829
830 /// Create a quota policy (row 7).
831 #[cfg(feature = "core")]
832 async fn create_quota_policy(
833 &self,
834 _args: CreateQuotaPolicyArgs,
835 ) -> Result<CreateQuotaPolicyResult, EngineError> {
836 Err(EngineError::Unavailable {
837 op: "create_quota_policy",
838 })
839 }
840
841 /// Read-only budget status for operator visibility (row 8).
842 #[cfg(feature = "core")]
843 async fn get_budget_status(
844 &self,
845 _id: &BudgetId,
846 ) -> Result<BudgetStatus, EngineError> {
847 Err(EngineError::Unavailable {
848 op: "get_budget_status",
849 })
850 }
851
852 /// Admin-path `report_usage` (row 9 + RFC-017 §5 round-1 F4).
853 /// Distinct from the existing [`Self::report_usage`] which takes
854 /// a worker handle — the admin path has no lease context.
855 #[cfg(feature = "core")]
856 async fn report_usage_admin(
857 &self,
858 _budget: &BudgetId,
859 _args: ReportUsageAdminArgs,
860 ) -> Result<ReportUsageResult, EngineError> {
861 Err(EngineError::Unavailable {
862 op: "report_usage_admin",
863 })
864 }
865
866 // ── RFC-017 Stage A — Read + diagnostics (3) ───────────────
867
868 /// Fetch the stored result payload for a completed execution
869 /// (row 4). Returns `Ok(None)` when the execution is missing, not
870 /// yet complete, or its payload was trimmed by retention policy.
871 async fn get_execution_result(
872 &self,
873 _id: &ExecutionId,
874 ) -> Result<Option<Vec<u8>>, EngineError> {
875 Err(EngineError::Unavailable {
876 op: "get_execution_result",
877 })
878 }
879
880 /// List the pending-or-active waitpoints for an execution, cursor
881 /// paginated (row 5 / §8). Stage A preserves the existing
882 /// `PendingWaitpointInfo` shape; Stage D ships the §8 HMAC
883 /// sanitisation + `(token_kid, token_fingerprint)` schema.
884 #[cfg(feature = "core")]
885 async fn list_pending_waitpoints(
886 &self,
887 _args: ListPendingWaitpointsArgs,
888 ) -> Result<ListPendingWaitpointsResult, EngineError> {
889 Err(EngineError::Unavailable {
890 op: "list_pending_waitpoints",
891 })
892 }
893
894 /// Backend-level reachability probe (row 1). Valkey: `PING`;
895 /// Postgres: `SELECT 1`.
896 async fn ping(&self) -> Result<(), EngineError> {
897 Err(EngineError::Unavailable { op: "ping" })
898 }
899
900 // ── RFC-017 Stage A — Scheduling (1) ───────────────────────
901
902 /// Scheduler-routed claim entrypoint (row 18, RFC-017 §7). Valkey
903 /// forwards to its `ff_scheduler::Scheduler` cursor; Postgres
904 /// forwards to `PostgresScheduler`'s `FOR UPDATE SKIP LOCKED`
905 /// path.
906 ///
907 /// Backends that carry an embedded scheduler (e.g. `ValkeyBackend`
908 /// constructed via `with_embedded_scheduler`, or `PostgresBackend`
909 /// with its `with_scanners` sibling) route the claim through it.
910 /// Backends without a wired scheduler return
911 /// [`EngineError::Unavailable`]. HTTP consumers use
912 /// `FlowFabricWorker::claim_via_server` instead.
913 #[cfg(feature = "core")]
914 async fn claim_for_worker(
915 &self,
916 _args: ClaimForWorkerArgs,
917 ) -> Result<ClaimForWorkerOutcome, EngineError> {
918 Err(EngineError::Unavailable {
919 op: "claim_for_worker",
920 })
921 }
922
923 // ── Cross-cutting (RFC-017 Stage B trait-lift) ──────────────
924
925 /// Static observability label identifying the backend family in
926 /// logs + metrics (RFC-017 §5.4 + §9 Stage B). Default impl
927 /// returns `"unknown"` so legacy `impl EngineBackend` blocks that
928 /// have not upgraded keep compiling; every in-tree backend
929 /// overrides — `ValkeyBackend` → `"valkey"`, `PostgresBackend` →
930 /// `"postgres"`.
931 fn backend_label(&self) -> &'static str {
932 "unknown"
933 }
934
935 /// RFC-018 Stage A: snapshot of this backend's identity + the
936 /// capability matrix it can actually service. Consumers use this
937 /// at startup to gate UI features / choose between alternative
938 /// code paths before dispatching. See
939 /// `rfcs/RFC-018-backend-capability-discovery.md` for the full
940 /// discovery contract and the four owner-adjudicated open
941 /// questions (granularity: coarse; version: struct; sync; no
942 /// event stream).
943 ///
944 /// Default: returns an empty matrix tagged `family = "unknown"`
945 /// so pre-RFC-018 out-of-tree backends keep compiling and
946 /// consumers treat "no rows" as "dispatch and catch
947 /// [`EngineError::Unavailable`]" (pre-RFC-018 behaviour).
948 /// Concrete in-tree backends (`ValkeyBackend`, `PostgresBackend`)
949 /// override to populate the real matrix.
950 ///
951 /// Sync (no `.await`): backend-static info should not require a
952 /// probe on every query. Dynamic probes happen once at
953 /// `connect*` time and cache the result.
954 fn capabilities_matrix(&self) -> crate::capability::CapabilityMatrix {
955 crate::capability::CapabilityMatrix::new(crate::capability::BackendIdentity::new(
956 "unknown",
957 crate::capability::Version::new(0, 0, 0),
958 "unknown",
959 ))
960 }
961
962 /// Issue #281: run one-time backend-specific boot preparation.
963 ///
964 /// Intended to run ONCE per deployment startup — NOT per request.
965 /// Idempotent and safe for consumers to call on every application
966 /// boot; backends that have nothing to do return
967 /// [`PrepareOutcome::NoOp`] without side effects.
968 ///
969 /// Per-backend behaviour:
970 ///
971 /// * **Valkey** — issues `FUNCTION LOAD REPLACE` for the
972 /// `flowfabric` Lua library (with bounded retry on transient
973 /// transport faults; permanent compile errors surface as
974 /// [`EngineError::Transport`] without retry). Returns
975 /// [`PrepareOutcome::Applied`] carrying
976 /// `"FUNCTION LOAD (flowfabric lib v<N>)"`.
977 /// * **Postgres** — returns [`PrepareOutcome::NoOp`]. Schema
978 /// migrations are applied out-of-band per
979 /// `rfcs/drafts/v0.7-migration-master.md §Q12`; the backend
980 /// runs a schema-version check at connect time and refuses to
981 /// start on mismatch, so no boot-side prepare work remains.
982 /// * **Default impl** — returns [`PrepareOutcome::NoOp`] so
983 /// out-of-tree backends without preparation work compile
984 /// without boilerplate.
985 ///
986 /// # Relationship to the in-tree boot path
987 ///
988 /// `ValkeyBackend::initialize_deployment` (called from
989 /// `Server::start_with_metrics`) already invokes
990 /// [`ensure_library`](ff_script::loader::ensure_library) inline as
991 /// its step 4; that path is unchanged. `prepare()` exists as a
992 /// **trait-surface entry point** so consumers that construct an
993 /// `Arc<dyn EngineBackend>` outside of `Server` (e.g.
994 /// cairn-fabric's boot path at `cairn-fabric/src/boot.rs`) can
995 /// run the same preparation without reaching into
996 /// backend-specific modules. The overlap is intentional: calling
997 /// both `prepare()` and `initialize_deployment` is safe because
998 /// `FUNCTION LOAD REPLACE` is idempotent under the version
999 /// check.
1000 ///
1001 /// # Layer forwarding
1002 ///
1003 /// Layer impls (`HookedBackend`, ff-sdk layers) do NOT forward
1004 /// `prepare` today — consistent with `backend_label` / `ping` /
1005 /// `shutdown_prepare`. Consumers that wrap a backend in layers
1006 /// MUST call `prepare()` on the raw backend before wrapping, or
1007 /// accept the default [`PrepareOutcome::NoOp`].
1008 async fn prepare(&self) -> Result<PrepareOutcome, EngineError> {
1009 Ok(PrepareOutcome::NoOp)
1010 }
1011
1012 /// Drain-before-shutdown hook (RFC-017 §5.4). The server calls
1013 /// this before draining its own background tasks so backend-
1014 /// scoped primitives (Valkey stream semaphore, Postgres sqlx
1015 /// pool, …) can close their gates and await in-flight work up to
1016 /// `grace`.
1017 ///
1018 /// Default impl returns `Ok(())` — a no-op backend has nothing
1019 /// backend-scoped to drain. Concrete backends whose data plane
1020 /// owns resources (connection pools, semaphores, listeners)
1021 /// override with a real body.
1022 async fn shutdown_prepare(&self, _grace: Duration) -> Result<(), EngineError> {
1023 Ok(())
1024 }
1025
1026 // ── RFC-017 Stage E2 — `Server::client` removal (header + reads) ───
1027
1028 /// RFC-017 Stage E2: the "header" portion of `cancel_flow` — run the
1029 /// atomic flow-state flip (Valkey: `ff_cancel_flow` FCALL; Postgres:
1030 /// `cancel_flow_once` tx), decode policy + membership, and surface
1031 /// the `flow_already_terminal` idempotency branch as a first-class
1032 /// [`CancelFlowHeader::AlreadyTerminal`] so the Server can build
1033 /// the wire [`CancelFlowResult`] without reaching for a raw
1034 /// `Client`. Separate from the existing
1035 /// [`EngineBackend::cancel_flow`] entry point (which takes the
1036 /// enum-typed `(policy, wait)` split and returns the wait-collapsed
1037 /// `CancelFlowResult`) because the Server owns its own
1038 /// wait-dispatch + member-cancel machinery via
1039 /// [`EngineBackend::cancel_execution`] + backlog ack.
1040 ///
1041 /// Default impl returns [`EngineError::Unavailable`] so un-migrated
1042 /// backends surface the miss loudly.
1043 #[cfg(feature = "core")]
1044 async fn cancel_flow_header(
1045 &self,
1046 _args: CancelFlowArgs,
1047 ) -> Result<crate::contracts::CancelFlowHeader, EngineError> {
1048 Err(EngineError::Unavailable {
1049 op: "cancel_flow_header",
1050 })
1051 }
1052
1053 /// RFC-017 Stage E2: best-effort acknowledgement that one member of
1054 /// a `cancel_all` flow has completed its per-member cancel. Drains
1055 /// the member from the flow's `pending_cancels` set and, if empty,
1056 /// removes the flow from the partition-level `cancel_backlog`
1057 /// (Valkey: `ff_ack_cancel_member` FCALL; Postgres: table write —
1058 /// default `Unavailable` until Wave 9).
1059 ///
1060 /// Failures are swallowed by the caller — the cancel-backlog
1061 /// reconciler is the authoritative drain — but a typed error here
1062 /// lets the caller log a backend-scoped context string.
1063 #[cfg(feature = "core")]
1064 async fn ack_cancel_member(
1065 &self,
1066 _flow_id: &FlowId,
1067 _execution_id: &ExecutionId,
1068 ) -> Result<(), EngineError> {
1069 Err(EngineError::Unavailable {
1070 op: "ack_cancel_member",
1071 })
1072 }
1073
1074 /// RFC-017 Stage E2: full-shape execution read used by the
1075 /// `GET /v1/executions/{id}` HTTP route. Returns the legacy
1076 /// [`ExecutionInfo`] wire shape (not the decoupled
1077 /// [`ExecutionSnapshot`]) so the existing HTTP response bytes stay
1078 /// identical across the migration.
1079 ///
1080 /// `Ok(None)` ⇒ no such execution. Default `Unavailable` because
1081 /// the Valkey HGETALL-and-parse is backend-specific.
1082 #[cfg(feature = "core")]
1083 async fn read_execution_info(
1084 &self,
1085 _id: &ExecutionId,
1086 ) -> Result<Option<ExecutionInfo>, EngineError> {
1087 Err(EngineError::Unavailable {
1088 op: "read_execution_info",
1089 })
1090 }
1091
1092 /// RFC-017 Stage E2: narrow `public_state` read used by the
1093 /// `GET /v1/executions/{id}/state` HTTP route. Returns `Ok(None)`
1094 /// when the execution is missing. Default `Unavailable`.
1095 #[cfg(feature = "core")]
1096 async fn read_execution_state(
1097 &self,
1098 _id: &ExecutionId,
1099 ) -> Result<Option<PublicState>, EngineError> {
1100 Err(EngineError::Unavailable {
1101 op: "read_execution_state",
1102 })
1103 }
1104
1105 // ── RFC-019 Stage A — Stream-cursor subscriptions ─────────────
1106 //
1107 // Four owner-adjudicated families (RFC-019 §Open Questions #5):
1108 // `lease_history`, `completion`, `signal_delivery`,
1109 // `instance_tags`. Each returns a `StreamSubscription`
1110 // (`Pin<Box<dyn Stream<Item = Result<StreamEvent, EngineError>> +
1111 // Send>>`); the consumer drives with `StreamExt::next`.
1112 //
1113 // The cursor is backend-opaque bytes; see
1114 // [`crate::stream_subscribe`] for the shared cursor codec + event
1115 // payload. All defaults return `EngineError::Unavailable` per
1116 // RFC-017 trait-growth conventions. Stage A ships Valkey
1117 // `subscribe_lease_history` + Postgres `subscribe_completion`;
1118 // the other six (family × backend) combinations stay `Unavailable`
1119 // and are tracked in the RFC-019 Stage B follow-up issues.
1120
1121 /// Subscribe to lease lifecycle events (expired / reclaimed /
1122 /// revoked) for the partition this backend is configured with.
1123 ///
1124 /// Cross-partition fan-out is consumer-side merge: subscribe
1125 /// per-partition backend instance and interleave on the read
1126 /// side. Yields
1127 /// `Err(EngineError::StreamDisconnected { cursor })` on backend
1128 /// disconnect; resume by calling this method again with the
1129 /// returned cursor.
1130 async fn subscribe_lease_history(
1131 &self,
1132 _cursor: crate::stream_subscribe::StreamCursor,
1133 ) -> Result<crate::stream_subscribe::StreamSubscription, EngineError> {
1134 Err(EngineError::Unavailable {
1135 op: "subscribe_lease_history",
1136 })
1137 }
1138
1139 /// Subscribe to completion events (terminal state transitions).
1140 /// Postgres: wraps the existing `ff_completion_event` outbox +
1141 /// LISTEN/NOTIFY machinery. Valkey: Stage B follow-up.
1142 async fn subscribe_completion(
1143 &self,
1144 _cursor: crate::stream_subscribe::StreamCursor,
1145 ) -> Result<crate::stream_subscribe::StreamSubscription, EngineError> {
1146 Err(EngineError::Unavailable {
1147 op: "subscribe_completion",
1148 })
1149 }
1150
1151 /// Subscribe to signal-delivery events (waitpoint arming /
1152 /// satisfied). Stage B follow-up.
1153 async fn subscribe_signal_delivery(
1154 &self,
1155 _cursor: crate::stream_subscribe::StreamCursor,
1156 ) -> Result<crate::stream_subscribe::StreamSubscription, EngineError> {
1157 Err(EngineError::Unavailable {
1158 op: "subscribe_signal_delivery",
1159 })
1160 }
1161
1162 /// Subscribe to instance-tag events (tag attached / cleared).
1163 /// Stage B follow-up.
1164 async fn subscribe_instance_tags(
1165 &self,
1166 _cursor: crate::stream_subscribe::StreamCursor,
1167 ) -> Result<crate::stream_subscribe::StreamSubscription, EngineError> {
1168 Err(EngineError::Unavailable {
1169 op: "subscribe_instance_tags",
1170 })
1171 }
1172}
1173
/// Object-safety assertion: `dyn EngineBackend` compiles iff every
/// method is dyn-compatible. Kept as a compile-time guard so a future
/// trait change that accidentally breaks dyn-safety fails the build
/// at this site rather than at every downstream `Arc<dyn
/// EngineBackend>` use. The function is never called — its signature
/// alone forces the compiler to form the `dyn` type.
#[allow(dead_code)]
fn _assert_dyn_compatible(_: &dyn EngineBackend) {}
1181
/// Polling interval for [`wait_for_flow_cancellation`]. Tight enough
/// that a local single-node cancel cascade observes `cancelled` within
/// one or two polls; slack enough that a `WaitIndefinite` caller does
/// not hammer `describe_flow` on a live cluster.
const CANCEL_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(100);

/// Defensive ceiling for [`CancelFlowWait::WaitIndefinite`] — if the
/// reconciler cascade has not converged in five minutes, something is
/// wedged and returning `Timeout` is strictly more useful than blocking
/// forever. RFC-012 §3.1.1 expects real-world cascades to finish within
/// `reconciler_interval + grace`, which is orders of magnitude below
/// this ceiling.
const CANCEL_WAIT_INDEFINITE_CEILING: Duration = Duration::from_secs(300);
1195
1196/// Poll `backend.describe_flow(flow_id)` until `public_flow_state` is
1197/// `"cancelled"` or `deadline` elapses.
1198///
1199/// Shared by every backend's `cancel_flow` trait impl that honours
1200/// [`CancelFlowWait::WaitTimeout`] / [`CancelFlowWait::WaitIndefinite`].
1201/// The underlying `cancel_flow` FCALL / SQL transaction flips the
1202/// flow-level state synchronously; member cancellations dispatch
1203/// asynchronously via the reconciler, which also flips
1204/// `public_flow_state` to `cancelled` once the cascade completes. This
1205/// helper waits for that terminal flip.
1206///
1207/// Returns:
1208/// * `Ok(())` once `public_flow_state = "cancelled"` is observed.
1209/// * `Err(EngineError::Timeout { op: "cancel_flow", elapsed })` when
1210/// `deadline` elapses first. `elapsed` is the wait budget (the
1211/// requested timeout), not wall-clock precision.
1212/// * `Err(e)` if `describe_flow` itself errors (propagated).
1213pub async fn wait_for_flow_cancellation<B: EngineBackend + ?Sized>(
1214 backend: &B,
1215 flow_id: &crate::types::FlowId,
1216 deadline: Duration,
1217) -> Result<(), EngineError> {
1218 let start = std::time::Instant::now();
1219 loop {
1220 match backend.describe_flow(flow_id).await? {
1221 Some(snap) if snap.public_flow_state == "cancelled" => return Ok(()),
1222 // `None` (flow removed) is also terminal from the caller's
1223 // perspective — nothing left to wait on.
1224 None => return Ok(()),
1225 Some(_) => {}
1226 }
1227 if start.elapsed() >= deadline {
1228 return Err(EngineError::Timeout {
1229 op: "cancel_flow",
1230 elapsed: deadline,
1231 });
1232 }
1233 tokio::time::sleep(CANCEL_WAIT_POLL_INTERVAL).await;
1234 }
1235}
1236
1237/// Convert a [`CancelFlowWait`] into the deadline passed to
1238/// [`wait_for_flow_cancellation`]. `NoWait` returns `None` — the caller
1239/// must skip the wait entirely.
1240pub fn cancel_flow_wait_deadline(wait: CancelFlowWait) -> Option<Duration> {
1241 // `CancelFlowWait` is `#[non_exhaustive]`; this match lives in the
1242 // defining crate so the exhaustiveness check keeps the compiler
1243 // honest. Future variants must be wired here explicitly.
1244 match wait {
1245 CancelFlowWait::NoWait => None,
1246 CancelFlowWait::WaitTimeout(d) => Some(d),
1247 CancelFlowWait::WaitIndefinite => Some(CANCEL_WAIT_INDEFINITE_CEILING),
1248 }
1249}
1250
#[cfg(test)]
mod tests {
    use super::*;
    use crate::capability::{Capability, CapabilityStatus};

    /// A zero-state backend stub used to exercise the default
    /// `capabilities_matrix()` impl without pulling in a real
    /// transport. Only the default method is under test here; every
    /// other method is unreachable on this type.
    struct DefaultBackend;

    // Only the trait's required (no-default) methods need bodies; every
    // defaulted RFC-017/018/019 method is inherited as-is, which is
    // exactly what the test below exercises.
    #[async_trait]
    impl EngineBackend for DefaultBackend {
        async fn claim(
            &self,
            _lane: &LaneId,
            _capabilities: &CapabilitySet,
            _policy: ClaimPolicy,
        ) -> Result<Option<Handle>, EngineError> {
            unreachable!()
        }
        async fn renew(&self, _handle: &Handle) -> Result<LeaseRenewal, EngineError> {
            unreachable!()
        }
        async fn progress(
            &self,
            _handle: &Handle,
            _percent: Option<u8>,
            _message: Option<String>,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn append_frame(
            &self,
            _handle: &Handle,
            _frame: Frame,
        ) -> Result<AppendFrameOutcome, EngineError> {
            unreachable!()
        }
        async fn complete(
            &self,
            _handle: &Handle,
            _payload: Option<Vec<u8>>,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn fail(
            &self,
            _handle: &Handle,
            _reason: FailureReason,
            _classification: FailureClass,
        ) -> Result<FailOutcome, EngineError> {
            unreachable!()
        }
        async fn cancel(&self, _handle: &Handle, _reason: &str) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn suspend(
            &self,
            _handle: &Handle,
            _args: SuspendArgs,
        ) -> Result<SuspendOutcome, EngineError> {
            unreachable!()
        }
        async fn create_waitpoint(
            &self,
            _handle: &Handle,
            _waitpoint_key: &str,
            _expires_in: Duration,
        ) -> Result<PendingWaitpoint, EngineError> {
            unreachable!()
        }
        async fn observe_signals(
            &self,
            _handle: &Handle,
        ) -> Result<Vec<ResumeSignal>, EngineError> {
            unreachable!()
        }
        async fn claim_from_reclaim(
            &self,
            _token: ReclaimToken,
        ) -> Result<Option<Handle>, EngineError> {
            unreachable!()
        }
        async fn delay(
            &self,
            _handle: &Handle,
            _delay_until: TimestampMs,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn wait_children(&self, _handle: &Handle) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn describe_execution(
            &self,
            _id: &ExecutionId,
        ) -> Result<Option<ExecutionSnapshot>, EngineError> {
            unreachable!()
        }
        async fn describe_flow(
            &self,
            _id: &FlowId,
        ) -> Result<Option<FlowSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_edges(
            &self,
            _flow_id: &FlowId,
            _direction: EdgeDirection,
        ) -> Result<Vec<EdgeSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn describe_edge(
            &self,
            _flow_id: &FlowId,
            _edge_id: &EdgeId,
        ) -> Result<Option<EdgeSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn resolve_execution_flow_id(
            &self,
            _eid: &ExecutionId,
        ) -> Result<Option<FlowId>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_flows(
            &self,
            _partition: PartitionKey,
            _cursor: Option<FlowId>,
            _limit: usize,
        ) -> Result<ListFlowsPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_lanes(
            &self,
            _cursor: Option<LaneId>,
            _limit: usize,
        ) -> Result<ListLanesPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_suspended(
            &self,
            _partition: PartitionKey,
            _cursor: Option<ExecutionId>,
            _limit: usize,
        ) -> Result<ListSuspendedPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_executions(
            &self,
            _partition: PartitionKey,
            _cursor: Option<ExecutionId>,
            _limit: usize,
        ) -> Result<ListExecutionsPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn deliver_signal(
            &self,
            _args: DeliverSignalArgs,
        ) -> Result<DeliverSignalResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn claim_resumed_execution(
            &self,
            _args: ClaimResumedExecutionArgs,
        ) -> Result<ClaimResumedExecutionResult, EngineError> {
            unreachable!()
        }
        async fn cancel_flow(
            &self,
            _id: &FlowId,
            _policy: CancelFlowPolicy,
            _wait: CancelFlowWait,
        ) -> Result<CancelFlowResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn set_edge_group_policy(
            &self,
            _flow_id: &FlowId,
            _downstream_execution_id: &ExecutionId,
            _policy: crate::contracts::EdgeDependencyPolicy,
        ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
            unreachable!()
        }
        async fn report_usage(
            &self,
            _handle: &Handle,
            _budget: &BudgetId,
            _dimensions: crate::backend::UsageDimensions,
        ) -> Result<ReportUsageResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn read_stream(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
            _from: StreamCursor,
            _to: StreamCursor,
            _count_limit: u64,
        ) -> Result<StreamFrames, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn tail_stream(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
            _after: StreamCursor,
            _block_ms: u64,
            _count_limit: u64,
            _visibility: TailVisibility,
        ) -> Result<StreamFrames, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn read_summary(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
        ) -> Result<Option<SummaryDocument>, EngineError> {
            unreachable!()
        }
    }

    /// The default `capabilities_matrix()` impl returns an empty
    /// matrix tagged `family = "unknown"` so pre-RFC-018 out-of-tree
    /// backends keep compiling and consumers can distinguish
    /// "backend predates RFC-018" from "backend reports concrete
    /// rows." Every concrete in-tree backend overrides.
    #[test]
    fn default_capabilities_matrix_is_unknown_family() {
        let b = DefaultBackend;
        let m = b.capabilities_matrix();
        assert_eq!(m.identity.family, "unknown");
        assert_eq!(
            m.identity.version,
            crate::capability::Version::new(0, 0, 0)
        );
        assert_eq!(m.identity.rfc017_stage, "unknown");
        assert!(m.caps.is_empty());
        // Any capability resolves to Unknown on a default matrix.
        assert_eq!(m.get(Capability::Ping), CapabilityStatus::Unknown);
        assert!(!m.supports(Capability::Ping));
    }
}