Skip to main content

ff_core/
engine_backend.rs

1//! The `EngineBackend` trait — abstracting FlowFabric's write surface.
2//!
3//! **RFC-012 Stage 1a:** this is the trait landing. The
4//! Valkey-backed impl lives in `ff-backend-valkey`; future backends
5//! (Postgres) add a sibling crate with their own impl. ff-sdk's
6//! `FlowFabricWorker` gains `connect_with(backend)` /
7//! `backend(&self)` accessors so consumers that want to bring their
8//! own backend (tests, future non-Valkey deployments) can hand one
9//! in. The hot-path migration of `ClaimedTask` / `FlowFabricWorker`
10//! to forward through the trait lands across Stages 1b-1d.
11//!
12//! # Object safety
13//!
14//! `EngineBackend` is object-safe: all methods are `async fn` behind
15//! `#[async_trait]` and take `&self`. Consumers can hold
//! `Arc<dyn EngineBackend>` for heterogeneous-backend deployments.
17//! The trait is `Send + Sync + 'static` per RFC-012 §4.1; every impl
18//! must honour that bound.
19//!
20//! # Error surface
21//!
22//! Every method returns [`Result<_, EngineError>`]. `EngineError`'s
23//! `Transport` variant carries a boxed `dyn Error + Send + Sync`;
24//! Valkey-backed transport faults box a
25//! `ff_script::error::ScriptError` (downcast via
26//! `ff_script::engine_error_ext::transport_script_ref`). Other
27//! backends box their native error type and set the `backend` tag
28//! accordingly.
29//!
30//! # Atomicity contract
31//!
32//! Per-op state transitions MUST be atomic (RFC-012 §3.4). On Valkey
33//! this is the single-FCALL-per-op property; on Postgres it is the
34//! per-transaction property. A backend that cannot honour atomicity
35//! for a given op either MUST NOT implement `EngineBackend` or MUST
36//! return `EngineError::Unavailable { op }` for the affected method.
37//!
38//! # Replay semantics
39//!
//! `complete`, `fail`, `cancel`, `suspend`, `delay`, `wait_children`
//! are idempotent under replay: repeating a call with the same handle
//! and args does not re-apply the transition. The first call succeeds;
//! subsequent calls, where the fence triple no longer matches a live
//! lease, return a typed `State` / `Contention` error instead.
45
46use std::time::Duration;
47
48use async_trait::async_trait;
49
50use crate::backend::{
51    AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy,
52    FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal, PendingWaitpoint,
53    PrepareOutcome, ResumeSignal, ResumeToken,
54};
55// `SummaryDocument` and `TailVisibility` are referenced only inside
56// `#[cfg(feature = "streaming")]` trait methods below, so the imports
57// must be gated to avoid an unused-imports warning on the non-streaming
58// build.
59#[cfg(feature = "streaming")]
60use crate::backend::{SummaryDocument, TailVisibility};
61use crate::contracts::{
62    CancelFlowResult, ExecutionContext, ExecutionSnapshot, FlowSnapshot, IssueReclaimGrantArgs,
63    IssueReclaimGrantOutcome, ReclaimExecutionArgs, ReclaimExecutionOutcome, ReportUsageResult,
64    RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
65    SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
66};
67#[cfg(feature = "core")]
68use crate::contracts::{
69    AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
70    ApplyDependencyToChildResult, BudgetStatus, CancelExecutionArgs, CancelExecutionResult,
71    CancelFlowArgs, ChangePriorityArgs, ChangePriorityResult, ClaimExecutionArgs,
72    ClaimExecutionResult, ClaimForWorkerArgs, ClaimForWorkerOutcome, ClaimResumedExecutionArgs,
73    ClaimResumedExecutionResult,
74    BlockRouteArgs, BlockRouteOutcome, CheckAdmissionArgs, CheckAdmissionResult,
75    ClaimGrantOutcome,
76    CompleteExecutionArgs, CompleteExecutionResult, CreateBudgetArgs, CreateBudgetResult,
77    CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
78    CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
79    DeliverApprovalSignalArgs, DeliverSignalArgs, DeliverSignalResult, EdgeDirection, EdgeSnapshot,
80    EvaluateFlowEligibilityArgs, EvaluateFlowEligibilityResult, ExecutionInfo,
81    FailExecutionArgs, FailExecutionResult,
82    IssueClaimGrantArgs, IssueClaimGrantOutcome, IssueGrantAndClaimArgs,
83    RecordSpendArgs, ReleaseBudgetArgs, ScanEligibleArgs,
84    ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
85    ListPendingWaitpointsResult, ListSuspendedPage, RenewLeaseArgs, RenewLeaseResult,
86    ReplayExecutionArgs, ReplayExecutionResult,
87    ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult, ResumeExecutionArgs,
88    ResumeExecutionResult, RevokeLeaseArgs, RevokeLeaseResult,
89    StageDependencyEdgeArgs, StageDependencyEdgeResult,
90};
91#[cfg(feature = "core")]
92use crate::state::PublicState;
93#[cfg(feature = "core")]
94use crate::partition::PartitionKey;
95#[cfg(feature = "streaming")]
96use crate::contracts::{StreamCursor, StreamFrames};
97use crate::engine_error::EngineError;
98#[cfg(feature = "core")]
99use crate::types::EdgeId;
100#[cfg(feature = "core")]
101use crate::types::WaitpointId;
102use crate::types::{AttemptIndex, BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
103
104/// The engine write surface — a single trait a backend implementation
105/// honours to serve a `FlowFabricWorker`.
106///
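/// See RFC-012 §3.1 for the inventory rationale and §3.3 for the
/// type-level shape. The RFC-012 inventory was 16 methods (Round-7
/// added `create_waitpoint`, widened the `append_frame` return, and
/// replaced the `report_usage` return — RFC-012 §R7). The trait has
/// since grown past that count: issue #150 added the two
/// trigger-surface methods (`deliver_signal` /
/// `claim_resumed_execution`), and later work added further methods
/// with default impls (e.g. RFC-024 lease reclaim, issue #433 tag
/// point-writes / reads).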
112///
113/// # Note on `complete` payload shape
114///
115/// The RFC §3.3 sketch uses `Option<Bytes>`; the Stage 1a trait uses
116/// `Option<Vec<u8>>` to match the existing
117/// `ff_sdk::ClaimedTask::complete` signature and avoid adding a
118/// `bytes` public-type dep for zero consumer benefit. Round-4 §7.17
119/// resolved the payload container debate to `Box<[u8]>` in the
120/// public type (see `HandleOpaque`); `Option<Vec<u8>>` is the
121/// zero-churn choice consistent with today's code. Consumers that
122/// need `&[u8]` can borrow via `.as_deref()` on the Option.
123#[async_trait]
124pub trait EngineBackend: Send + Sync + 'static {
125    // ── Claim + lifecycle ──
126
127    /// Fresh-work claim. Returns `Ok(None)` when no work is currently
128    /// available; `Err` only on transport or input-validation faults.
129    async fn claim(
130        &self,
131        lane: &LaneId,
132        capabilities: &CapabilitySet,
133        policy: ClaimPolicy,
134    ) -> Result<Option<Handle>, EngineError>;
135
136    /// Renew a held lease. Returns the updated expiry + epoch on
137    /// success; typed `State::StaleLease` / `State::LeaseExpired`
138    /// when the lease has been stolen or timed out.
139    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError>;
140
141    /// Numeric-progress heartbeat.
142    ///
143    /// Writes scalar `progress_percent` / `progress_message` fields on
144    /// `exec_core`; each call overwrites the previous value. This does
145    /// NOT append to the output stream — stream-frame producers must use
146    /// [`append_frame`](Self::append_frame) instead.
147    async fn progress(
148        &self,
149        handle: &Handle,
150        percent: Option<u8>,
151        message: Option<String>,
152    ) -> Result<(), EngineError>;
153
154    /// Append one stream frame. Distinct from [`progress`](Self::progress)
155    /// per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry
156    /// id and post-append frame count (RFC-012 §R7.2.1).
157    ///
158    /// Stream-frame producers (arbitrary `frame_type` + payload, consumed
159    /// via the read/tail surfaces) MUST use this method rather than
160    /// [`progress`](Self::progress); the latter updates scalar fields on
161    /// `exec_core` and is invisible to stream consumers.
162    async fn append_frame(
163        &self,
164        handle: &Handle,
165        frame: Frame,
166    ) -> Result<AppendFrameOutcome, EngineError>;
167
168    /// Terminal success. Borrows `handle` (round-4 M-D2) so callers
169    /// can retry under `EngineError::Transport` without losing the
170    /// cookie. Payload is `Option<Vec<u8>>` per the note above.
171    async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError>;
172
173    /// Terminal failure with classification. Returns [`FailOutcome`]
174    /// so the caller learns whether a retry was scheduled.
175    async fn fail(
176        &self,
177        handle: &Handle,
178        reason: FailureReason,
179        classification: FailureClass,
180    ) -> Result<FailOutcome, EngineError>;
181
182    /// Cooperative cancel by the worker holding the lease.
183    async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError>;
184
185    /// Suspend the execution awaiting a typed resume condition
186    /// (RFC-013 Stage 1d).
187    ///
188    /// Borrows `handle` (round-4 M-D2). Terminal-looking behaviour is
189    /// expressed through [`SuspendOutcome`]:
190    ///
191    /// * [`SuspendOutcome::Suspended`] — the pre-suspend handle is
192    ///   logically invalidated; the fresh `HandleKind::Suspended`
193    ///   handle inside the variant supersedes it. Runtime enforcement
194    ///   via the fence triple: subsequent ops against the stale handle
195    ///   surface as `Contention(LeaseConflict)`.
196    /// * [`SuspendOutcome::AlreadySatisfied`] — buffered signals on a
197    ///   pending waitpoint already matched the resume condition at
198    ///   suspension time. The lease is NOT released; the caller's
199    ///   pre-suspend handle remains valid.
200    ///
201    /// See RFC-013 §2 for the type shapes, §3 for the replay /
202    /// idempotency contract, §4 for the error taxonomy.
203    async fn suspend(
204        &self,
205        handle: &Handle,
206        args: SuspendArgs,
207    ) -> Result<SuspendOutcome, EngineError>;
208
209    /// Suspend by execution id + lease fence triple, for service-layer
210    /// callers that hold a run record / lease-claim descriptor but no
211    /// worker [`Handle`] (cairn issue #322).
212    ///
213    /// Semantics mirror [`Self::suspend`] exactly — the same
214    /// [`SuspendArgs`] validation, the same [`SuspendOutcome`]
215    /// lifecycle, the same RFC-013 §3 dedup / replay contract. The
216    /// only difference is the fencing source: instead of the
217    /// `(lease_id, lease_epoch, attempt_id)` fields embedded in a
218    /// `Handle`, the backend fences against the triple passed directly.
219    /// Attempt-index, lane, and worker-instance metadata that
220    /// [`Self::suspend`] reads from the handle payload are recovered
221    /// from the backend's authoritative execution record (Valkey:
222    /// `exec_core` HGETs; Postgres: `ff_attempt` row lookup).
223    ///
224    /// The default impl returns [`EngineError::Unavailable`] so
225    /// existing backend impls remain non-breaking. Production backends
226    /// (Valkey, Postgres) override.
227    async fn suspend_by_triple(
228        &self,
229        exec_id: ExecutionId,
230        triple: LeaseFence,
231        args: SuspendArgs,
232    ) -> Result<SuspendOutcome, EngineError> {
233        let _ = (exec_id, triple, args);
234        Err(EngineError::Unavailable {
235            op: "suspend_by_triple",
236        })
237    }
238
239    /// Issue a pending waitpoint for future signal delivery.
240    ///
241    /// Waitpoints have two states in the Valkey wire contract:
242    /// **pending** (token issued, not yet backing a suspension) and
243    /// **active** (bound to a suspension). This method creates a
244    /// waitpoint in the **pending** state. A later `suspend` call
245    /// transitions a pending waitpoint to active (see Lua
246    /// `use_pending_waitpoint` ARGV flag at
247    /// `flowfabric.lua:3603,3641,3690`) — or, if buffered signals
248    /// already satisfy its condition, the suspend call returns
249    /// `SuspendOutcome::AlreadySatisfied` and the waitpoint activates
250    /// without ever releasing the lease.
251    ///
252    /// Pending-waitpoint expiry is a first-class terminal error on
253    /// the wire (`PendingWaitpointExpired` at
254    /// `ff-script/src/error.rs:170,403-408`). The attempt retains its
255    /// lease while the waitpoint is pending; signals delivered to
256    /// this waitpoint are buffered server-side (RFC-012 §R7.2.2).
257    async fn create_waitpoint(
258        &self,
259        handle: &Handle,
260        waitpoint_key: &str,
261        expires_in: Duration,
262    ) -> Result<PendingWaitpoint, EngineError>;
263
264    /// Read the HMAC token stored on a waitpoint record, keyed by
265    /// `(partition, waitpoint_id)`.
266    ///
267    /// Returns `Ok(Some(token))` when the waitpoint exists and carries
268    /// a token, `Ok(None)` when the waitpoint does not exist or no
269    /// token field is present. A missing waitpoint is not an error —
270    /// signals can legitimately arrive after a waitpoint has been
271    /// consumed or expired, and the signal-bridge authenticates on the
272    /// presence of a matching token, not on the waitpoint's liveness.
273    ///
274    /// # Use case
275    ///
276    /// Control-plane signal delivery (cairn signal-bridge): at
277    /// signal-resume time the bridge reads the token off the
278    /// waitpoint hash / row to authenticate the resume request it
279    /// subsequently issues. Previously implemented as direct
280    /// `ferriskey::Client::hget(waitpoint_key, "waitpoint_token")` —
281    /// Valkey-only. This method routes the read through the trait so
282    /// the pattern works on Postgres + SQLite as well.
283    ///
284    /// # Per-backend shape
285    ///
286    /// * **Valkey** — `HGET ff:wp:<tag>:<waitpoint_id> waitpoint_token`
287    ///   on the waitpoint's partition. Empty string / missing field
288    ///   maps to `None`.
289    /// * **Postgres** — `SELECT token FROM ff_waitpoint_pending
290    ///   WHERE partition_key = $1 AND waitpoint_id = $2 LIMIT 1`.
291    ///   Row-absent → `None`; empty `token` → `None`.
292    /// * **SQLite** — same shape as Postgres.
293    ///
294    /// The `partition` argument is the opaque [`PartitionKey`]
295    /// produced by FlowFabric (typically extracted from the
296    /// `Handle` / `ResumeToken` the waitpoint was minted against).
297    ///
298    /// # Default impl
299    ///
300    /// Returns [`EngineError::Unavailable`] with
301    /// `op = "read_waitpoint_token"` so out-of-tree backends and
302    /// in-tree backends not yet overriding this method continue to
303    /// compile. Mirrors the precedent used by
304    /// [`Self::issue_reclaim_grant`] / [`Self::reclaim_execution`].
305    #[cfg(feature = "core")]
306    async fn read_waitpoint_token(
307        &self,
308        _partition: PartitionKey,
309        _waitpoint_id: &WaitpointId,
310    ) -> Result<Option<String>, EngineError> {
311        Err(EngineError::Unavailable {
312            op: "read_waitpoint_token",
313        })
314    }
315
316    /// Non-mutating observation of signals that satisfied the handle's
317    /// resume condition.
318    async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError>;
319
320    /// Consume a resume grant (via [`ResumeToken`]) to mint a
321    /// resumed-kind handle. Routes to `ff_claim_resumed_execution` on
322    /// Valkey / the epoch-bump reconciler on PG/SQLite. Returns
323    /// `Ok(None)` when the grant's target execution is no longer
324    /// resumable (already reclaimed, terminal, etc.).
325    ///
326    /// **Renamed from `claim_from_reclaim` (RFC-024 PR-B+C).** The
327    /// pre-rename name advertised "reclaim" but the semantic has
328    /// always been resume-after-suspend. The new lease-reclaim path
329    /// lives on [`Self::reclaim_execution`].
330    async fn claim_from_resume_grant(
331        &self,
332        token: ResumeToken,
333    ) -> Result<Option<Handle>, EngineError>;
334
335    /// Issue a lease-reclaim grant (RFC-024 §3.2). Admits executions
336    /// in `lease_expired_reclaimable` or `lease_revoked` state to the
337    /// reclaim path; the returned [`IssueReclaimGrantOutcome::Granted`]
338    /// carries a [`crate::contracts::ReclaimGrant`] which is then fed
339    /// to [`Self::reclaim_execution`] to mint a fresh attempt.
340    ///
341    /// Default impl returns [`EngineError::Unavailable`] — PR-D (PG),
342    /// PR-E (SQLite), and PR-F (Valkey) override with real bodies.
343    async fn issue_reclaim_grant(
344        &self,
345        _args: IssueReclaimGrantArgs,
346    ) -> Result<IssueReclaimGrantOutcome, EngineError> {
347        Err(EngineError::Unavailable {
348            op: "issue_reclaim_grant",
349        })
350    }
351
352    /// Consume a [`crate::contracts::ReclaimGrant`] to mint a fresh
353    /// attempt for a previously lease-expired / lease-revoked
354    /// execution (RFC-024 §3.2). Creates a new attempt row, bumps the
355    /// execution's `lease_reclaim_count`, and mints a
356    /// [`crate::backend::HandleKind::Reclaimed`] handle.
357    ///
358    /// Default impl returns [`EngineError::Unavailable`] — PR-D (PG),
359    /// PR-E (SQLite), and PR-F (Valkey) override with real bodies.
360    async fn reclaim_execution(
361        &self,
362        _args: ReclaimExecutionArgs,
363    ) -> Result<ReclaimExecutionOutcome, EngineError> {
364        Err(EngineError::Unavailable {
365            op: "reclaim_execution",
366        })
367    }
368
369    // Round-5 amendment: lease-releasing peers of `suspend`.
370
371    /// Park the execution until `delay_until`, releasing the lease.
372    async fn delay(&self, handle: &Handle, delay_until: TimestampMs) -> Result<(), EngineError>;
373
374    /// Mark the execution as waiting for its child flow to complete,
375    /// releasing the lease.
376    async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError>;
377
378    // ── Read / admin ──
379
380    /// Snapshot an execution by id. `Ok(None)` ⇒ no such execution.
381    async fn describe_execution(
382        &self,
383        id: &ExecutionId,
384    ) -> Result<Option<ExecutionSnapshot>, EngineError>;
385
386    /// Point-read of the execution-scoped `(input_payload,
387    /// execution_kind, tags)` bundle used by the SDK worker when
388    /// assembling a `ClaimedTask` (see `ff_sdk::ClaimedTask`) after a
389    /// successful claim.
390    ///
391    /// No default impl — every `EngineBackend` must answer this
392    /// explicitly. Distinct from [`Self::describe_execution`]
393    /// (read-model projection) because the SDK needs the raw payload
394    /// bytes + kind + tags immediately post-claim, and the snapshot
395    /// projection deliberately omits the payload bytes.
396    ///
397    /// Per-backend shape:
398    ///
399    /// * **Valkey** — pipelined `GET :payload` + `HGETALL :core`
400    ///   + `HGETALL :tags` on the execution's partition (same pattern
401    ///   as [`Self::describe_execution`]).
402    /// * **Postgres** — single `SELECT payload, raw_fields` on
403    ///   `ff_exec_core` keyed by `(partition_key, execution_id)`;
404    ///   `execution_kind` + `tags` live in `raw_fields` JSONB.
405    /// * **SQLite** — identical shape to Postgres.
406    ///
407    /// Returns [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
408    /// when the execution does not exist — the SDK worker only calls
409    /// this after a successful claim, so a missing row is a loud
410    /// storage-tier invariant violation rather than a routine `Ok(None)`.
411    async fn read_execution_context(
412        &self,
413        execution_id: &ExecutionId,
414    ) -> Result<ExecutionContext, EngineError>;
415
416    /// Point-read of the execution's current attempt-index **pointer**
417    /// — the index of the currently-leased attempt row.
418    ///
419    /// Distinct from [`Self::read_total_attempt_count`]: this method
420    /// names the attempt that *already exists* (pointer), whereas
421    /// `read_total_attempt_count` is the monotonic claim counter used
422    /// to compute the next fresh attempt index. See the sibling's
423    /// rustdoc for the retry-path scenario that motivates the split.
424    ///
425    /// Used on the SDK worker's `claim_from_resume_grant` path —
426    /// specifically the private `claim_resumed_execution` helper —
427    /// immediately before dispatching [`Self::claim_resumed_execution`].
428    /// The returned index is fed into
429    /// [`ClaimResumedExecutionArgs::current_attempt_index`](crate::contracts::ClaimResumedExecutionArgs)
430    /// so the backend's script / transaction targets the correct
431    /// existing attempt row (KEYS[6] on Valkey; `ff_attempt` PK tuple
432    /// on PG/SQLite).
433    ///
434    /// Per-backend shape:
435    ///
436    /// * **Valkey** — `HGET {exec}:core current_attempt_index` on the
437    ///   execution's partition. Single command. Both the
438    ///   **missing-field** case (`exec_core` present but
439    ///   `current_attempt_index` absent or empty-string, i.e. pre-claim
440    ///   state) **and** the **missing-row** case (no `exec_core` hash
441    ///   at all) read back as `AttemptIndex(0)`. This preserves the
442    ///   pre-PR-3 inline-`HGET` semantic and is safe because Valkey's
443    ///   happy path requires `exec_core` to exist before this method
444    ///   is reached — the SDK only calls `read_current_attempt_index`
445    ///   post-grant, and grant issuance is gated on `exec_core`
446    ///   presence. A genuinely absent row would surface as the proper
447    ///   business-logic error (`NotAResumedExecution` /
448    ///   `ExecutionNotLeaseable`) on the downstream FCALL.
449    /// * **Postgres** — `SELECT attempt_index FROM ff_exec_core
450    ///   WHERE partition_key = $1 AND execution_id = $2`. The column
451    ///   is `NOT NULL DEFAULT 0` so a pre-claim row reads back as `0`
452    ///   (matching Valkey's missing-field case). **Missing row**
453    ///   surfaces as [`EngineError::Validation { kind:
454    ///   ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
455    ///   — diverges from Valkey's missing-row `→ 0` mapping.
456    /// * **SQLite** — `SELECT attempt_index FROM ff_exec_core
457    ///   WHERE partition_key = ? AND execution_id = ?`; identical
458    ///   semantics to Postgres (missing-row → `InvalidInput`).
459    ///
460    /// **Cross-backend asymmetry on missing row is intentional.** The
461    /// SDK happy path never observes it (grant issuance on Valkey
462    /// requires `exec_core`, and PG/SQLite currently return
463    /// `Unavailable` from `claim_from_grant` per
464    /// `project_claim_from_grant_pg_sqlite_gap.md`). Consumers writing
465    /// backend-agnostic tooling against this method directly must
466    /// treat the missing-row case as backend-dependent — match on
467    /// `InvalidInput` for PG/SQLite, and treat an unexpected `0` as
468    /// the Valkey equivalent signal.
469    ///
470    /// The default impl returns [`EngineError::Unavailable`] so the
471    /// trait addition is non-breaking for out-of-tree backends (same
472    /// precedent as [`Self::read_execution_context`] landing in v0.12
473    /// PR-1).
474    async fn read_current_attempt_index(
475        &self,
476        _execution_id: &ExecutionId,
477    ) -> Result<AttemptIndex, EngineError> {
478        Err(EngineError::Unavailable {
479            op: "read_current_attempt_index",
480        })
481    }
482
483    /// Point-read of the execution's **total attempt counter** — the
484    /// monotonic count of claims that have ever fired against this
485    /// execution (including the in-flight one once claimed).
486    ///
487    /// Used on the SDK worker's `claim_from_grant` / `claim_execution`
488    /// path — the next attempt-index for a fresh claim is this
489    /// counter's current value (so `1` on the second retry after the
490    /// first attempt failed terminally). This is semantically distinct
491    /// from [`Self::read_current_attempt_index`], which is a *pointer*
492    /// at the currently-leased attempt row and is only meaningful on
493    /// the `claim_from_resume_grant` path (where a live attempt already
494    /// exists and we want to re-seat its lease rather than mint a new
495    /// attempt row).
496    ///
497    /// Reading the pointer on the `claim_from_grant` path was a live
498    /// bug: on the retry-of-a-retry scenario the pointer still named
499    /// the *previous* terminal-failed attempt, so the newly-minted
500    /// attempt collided with it (Valkey KEYS[6]) or mis-targeted the
501    /// PG/SQLite `ff_attempt` PK tuple. This method fixes that by
502    /// reading the counter that Lua 5920 / PG `ff_claim_execution` /
503    /// SQLite `claim_impl` all already consult when computing the
504    /// next attempt index.
505    ///
506    /// Per-backend shape:
507    ///
508    /// * **Valkey** — `HGET {exec}:core total_attempt_count` on the
509    ///   execution's partition. Single command; pre-claim read (field
510    ///   absent or empty) maps to `0`.
511    /// * **Postgres** — `SELECT raw_fields->>'total_attempt_count'
512    ///   FROM ff_exec_core WHERE (partition_key, execution_id) = ...`.
513    ///   The field lives in the JSONB `raw_fields` bag rather than a
514    ///   dedicated column (mirrors how `create_execution_impl` seeds
515    ///   it on row creation). Missing row → `InvalidInput`; missing
516    ///   field → `0`.
517    /// * **SQLite** — `SELECT CAST(json_extract(raw_fields,
518    ///   '$.total_attempt_count') AS INTEGER) FROM ff_exec_core
519    ///   WHERE ...`. Same JSON-in-`raw_fields` shape as PG; uses the
520    ///   same `json_extract` idiom already employed in
521    ///   `ff-backend-sqlite/src/queries/operator.rs` for replay_count.
522    ///
523    /// The default impl returns [`EngineError::Unavailable`] so the
524    /// trait addition is non-breaking for out-of-tree backends (same
525    /// precedent as [`Self::read_current_attempt_index`] landing in
526    /// v0.12 PR-3).
527    async fn read_total_attempt_count(
528        &self,
529        _execution_id: &ExecutionId,
530    ) -> Result<AttemptIndex, EngineError> {
531        Err(EngineError::Unavailable {
532            op: "read_total_attempt_count",
533        })
534    }
535
536    /// Snapshot a flow by id. `Ok(None)` ⇒ no such flow.
537    async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError>;
538
539    // ── Namespaced tag point-writes / reads (issue #433) ──
540
541    /// Set a single namespaced tag on an execution. Tag `key` MUST match
542    /// the reserved caller-namespace pattern `^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$` —
543    /// i.e. `<caller>.<field>` — or the call returns
544    /// [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
545    /// with the offending key in `detail`. `value` is arbitrary UTF-8.
546    ///
547    /// The namespace prefix is carried inline in `key` (e.g.
548    /// `"cairn.session_id"`) — there is no separate `namespace` arg.
549    /// This matches the existing `ff_set_execution_tags` wire shape and
550    /// the flow-tag projection in [`ExecutionSnapshot::tags`].
551    ///
552    /// Validation is performed by each overriding backend impl via
553    /// [`validate_tag_key`] **before** the wire hop so PG / SQLite /
554    /// Valkey reject the same set of keys. The default trait impl
555    /// returns [`EngineError::Unavailable`] without running validation
556    /// — there is no meaningful storage to validate against on an
557    /// unsupported backend, and surfacing `Unavailable` before
558    /// `Validation` matches the precedence used elsewhere on the trait.
559    /// Backends MAY additionally validate on the storage tier (Valkey's
560    /// Lua path does, with a more permissive prefix-only check).
561    ///
562    /// Per-backend shape:
563    ///
564    /// * **Valkey** — `ff_set_execution_tags` FCALL with a single
565    ///   `{key → value}` pair. Routes through the existing Lua
566    ///   contract (no new wire format).
567    /// * **Postgres** — `UPDATE ff_exec_core SET raw_fields = jsonb_set(
568    ///   coalesce(raw_fields, '{}'::jsonb), '{tags,<key>}', to_jsonb($value))
569    ///   WHERE (partition_key, execution_id) = ...`. Same storage shape
570    ///   read by [`Self::describe_execution`] / [`Self::read_execution_context`].
571    /// * **SQLite** — `UPDATE ff_exec_core SET raw_fields = json_set(
572    ///   coalesce(raw_fields, '{}'), '$.tags."<key>"', $value) WHERE ...`.
573    ///   The key is quoted in the JSON path so dots inside the
574    ///   namespaced key (e.g. `cairn.session_id`) are treated as a
575    ///   single literal member name rather than JSON-path separators —
576    ///   yielding the same flat `raw_fields.tags` shape as PG.
577    ///
578    /// Missing execution surfaces as
579    /// [`EngineError::NotFound { entity: "execution" }`](crate::engine_error::EngineError::NotFound)
580    /// — matches the Valkey FCALL's `execution_not_found` mapping and
581    /// the existing `ScriptError::ExecutionNotFound` → `EngineError`
582    /// conversion (`ff_script::engine_error_ext`).
583    ///
584    /// The default impl returns [`EngineError::Unavailable`] so the
585    /// trait addition is non-breaking for out-of-tree backends.
586    async fn set_execution_tag(
587        &self,
588        _execution_id: &ExecutionId,
589        _key: &str,
590        _value: &str,
591    ) -> Result<(), EngineError> {
592        Err(EngineError::Unavailable {
593            op: "set_execution_tag",
594        })
595    }
596
597    /// Set a single namespaced tag on a flow. Same namespace rule as
598    /// [`Self::set_execution_tag`]: `key` MUST match
599    /// `^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$`.
600    ///
601    /// Per-backend shape:
602    ///
603    /// * **Valkey** — `ff_set_flow_tags` FCALL with a single pair.
604    ///   Tags land on the dedicated `ff:flow:{fp:N}:<flow_id>:tags`
605    ///   hash, not on the `flow_core` hash (diverges from the
606    ///   execution shape — execution tags live on `ff:exec:...:tags`
607    ///   by the same split). **Lazy migration on first write**: the
608    ///   Lua (`ff_script::flowfabric.lua`, `ff_set_flow_tags`) scans
609    ///   `flow_core` once per flow for pre-58.4 inline namespaced
610    ///   fields (anything matching `^[a-z][a-z0-9_]*\.`), HSETs them
611    ///   onto `:tags`, HDELs them from `flow_core`, and stamps
612    ///   `tags_migrated=1` on `flow_core` so subsequent calls
613    ///   short-circuit to O(1). This heals flows created before
614    ///   RFC-058.4 landed; well-formed flows pay the migration cost
615    ///   only on their very first tag write. Callers MUST read tags
616    ///   via [`Self::get_flow_tag`] (`HGET :tags <key>`) — direct
617    ///   `HGETALL` against `flow_core` will not see post-migration
618    ///   values.
619    ///
620    ///   **Cross-backend parity caveat on `describe_flow`**: the
621    ///   pre-existing `ValkeyBackend::describe_flow` /
622    ///   `FlowSnapshot::tags` read path snapshots `flow_core` fields
623    ///   only and does NOT today merge the `:tags` sub-hash, whereas
624    ///   Postgres `describe_flow` DOES surface flow tags via
625    ///   `ff_backend_postgres::flow::extract_tags` (which reads them
626    ///   off `raw_fields` — the same store `set_flow_tag` writes on
627    ///   PG). Trait consumers MUST NOT assume a tag written here
628    ///   will be visible via `describe_flow` on every backend: on
629    ///   Valkey, callers that need the full tag set should
630    ///   complement the snapshot with per-key [`Self::get_flow_tag`]
631    ///   reads. Extending Valkey `describe_flow` to merge `:tags`
632    ///   is additive and out of scope for this trait addition.
633    /// * **Postgres** — `UPDATE ff_flow_core SET raw_fields =
634    ///   jsonb_set(..., '{<key>}', ...)` — flow tags are stored as
635    ///   top-level `raw_fields` keys (matches
636    ///   `ff_backend_postgres::flow::extract_tags`). No `tags` nesting
637    ///   on flows, which diverges from the execution shape.
638    /// * **SQLite** — mirrors PG: `UPDATE ff_flow_core SET raw_fields =
639    ///   json_set(..., '$."<key>"', $value) WHERE ...`. The key is
640    ///   quoted so the dotted namespaced key lands as a single flat
641    ///   top-level member of `raw_fields`.
642    ///
643    /// Missing flow surfaces as
644    /// [`EngineError::NotFound { entity: "flow" }`](crate::engine_error::EngineError::NotFound)
645    /// (matches the Valkey FCALL's `flow_not_found` mapping).
646    ///
647    /// The default impl returns [`EngineError::Unavailable`].
648    async fn set_flow_tag(
649        &self,
650        _flow_id: &FlowId,
651        _key: &str,
652        _value: &str,
653    ) -> Result<(), EngineError> {
654        Err(EngineError::Unavailable {
655            op: "set_flow_tag",
656        })
657    }
658
659    /// Read a single namespaced execution tag. Returns `Ok(None)` when
660    /// the tag is absent **or** the execution row does not exist —
661    /// the two cases are not distinguished on the read path. Callers
662    /// that need to distinguish should call [`Self::describe_execution`]
663    /// first (an `Ok(None)` from that method proves the execution is
664    /// absent). This matches Valkey's native `HGET` semantics and
665    /// keeps the read path at a single round-trip on every backend.
666    ///
667    /// `key` must pass [`validate_tag_key`] — a malformed key can
668    /// never be present in storage so the call short-circuits with
669    /// [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
670    /// rather than round-tripping.
671    ///
672    /// Per-backend shape:
673    ///
674    /// * **Valkey** — `HGET :tags <key>` on the execution's partition.
675    /// * **Postgres** — `SELECT raw_fields->'tags'->><key> FROM ff_exec_core
676    ///   WHERE ...` with `fetch_optional` → missing row collapses to `None`.
677    /// * **SQLite** — `SELECT json_extract(raw_fields, '$.tags."<key>"')
678    ///   FROM ff_exec_core WHERE ...` with the same collapse. The key is
679    ///   quoted in the JSON path so dotted namespaced keys resolve to
680    ///   the flat literal member written by `set_execution_tag`.
681    ///
682    /// The default impl returns [`EngineError::Unavailable`].
683    async fn get_execution_tag(
684        &self,
685        _execution_id: &ExecutionId,
686        _key: &str,
687    ) -> Result<Option<String>, EngineError> {
688        Err(EngineError::Unavailable {
689            op: "get_execution_tag",
690        })
691    }
692
693    /// Read an execution's `namespace` scalar. Returns `Ok(None)` when
694    /// the row is absent or the field is unset. Dedicated point-read
695    /// used by the scanner per-candidate filter (`should_skip_candidate`)
696    /// to preserve the 1-HGET cost contract documented in
697    /// `ff_engine::scanner::should_skip_candidate` — `describe_execution`
698    /// is heavier (HGETALL / full snapshot) and unnecessary when only
699    /// the namespace scalar is needed.
700    ///
701    /// Per-backend shape:
702    ///
703    /// * **Valkey** — `HGET :core namespace` on the execution's partition
704    ///   (single field read on the already-hot exec_core hash).
705    /// * **Postgres** — `SELECT raw_fields->>'namespace' FROM ff_exec_core
706    ///   WHERE partition_key = $1 AND execution_id = $2`.
707    /// * **SQLite** — `SELECT json_extract(raw_fields, '$.namespace')
708    ///   FROM ff_exec_core WHERE ...`.
709    ///
710    /// The default impl returns [`EngineError::Unavailable`].
711    async fn get_execution_namespace(
712        &self,
713        _execution_id: &ExecutionId,
714    ) -> Result<Option<String>, EngineError> {
715        Err(EngineError::Unavailable {
716            op: "get_execution_namespace",
717        })
718    }
719
720    /// Read a single namespaced flow tag. Returns `Ok(None)` when
721    /// the tag is absent **or** the flow row does not exist (same
722    /// collapse semantics as [`Self::get_execution_tag`]). Symmetry
723    /// partner — consumers like cairn read `cairn.session_id` off
724    /// flows for archival.
725    ///
726    /// `key` must pass [`validate_tag_key`].
727    ///
728    /// Per-backend shape:
729    ///
730    /// * **Valkey** — `HGET :tags <key>` on the flow's partition.
731    /// * **Postgres** — `SELECT raw_fields->><key> FROM ff_flow_core
732    ///   WHERE ...` (top-level `raw_fields` key, matches the flow-tag
733    ///   storage shape).
734    /// * **SQLite** — `SELECT json_extract(raw_fields, '$."<key>"')
735    ///   FROM ff_flow_core WHERE ...` (quoted key — see
736    ///   `set_flow_tag`).
737    ///
738    /// The default impl returns [`EngineError::Unavailable`].
739    async fn get_flow_tag(
740        &self,
741        _flow_id: &FlowId,
742        _key: &str,
743    ) -> Result<Option<String>, EngineError> {
744        Err(EngineError::Unavailable {
745            op: "get_flow_tag",
746        })
747    }
748
/// List the dependency edges adjacent to an execution.
///
/// This is a pure read. The backend:
/// 1. resolves which flow owns the subject execution;
/// 2. reads the adjacency SET for the requested direction;
/// 3. decodes the flow-scoped `edge:<edge_id>` hash behind each member.
///
/// The result is an empty `Vec` when the subject has no edges on the
/// requested side. That includes standalone executions (no owning
/// flow).
///
/// No ordering is guaranteed, because the adjacency SET is read with an
/// unordered SMEMBERS. Callers that need a deterministic order should
/// sort by [`EdgeSnapshot::edge_id`] / [`EdgeSnapshot::created_at`]
/// themselves.
///
/// Edge-hash parse failures are reported as
/// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
/// and fail loudly instead of yielding partial results. These include
/// unknown fields, missing required fields, and endpoints that disagree
/// with the adjacency SET.
///
/// Compiled only with the `core` feature: edge reads are part of the
/// minimal engine surface a Postgres-style backend must honour.
///
/// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
#[cfg(feature = "core")]
async fn list_edges(
    &self,
    _flow_id: &FlowId,
    _direction: EdgeDirection,
) -> Result<Vec<EdgeSnapshot>, EngineError> {
    let op = "list_edges";
    Err(EngineError::Unavailable { op })
}
779
/// Fetch a snapshot of one dependency edge, addressed by its owning
/// flow and edge id.
///
/// Returns `Ok(None)` if the edge hash does not exist. That happens
/// when the edge was never staged, or was staged under a flow other
/// than `flow_id`.
///
/// For an edge hash that is present, parse failures are reported as
/// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`].
/// In particular, the stored `flow_id` field is compared with the
/// caller's expected `flow_id`. A read against the wrong key therefore
/// fails loudly instead of handing back an unrelated edge.
///
/// Compiled only with the `core` feature: single-edge reads belong to
/// the minimal snapshot surface an alternate backend must honour,
/// alongside [`Self::describe_execution`] / [`Self::describe_flow`] /
/// [`Self::list_edges`].
///
/// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
#[cfg(feature = "core")]
async fn describe_edge(
    &self,
    _flow_id: &FlowId,
    _edge_id: &EdgeId,
) -> Result<Option<EdgeSnapshot>, EngineError> {
    const OP: &str = "describe_edge";
    Err(EngineError::Unavailable { op: OP })
}
806
/// Look up the flow that owns an execution, if any.
///
/// Returns `Ok(None)` when the execution's core record is missing or
/// the execution is standalone (no associated flow). A `flow_id` field
/// that is present but malformed is reported as
/// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`].
///
/// Compiled only with the `core` feature. ff-sdk's
/// `list_outgoing_edges` / `list_incoming_edges` use it to turn a
/// consumer-supplied `ExecutionId` into the `FlowId` that
/// [`Self::list_edges`] requires.
///
/// A Valkey backend answers with a single `HGET exec_core flow_id`. A
/// Postgres backend answers with the equivalent single-column row
/// lookup.
///
/// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
#[cfg(feature = "core")]
async fn resolve_execution_flow_id(
    &self,
    _eid: &ExecutionId,
) -> Result<Option<FlowId>, EngineError> {
    let op = "resolve_execution_flow_id";
    Err(EngineError::Unavailable { op })
}
831
/// List the flows on one partition, paginated by cursor (issue #185).
///
/// Returns a [`ListFlowsPage`] of [`FlowSummary`](crate::contracts::FlowSummary)
/// rows ordered by `flow_id` (UUID byte-lexicographic).
///
/// * `cursor` — `None` for the first page. Afterwards, forward the
///   previous page's `next_cursor` verbatim. The listing is exhausted
///   when `next_cursor` comes back `None`.
/// * `limit` — the maximum number of rows on this page.
///   Implementations MAY return fewer (end of partition) but MUST NOT
///   exceed it.
///
/// Ordering rationale: flow ids are UUIDs. Valkey (sorting after the
/// fact) and Postgres (`ORDER BY flow_id`) can both produce
/// byte-lexicographic order, which is the same order
/// `FlowId::to_string()` produces for canonical hyphenated UUIDs.
///
/// The cursor is an **exclusive lower bound**: a page only contains
/// rows with `flow_id > cursor`. Expressing continuation as that single
/// strict predicate keeps the contract backend-independent.
///
/// # Postgres implementation pattern
///
/// A Postgres-backed implementation serves this directly with
///
/// ```sql
/// SELECT flow_id, created_at_ms, public_flow_state
///   FROM ff_flow
///  WHERE partition_key = $1
///    AND ($2::uuid IS NULL OR flow_id > $2)
///  ORDER BY flow_id
///  LIMIT $3 + 1;
/// ```
///
/// The query reads one extra row to decide whether `next_cursor` should
/// be set to the last returned row's `flow_id`.
///
/// The Valkey implementation maintains the `ff:idx:{fp:N}:flow_index`
/// SET. It sorts and slices client-side (SMEMBERS, then sort by UUID
/// bytes) and pipelines `HGETALL flow_core` for each row on the page.
///
/// Gated on the `core` feature: flow listing is part of the minimal
/// engine surface a Postgres-style backend must honour.
#[cfg(feature = "core")]
async fn list_flows(
    &self,
    _partition: PartitionKey,
    _cursor: Option<FlowId>,
    _limit: usize,
) -> Result<ListFlowsPage, EngineError> {
    Err(EngineError::Unavailable { op: "list_flows" })
}
880
/// Walk the lane registry one page at a time.
///
/// Lanes are global rather than partition-scoped. The backend answers
/// from its lane registry and takes no [`crate::partition::Partition`]
/// argument. Results are sorted by [`LaneId`] name, which gives a stable
/// order across calls and lets a cursor name a deterministic position in
/// that order.
///
/// * `cursor` — exclusive lower bound. `None` begins at the first lane.
///   To continue a walk, pass the previous page's
///   [`ListLanesPage::next_cursor`].
/// * `limit` — hard cap on the number of lanes in the page. A backend
///   MAY return fewer when the registry is smaller, and MUST NOT return
///   more than `limit`.
///
/// [`ListLanesPage::next_cursor`] is `Some(last_lane_in_page)` exactly
/// when at least one more lane follows the returned page, and `None` on
/// the final page. Callers loop until it is `None` to read the full
/// registry.
///
/// Compiled only with the `core` feature: lane enumeration belongs to
/// the minimal snapshot surface an alternate backend must honour,
/// alongside [`Self::describe_flow`] / [`Self::list_edges`].
#[cfg(feature = "core")]
async fn list_lanes(
    &self,
    _cursor: Option<LaneId>,
    _limit: usize,
) -> Result<ListLanesPage, EngineError> {
    const OP: &str = "list_lanes";
    Err(EngineError::Unavailable { op: OP })
}
912
/// Page through the suspended executions of one partition. Each entry
/// carries its suspension `reason_code` (issue #183).
///
/// "What's blocked on what?" views (ff-board's suspended-executions
/// panel, operator CLIs) need the reason inline in the listing. That
/// way the UI does not round-trip to `describe_execution` per row for a
/// field it already knows it needs.
///
/// The `reason` on [`SuspendedExecutionEntry`] holds the free-form
/// `suspension:current.reason_code` field. See the type's rustdoc for
/// why it is a String and not an enum.
///
/// Treat `cursor` as opaque. Pass `None` to start a fresh scan, then
/// feed each returned [`ListSuspendedPage::next_cursor`] back in until
/// it comes back `None`. `limit` bounds the number of `entries`; a
/// backend MAY return fewer once the partition is exhausted.
///
/// Entries are ordered by ascending `suspended_at_ms` (the per-lane
/// suspended ZSET score == `timeout_at` or the no-timeout sentinel).
/// Ties are broken lexicographically by execution id, so cursor
/// continuation is deterministic across calls.
///
/// Compiled only with the `core` feature: suspended-list enumeration is
/// part of the minimal engine surface a Postgres-style backend must
/// honour.
#[cfg(feature = "core")]
async fn list_suspended(
    &self,
    _partition: PartitionKey,
    _cursor: Option<ExecutionId>,
    _limit: usize,
) -> Result<ListSuspendedPage, EngineError> {
    let op = "list_suspended";
    Err(EngineError::Unavailable { op })
}
950
/// List the executions indexed under one partition, paginated
/// forward-only.
///
/// The backend reads the partition-wide `ff:idx:{p:N}:all_executions`
/// set and sorts it lexicographically by `ExecutionId`. The page holds
/// the ids strictly greater than `cursor`, or starts at the smallest id
/// when `cursor = None`.
///
/// [`ListExecutionsPage::next_cursor`] is the page's last id when at
/// least one more id lies beyond it. `None` signals end-of-stream.
///
/// `limit` caps how many ids this page may return. A `limit` of `0`
/// yields an empty page with `next_cursor = None`. A backend MAY also
/// clamp `limit` internally (Valkey: 1000) and return fewer ids than
/// requested, so callers keep paginating until `next_cursor == None`.
///
/// Ordering is stable under concurrent inserts for ids already emitted.
/// An id less than or equal to the caller's cursor is never emitted
/// again on later pages. New inserts past the cursor WILL appear on
/// subsequent pages, as forward-only cursor semantics imply.
///
/// Compiled only with the `core` feature: partition-scoped listing is
/// part of the minimal engine surface every backend must honour.
#[cfg(feature = "core")]
async fn list_executions(
    &self,
    _partition: PartitionKey,
    _cursor: Option<ExecutionId>,
    _limit: usize,
) -> Result<ListExecutionsPage, EngineError> {
    const OP: &str = "list_executions";
    Err(EngineError::Unavailable { op: OP })
}
987
988    // ── Trigger ops (issue #150) ──
989
/// Deliver an external signal to the waitpoint of a suspended
/// execution.
///
/// In one atomic step, the backend:
/// - records the signal;
/// - evaluates the resume condition;
/// - if the condition is satisfied, moves the execution from
///   `suspended` to `runnable`, or buffers the signal while the
///   waitpoint is still `pending`.
///
/// Duplicate delivery (same `idempotency_key` + waitpoint) does not
/// mutate state twice. It comes back as
/// [`DeliverSignalResult::Duplicate`] carrying the pre-existing
/// `signal_id`.
///
/// Input validation (HMAC token presence, payload size limits,
/// signal-name shape) is the backend's job. Callers pass a fully
/// populated [`DeliverSignalArgs`] and receive either a typed outcome
/// or a typed error: `ScriptError::invalid_token`,
/// `ScriptError::token_expired`, or `ScriptError::ExecutionNotFound`,
/// surfaced via [`EngineError::Transport`] on the Valkey backend.
///
/// Compiled only with the `core` feature: signal delivery is part of
/// the minimal trigger surface every backend must honour. This lets
/// ff-server / REST handlers dispatch against `Arc<dyn EngineBackend>`
/// without knowing which backend is running underneath.
#[cfg(feature = "core")]
async fn deliver_signal(
    &self,
    _args: DeliverSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
    let op = "deliver_signal";
    Err(EngineError::Unavailable { op })
}
1020
/// Claim a resumed execution: a previously suspended attempt that has
/// met its resume condition (for example via [`Self::deliver_signal`])
/// and now needs a worker to pick up the same attempt index.
///
/// This differs from [`Self::claim`] (fresh work) and from
/// [`Self::claim_from_resume_grant`] (grant-based ownership transfer
/// after a crash). The resumed-claim path re-binds the existing attempt
/// instead of minting a new one. The backend issues a fresh `lease_id`
/// and bumps `lease_epoch`, but keeps `attempt_id` / `attempt_index`.
/// Stream frames and progress updates therefore continue on the same
/// attempt.
///
/// Typed failures travel `ScriptError` → `EngineError`:
///
/// * `NotAResumedExecution` — the attempt state is not
///   `attempt_interrupted`;
/// * `ExecutionNotLeaseable` — the lifecycle phase is not `runnable`;
/// * `InvalidClaimGrant` — the grant key is missing or was already
///   consumed.
///
/// Compiled only with the `core` feature: resumed-claim is part of the
/// minimal trigger surface every backend must honour.
#[cfg(feature = "core")]
async fn claim_resumed_execution(
    &self,
    _args: ClaimResumedExecutionArgs,
) -> Result<ClaimResumedExecutionResult, EngineError> {
    const OP: &str = "claim_resumed_execution";
    Err(EngineError::Unavailable { op: OP })
}
1051
/// Scan a lane's eligible ZSET on one partition for the
/// highest-priority executions awaiting a worker (v0.12 PR-5).
///
/// This lifts the `ZRANGEBYSCORE` that the SDK used to issue inline in
/// `FlowFabricWorker::claim_next`, the scheduler-bypass scanner gated
/// behind `direct-valkey-claim`. The trait method itself is
/// backend-agnostic. Consumers that drive the scanner loop (bench
/// harnesses, single-tenant dev) combine it with
/// [`Self::issue_claim_grant`] + [`Self::claim_execution`] to reproduce
/// the pre-PR-5 `claim_next` body.
///
/// # Backend coverage
///
/// * **Valkey** — `ZRANGEBYSCORE eligible_zset -inf +inf LIMIT 0 <limit>`
///   on the lane's partition-scoped eligible key. It is a single command
///   with no script round-trip. The wire shape is byte-for-byte
///   identical to the pre-PR SDK inline call, so bench traces match
///   pre-PR without new `#[tracing::instrument]` span names.
/// * **Postgres / SQLite** — use the `Err(Unavailable)` default.
///   PG/SQLite consumers drive work through the scheduler-routed
///   [`Self::claim_for_worker`] path instead of these scanner
///   primitives. Lifting the scheduler itself onto the trait is RFC-024
///   follow-up scope; see `project_claim_from_grant_pg_sqlite_gap.md`
///   for motivation.
///
/// The [`EngineError::Unavailable`] default keeps the trait addition
/// non-breaking for out-of-tree backends. It follows the same precedent
/// as [`Self::claim_execution`] landing in v0.12 PR-4.
#[cfg(feature = "core")]
async fn scan_eligible_executions(
    &self,
    _args: ScanEligibleArgs,
) -> Result<Vec<ExecutionId>, EngineError> {
    let op = "scan_eligible_executions";
    Err(EngineError::Unavailable { op })
}
1089
/// Issue a claim grant (the scheduler's admission write) for one
/// execution on one lane (v0.12 PR-5).
///
/// This lifts the `ff_issue_claim_grant` helper that the SDK used to
/// call inline from `FlowFabricWorker::claim_next`. In one atomic step
/// the backend:
/// - writes the grant hash;
/// - appends to the per-worker grant index;
/// - removes the execution from the lane's eligible ZSET.
///
/// Typed rejects come back as [`EngineError::Validation`]:
/// `CapabilityMismatch` when the worker's capabilities do not cover the
/// execution's `required_capabilities`, and `InvalidInput` for malformed
/// args. Transport faults come back as [`EngineError::Transport`].
///
/// # Backend coverage
///
/// * **Valkey** — one `ff_issue_claim_grant` FCALL. The KEYS/ARGV shape
///   is byte-for-byte identical to the pre-PR SDK inline call, so bench
///   traces match pre-PR.
/// * **Postgres / SQLite** — `Err(Unavailable)` default; use
///   [`Self::claim_for_worker`] instead. The cross-link rationale is on
///   [`Self::scan_eligible_executions`].
#[cfg(feature = "core")]
async fn issue_claim_grant(
    &self,
    _args: IssueClaimGrantArgs,
) -> Result<IssueClaimGrantOutcome, EngineError> {
    const OP: &str = "issue_claim_grant";
    Err(EngineError::Unavailable { op: OP })
}
1122
/// Move an execution out of a lane's eligible ZSET and into its
/// blocked_route ZSET (v0.12 PR-5).
///
/// This lifts the `ff_block_execution_for_admission` helper that the SDK
/// used to call inline from `FlowFabricWorker::claim_next`. It is called
/// after [`Self::issue_claim_grant`] rejects with `CapabilityMismatch`.
/// Without this block step, the inline scanner would re-pick the same
/// top-of-ZSET entry every tick (parity with
/// `ff-scheduler::Scheduler::block_candidate`).
///
/// The engine's unblock scanner periodically promotes blocked_route
/// entries back to eligible once a worker with matching caps registers.
///
/// # Backend coverage
///
/// * **Valkey** — one `ff_block_execution_for_admission` FCALL.
/// * **Postgres / SQLite** — `Err(Unavailable)` default. The
///   scheduler-routed [`Self::claim_for_worker`] path handles admission
///   rejects server-side.
#[cfg(feature = "core")]
async fn block_route(
    &self,
    _args: BlockRouteArgs,
) -> Result<BlockRouteOutcome, EngineError> {
    let op = "block_route";
    Err(EngineError::Unavailable { op })
}
1150
/// Redeem a scheduler-issued claim grant to mint a fresh attempt.
///
/// This is the SDK's grant-consumer path, paired with
/// `FlowFabricWorker::claim_from_grant` in `ff-sdk`. By the time it
/// runs, the scheduler has already validated budget / quota /
/// capabilities and written a grant (the Valkey `claim_grant` hash).
/// This call atomically:
/// - consumes that grant;
/// - creates the attempt row;
/// - mints `lease_id` + `lease_epoch`;
/// - returns [`ClaimExecutionResult::Claimed`] carrying the minted
///   lease triple.
///
/// It is distinct from [`Self::claim`], the scheduler-bypass scanner
/// used by the `direct-valkey-claim` feature. This method assumes the
/// grant already exists and skips capability / ZSET scanning. The
/// Valkey impl fires exactly one `ff_claim_execution` FCALL.
///
/// Typed failures travel `ScriptError` → `EngineError`:
///
/// * `UseClaimResumedExecution` — the attempt is actually
///   `attempt_interrupted`. The caller should retry via
///   [`Self::claim_resumed_execution`] (see `ContentionKind` at
///   `ff_core::engine_error`).
/// * `InvalidClaimGrant` — the grant is missing, consumed, or
///   worker-mismatched.
/// * `CapabilityMismatch` — the execution's `required_capabilities`
///   drifted after grant issuance.
///
/// # Backend coverage
///
/// * **Valkey** — implemented in `ff-backend-valkey` (one
///   `ff_claim_execution` FCALL).
/// * **Postgres / SQLite** — use the `Err(Unavailable)` default, as of
///   the PR that introduced this method. Grants on PG / SQLite currently
///   flow through `PostgresScheduler::claim_for_worker`, which is a
///   sibling struct and not an `EngineBackend` method. Wiring the
///   default-over-trait behaviour into a PG / SQLite `claim_execution`
///   impl lands with a future RFC-024 grant-consumer extension.
///
/// The [`EngineError::Unavailable`] default keeps the trait addition
/// non-breaking for out-of-tree backends. It follows the same precedent
/// as [`Self::read_current_attempt_index`] landing in v0.12 PR-3.
#[cfg(feature = "core")]
async fn claim_execution(
    &self,
    _args: ClaimExecutionArgs,
) -> Result<ClaimExecutionResult, EngineError> {
    const OP: &str = "claim_execution";
    Err(EngineError::Unavailable { op: OP })
}
1201
1202    /// Operator-initiated cancellation of a flow and (optionally) its
1203    /// member executions. See RFC-012 §3.1.1 for the policy /wait
1204    /// matrix.
1205    async fn cancel_flow(
1206        &self,
1207        id: &FlowId,
1208        policy: CancelFlowPolicy,
1209        wait: CancelFlowWait,
1210    ) -> Result<CancelFlowResult, EngineError>;
1211
/// RFC-016 Stage A: set the inbound-edge-group policy of a downstream
/// execution.
///
/// This must be called before the first
/// `add_dependency(... -> downstream_execution_id)`. Once edges have
/// been staged for the group, the backend rejects the call with
/// [`EngineError::Conflict`].
///
/// Stage A honours only
/// [`EdgeDependencyPolicy::AllOf`](crate::contracts::EdgeDependencyPolicy::AllOf).
/// Until Stage B's resolver lands, the `AnyOf` / `Quorum` variants
/// return [`EngineError::Validation`] with
/// `detail = "stage A supports AllOf only; AnyOf/Quorum land in stage B"`.
#[cfg(feature = "core")]
async fn set_edge_group_policy(
    &self,
    _flow_id: &FlowId,
    _downstream_execution_id: &ExecutionId,
    _policy: crate::contracts::EdgeDependencyPolicy,
) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
    let op = "set_edge_group_policy";
    Err(EngineError::Unavailable { op })
}
1235
1236    // ── HMAC secret rotation (v0.7 migration-master Q4) ──
1237
1238    /// Rotate the waitpoint HMAC signing kid **cluster-wide**.
1239    ///
1240    /// **v0.7 migration-master Q4 (adjudicated 2026-04-24).**
1241    /// Additive trait surface so Valkey and Postgres backends can
1242    /// both expose the "rotate everywhere" semantic under one name.
1243    ///
1244    /// * Valkey impl fans out an `ff_rotate_waitpoint_hmac_secret`
1245    ///   FCALL per execution partition. `entries.len() == num_flow_partitions`
1246    ///   and per-partition failures are surfaced as inner `Err`
1247    ///   entries — the call as a whole does not fail when one
1248    ///   partition's FCALL fails, matching
1249    ///   [`ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions`]'s
1250    ///   partial-success contract.
1251    /// * Postgres impl (Wave 4) writes one row to
1252    ///   `ff_waitpoint_hmac(kid, secret, rotated_at)` and returns a
1253    ///   single-entry vec with `partition = 0`.
1254    ///
1255    /// The default impl returns
1256    /// [`EngineError::Unavailable`] with
1257    /// `op = "rotate_waitpoint_hmac_secret_all"` so backends that
1258    /// haven't implemented the method surface the miss loudly rather
1259    /// than silently no-op'ing. Both concrete backends override.
1260    async fn rotate_waitpoint_hmac_secret_all(
1261        &self,
1262        _args: RotateWaitpointHmacSecretAllArgs,
1263    ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
1264        Err(EngineError::Unavailable {
1265            op: "rotate_waitpoint_hmac_secret_all",
1266        })
1267    }
1268
1269    /// Seed the initial waitpoint HMAC secret for a fresh deployment
1270    /// (issue #280).
1271    ///
1272    /// **Idempotent.** If a `current_kid` (Valkey per-partition) or
1273    /// an active kid row (Postgres) already exists with the given
1274    /// `kid`, the method returns
1275    /// [`SeedOutcome::AlreadySeeded`] without overwriting, reporting
1276    /// whether the stored secret matches the caller-supplied one via
1277    /// `same_secret`. Callers (cairn boot, operator tooling) invoke
1278    /// this on every boot and let the backend decide whether to
1279    /// install — removing the client-side "check then HSET" race that
1280    /// cairn's raw-HSET boot path silently tolerated.
1281    ///
1282    /// For rotation of an already-seeded secret, use
1283    /// [`Self::rotate_waitpoint_hmac_secret_all`] instead; seed is
1284    /// install-only.
1285    ///
1286    /// The default impl returns [`EngineError::Unavailable`] with
1287    /// `op = "seed_waitpoint_hmac_secret"` so backends that haven't
1288    /// implemented the method surface the miss loudly.
1289    async fn seed_waitpoint_hmac_secret(
1290        &self,
1291        _args: SeedWaitpointHmacSecretArgs,
1292    ) -> Result<SeedOutcome, EngineError> {
1293        Err(EngineError::Unavailable {
1294            op: "seed_waitpoint_hmac_secret",
1295        })
1296    }
1297
1298    // ── Budget ──
1299
1300    /// Report usage against a budget and check limits. Returns the
1301    /// typed [`ReportUsageResult`] variant; backends enforce
1302    /// idempotency via the caller-supplied
1303    /// [`UsageDimensions::dedup_key`] (RFC-012 §R7.2.3 — replaces
1304    /// the pre-Round-7 `AdmissionDecision` return).
1305    async fn report_usage(
1306        &self,
1307        handle: &Handle,
1308        budget: &BudgetId,
1309        dimensions: crate::backend::UsageDimensions,
1310    ) -> Result<ReportUsageResult, EngineError>;
1311
1312    // ── Stream reads (RFC-012 Stage 1c tranche-4; issue #87) ──
1313
1314    /// Read frames from a completed or in-flight attempt's stream.
1315    ///
1316    /// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start`
1317    /// / `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
1318    /// `StreamCursor::At("<id>")` reads from a concrete entry id.
1319    ///
1320    /// Input validation (count_limit bounds, cursor shape) is the
1321    /// caller's responsibility — SDK-side wrappers in
1322    /// [`ff-sdk`](https://docs.rs/ff-sdk) enforce bounds before
1323    /// forwarding. Backends MAY additionally reject out-of-range
1324    /// input via [`EngineError::Validation`].
1325    ///
1326    /// Gated on the `streaming` feature — stream reads are part of
1327    /// the stream-subset surface a backend without XREAD-like
1328    /// primitives may omit.
1329    #[cfg(feature = "streaming")]
1330    async fn read_stream(
1331        &self,
1332        _execution_id: &ExecutionId,
1333        _attempt_index: AttemptIndex,
1334        _from: StreamCursor,
1335        _to: StreamCursor,
1336        _count_limit: u64,
1337    ) -> Result<StreamFrames, EngineError> {
1338        Err(EngineError::Unavailable { op: "read_stream" })
1339    }
1340
1341    /// Tail a live attempt's stream.
1342    ///
1343    /// `after` is an exclusive [`StreamCursor`] — entries with id
1344    /// strictly greater than `after` are returned. `StreamCursor::Start`
1345    /// / `StreamCursor::End` are NOT accepted here; callers MUST pass
1346    /// a concrete id (or `StreamCursor::from_beginning()`). The SDK
1347    /// wrapper rejects the open markers before reaching the backend.
1348    ///
1349    /// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up
1350    /// to that many ms for a new entry.
1351    ///
1352    /// `visibility` (RFC-015 §6.1) filters the returned entries by
1353    /// their stored [`StreamMode`](crate::backend::StreamMode)
1354    /// `mode` field. Default
1355    /// [`TailVisibility::All`](crate::backend::TailVisibility::All)
1356    /// preserves v1 behaviour.
1357    ///
1358    /// Gated on the `streaming` feature — see [`read_stream`](Self::read_stream).
1359    #[cfg(feature = "streaming")]
1360    async fn tail_stream(
1361        &self,
1362        _execution_id: &ExecutionId,
1363        _attempt_index: AttemptIndex,
1364        _after: StreamCursor,
1365        _block_ms: u64,
1366        _count_limit: u64,
1367        _visibility: TailVisibility,
1368    ) -> Result<StreamFrames, EngineError> {
1369        Err(EngineError::Unavailable { op: "tail_stream" })
1370    }
1371
1372    /// Read the rolling summary document for an attempt (RFC-015 §6.3).
1373    ///
1374    /// Returns `Ok(None)` when no [`StreamMode::DurableSummary`](crate::backend::StreamMode::DurableSummary)
1375    /// frame has ever been appended for the attempt. Non-blocking Hash
1376    /// read; safe to call from any consumer without holding the lease.
1377    ///
1378    /// Gated on the `streaming` feature — summary reads are part of
1379    /// the stream-subset surface.
1380    #[cfg(feature = "streaming")]
1381    async fn read_summary(
1382        &self,
1383        _execution_id: &ExecutionId,
1384        _attempt_index: AttemptIndex,
1385    ) -> Result<Option<SummaryDocument>, EngineError> {
1386        Err(EngineError::Unavailable {
1387            op: "read_summary",
1388        })
1389    }
1390
1391    // ── RFC-017 Stage A — Ingress (5) ──────────────────────────
1392    //
1393    // Every method in this block has a default impl returning
1394    // `EngineError::Unavailable { op }` per RFC-017 §5.3. Concrete
1395    // backends override each method with a real body. A missing
1396    // override surfaces as a loud typed error at the call site rather
1397    // than a silent no-op.
1398
1399    /// Create an execution. Ingress row 6 (RFC-017 §4). Wraps
1400    /// `ff_create_execution` on Valkey; `INSERT INTO ff_execution ...`
1401    /// on Postgres. The `idempotency_key` + backend-side default
1402    /// `dedup_ttl_ms = 86400000` make duplicate submissions idempotent.
1403    #[cfg(feature = "core")]
1404    async fn create_execution(
1405        &self,
1406        _args: CreateExecutionArgs,
1407    ) -> Result<CreateExecutionResult, EngineError> {
1408        Err(EngineError::Unavailable {
1409            op: "create_execution",
1410        })
1411    }
1412
1413    /// Create a flow header. Ingress row 5.
1414    #[cfg(feature = "core")]
1415    async fn create_flow(
1416        &self,
1417        _args: CreateFlowArgs,
1418    ) -> Result<CreateFlowResult, EngineError> {
1419        Err(EngineError::Unavailable { op: "create_flow" })
1420    }
1421
1422    /// Atomically add an execution to a flow (single-FCALL co-located
1423    /// commit on Valkey; single-transaction UPSERT on Postgres).
1424    #[cfg(feature = "core")]
1425    async fn add_execution_to_flow(
1426        &self,
1427        _args: AddExecutionToFlowArgs,
1428    ) -> Result<AddExecutionToFlowResult, EngineError> {
1429        Err(EngineError::Unavailable {
1430            op: "add_execution_to_flow",
1431        })
1432    }
1433
1434    /// Stage a dependency edge between flow members. CAS-guarded on
1435    /// `graph_revision` — stale rev returns `Contention(StaleGraphRevision)`.
1436    #[cfg(feature = "core")]
1437    async fn stage_dependency_edge(
1438        &self,
1439        _args: StageDependencyEdgeArgs,
1440    ) -> Result<StageDependencyEdgeResult, EngineError> {
1441        Err(EngineError::Unavailable {
1442            op: "stage_dependency_edge",
1443        })
1444    }
1445
1446    /// Apply a staged dependency edge to its downstream child.
1447    #[cfg(feature = "core")]
1448    async fn apply_dependency_to_child(
1449        &self,
1450        _args: ApplyDependencyToChildArgs,
1451    ) -> Result<ApplyDependencyToChildResult, EngineError> {
1452        Err(EngineError::Unavailable {
1453            op: "apply_dependency_to_child",
1454        })
1455    }
1456
1457    /// Resolve one dependency edge after its upstream reached a
1458    /// terminal outcome — satisfy on "success", mark impossible
1459    /// otherwise. Idempotent (`AlreadyResolved` on replay).
1460    ///
1461    /// PR-7b Step 0 overlap-resolver: lifted here so cluster 2
1462    /// (`scanner/dependency_reconciler`) could trait-route through
1463    /// `Arc<dyn EngineBackend>` without a merge conflict with cluster
1464    /// 4. Cluster 4 (`completion_listener::spawn_dispatch_loop`)
1465    /// ultimately routed through the coarser
1466    /// [`Self::cascade_completion`] (per-payload) instead of looping
1467    /// over this per-edge method, because the Postgres cascade is
1468    /// outbox-driven rather than per-edge — see `cascade_completion`
1469    /// rustdoc "Timing semantics" for details.
1470    ///
1471    /// # Backend status
1472    ///
1473    /// - **Valkey:** wraps `ff_resolve_dependency` (RFC-016 Stage C
1474    ///   signature). Atomic single-slot FCALL.
1475    /// - **Postgres:** `Unavailable`. PG's post-completion cascade is
1476    ///   not per-edge; it runs via
1477    ///   `ff_backend_postgres::dispatch::dispatch_completion(event_id)`
1478    ///   keyed on the `ff_completion_event` outbox row. The Valkey-
1479    ///   shaped per-edge resolve does not map cleanly to that model;
1480    ///   PG's `dependency_reconciler` already calls `dispatch_completion`
1481    ///   directly. The engine's PR-7b/final integration test expects
1482    ///   `Unsupported` logs from Valkey-shaped scanners on a PG
1483    ///   deployment — this surface honours that contract.
1484    /// - **SQLite:** `Unavailable` for the same reason (mirrors PG).
1485    ///
1486    /// The default impl returns [`EngineError::Unavailable`] so a
1487    /// backend that has not been migrated surfaces a typed
1488    /// `Unsupported`-grade error rather than a panic.
1489    #[cfg(feature = "core")]
1490    async fn resolve_dependency(
1491        &self,
1492        _args: crate::contracts::ResolveDependencyArgs,
1493    ) -> Result<crate::contracts::ResolveDependencyOutcome, EngineError> {
1494        Err(EngineError::Unavailable {
1495            op: "resolve_dependency",
1496        })
1497    }
1498
1499    /// Cascade a terminal-execution completion into its downstream
1500    /// edges. Consumed by
1501    /// `ff-engine::completion_listener::spawn_dispatch_loop` (PR-7b
1502    /// Cluster 4) to trait-route the post-completion DAG-promotion
1503    /// path through `Arc<dyn EngineBackend>`.
1504    ///
1505    /// Distinct from [`Self::resolve_dependency`]: that method is
1506    /// per-edge (one `ff_resolve_dependency` FCALL); this method is
1507    /// per-completion and orchestrates the full outgoing-edge walk
1508    /// plus `child_skipped` recursion (Valkey) or outbox-event
1509    /// dispatch (Postgres).
1510    ///
1511    /// # Timing semantics
1512    ///
1513    /// Backends diverge on *when* the caller observes cascade work.
1514    /// See [`CascadeOutcome`] for the full contract; the short form:
1515    ///
1516    /// - **Valkey:** synchronous. FCALL-driven walk completes inline;
1517    ///   `child_skipped` descendants are recursively cascaded up to the
1518    ///   internal `MAX_CASCADE_DEPTH` cap before return.
1519    /// - **Postgres:** asynchronous via the `ff_completion_event`
1520    ///   outbox. The call resolves `payload` to its `event_id`, runs
1521    ///   `ff_backend_postgres::dispatch::dispatch_completion`, and
1522    ///   returns when the outbox row has been claimed + its direct
1523    ///   hops advanced. Further-descendant cascades ride their own
1524    ///   outbox events (emitted by the per-hop tx) — NOT this call.
1525    ///
1526    /// Consumers that depend on synchronous cascade must either target
1527    /// Valkey explicitly or observe PG's `dispatched_at_ms` clearance
1528    /// via the `dependency_reconciler` partial index to verify drain.
1529    ///
1530    /// The default impl returns [`EngineError::Unavailable`] so
1531    /// backends that have not been migrated surface a typed error
1532    /// rather than a panic.
1533    ///
1534    /// [`CascadeOutcome`]: crate::contracts::CascadeOutcome
1535    #[cfg(feature = "core")]
1536    async fn cascade_completion(
1537        &self,
1538        _payload: &crate::backend::CompletionPayload,
1539    ) -> Result<crate::contracts::CascadeOutcome, EngineError> {
1540        Err(EngineError::Unavailable {
1541            op: "cascade_completion",
1542        })
1543    }
1544
1545    // ── RFC-017 Stage A — Operator control (4) ─────────────────
1546
1547    /// Operator-initiated execution cancel (row 2).
1548    #[cfg(feature = "core")]
1549    async fn cancel_execution(
1550        &self,
1551        _args: CancelExecutionArgs,
1552    ) -> Result<CancelExecutionResult, EngineError> {
1553        Err(EngineError::Unavailable {
1554            op: "cancel_execution",
1555        })
1556    }
1557
1558    /// Re-score an execution's eligibility priority (row 17).
1559    #[cfg(feature = "core")]
1560    async fn change_priority(
1561        &self,
1562        _args: ChangePriorityArgs,
1563    ) -> Result<ChangePriorityResult, EngineError> {
1564        Err(EngineError::Unavailable {
1565            op: "change_priority",
1566        })
1567    }
1568
1569    /// Replay a terminal execution (row 22). Variadic KEYS handling
1570    /// (inbound-edge pre-read) is hidden inside the Valkey impl per
1571    /// RFC-017 §4 row 3.
1572    #[cfg(feature = "core")]
1573    async fn replay_execution(
1574        &self,
1575        _args: ReplayExecutionArgs,
1576    ) -> Result<ReplayExecutionResult, EngineError> {
1577        Err(EngineError::Unavailable {
1578            op: "replay_execution",
1579        })
1580    }
1581
1582    /// Operator-initiated lease revoke (row 19).
1583    #[cfg(feature = "core")]
1584    async fn revoke_lease(
1585        &self,
1586        _args: RevokeLeaseArgs,
1587    ) -> Result<RevokeLeaseResult, EngineError> {
1588        Err(EngineError::Unavailable { op: "revoke_lease" })
1589    }
1590
1591    // ── cairn #389 — service-layer typed FCALL surface ────────────
1592    //
1593    // These methods mirror `complete`/`fail`/`renew` (which take a
1594    // worker [`Handle`]) but dispatch against `(execution_id, fence)`
1595    // tuples supplied directly. Service-layer callers (cairn's
1596    // `valkey_control_plane_impl.rs`, future consumers that hold a
1597    // run/lease descriptor without a `Handle`) use these to avoid
1598    // going through the raw `ferriskey::Value` FCALL escape hatch.
1599    //
1600    // Same shape / same precedent as `suspend_by_triple` (cairn #322).
1601    //
1602    // Default impl returns [`EngineError::Unavailable`] so the trait
1603    // addition is non-breaking for out-of-tree backends. The in-tree
1604    // Valkey backend overrides; Postgres + SQLite keep the default
1605    // until follow-up parity work lands (consistent with
1606    // `issue_reclaim_grant` / `reclaim_execution` precedent).
1607
1608    /// Service-layer `complete_execution` — peer of [`Self::complete`]
1609    /// that takes a fence triple instead of a worker [`Handle`]. See
1610    /// the group preamble above for cairn-migration context.
1611    #[cfg(feature = "core")]
1612    async fn complete_execution(
1613        &self,
1614        _args: CompleteExecutionArgs,
1615    ) -> Result<CompleteExecutionResult, EngineError> {
1616        Err(EngineError::Unavailable {
1617            op: "complete_execution",
1618        })
1619    }
1620
1621    /// Service-layer `fail_execution` — peer of [`Self::fail`] that
1622    /// takes a fence triple instead of a worker [`Handle`].
1623    #[cfg(feature = "core")]
1624    async fn fail_execution(
1625        &self,
1626        _args: FailExecutionArgs,
1627    ) -> Result<FailExecutionResult, EngineError> {
1628        Err(EngineError::Unavailable {
1629            op: "fail_execution",
1630        })
1631    }
1632
1633    /// Service-layer `renew_lease` — peer of [`Self::renew`] that
1634    /// takes a fence triple instead of a worker [`Handle`].
1635    #[cfg(feature = "core")]
1636    async fn renew_lease(
1637        &self,
1638        _args: RenewLeaseArgs,
1639    ) -> Result<RenewLeaseResult, EngineError> {
1640        Err(EngineError::Unavailable { op: "renew_lease" })
1641    }
1642
1643    /// Service-layer `resume_execution` — transitions a suspended
1644    /// execution back to runnable. Distinct from
1645    /// [`Self::claim_from_resume_grant`] (which mints a worker handle
1646    /// against an already-eligible resumed execution): this method is
1647    /// the lifecycle transition primitive the control plane calls
1648    /// when an operator / auto-resume policy moves a suspended
1649    /// execution forward.
1650    ///
1651    /// The Valkey impl pre-reads `current_waitpoint_id` + `lane_id`
1652    /// from `exec_core` so callers only need the execution id + the
1653    /// trigger type — same ergonomics as `revoke_lease` reading
1654    /// `current_worker_instance_id` when callers omit it.
1655    #[cfg(feature = "core")]
1656    async fn resume_execution(
1657        &self,
1658        _args: ResumeExecutionArgs,
1659    ) -> Result<ResumeExecutionResult, EngineError> {
1660        Err(EngineError::Unavailable {
1661            op: "resume_execution",
1662        })
1663    }
1664
1665    /// Service-layer `check_admission_and_record` — atomic admission
1666    /// check against a quota policy. Callers supply the policy id +
1667    /// dimension (quota keys live on their own `{q:<policy>}`
1668    /// partition that cannot be derived from `execution_id`, so these
1669    /// travel outside [`CheckAdmissionArgs`]). `dimension` defaults
1670    /// to `"default"` inside the Valkey body when the caller passes
1671    /// an empty string — matches cairn's pre-migration default.
1672    #[cfg(feature = "core")]
1673    async fn check_admission(
1674        &self,
1675        _quota_policy_id: &crate::types::QuotaPolicyId,
1676        _dimension: &str,
1677        _args: CheckAdmissionArgs,
1678    ) -> Result<CheckAdmissionResult, EngineError> {
1679        Err(EngineError::Unavailable {
1680            op: "check_admission",
1681        })
1682    }
1683
1684    /// Service-layer `evaluate_flow_eligibility` — read-only check
1685    /// that returns the execution's current eligibility state
1686    /// (`eligible`, `blocked_by_dependencies`, or a backend-specific
1687    /// status string). Called by cairn's dependency-resolution path
1688    /// to decide whether a downstream execution can proceed.
1689    #[cfg(feature = "core")]
1690    async fn evaluate_flow_eligibility(
1691        &self,
1692        _args: EvaluateFlowEligibilityArgs,
1693    ) -> Result<EvaluateFlowEligibilityResult, EngineError> {
1694        Err(EngineError::Unavailable {
1695            op: "evaluate_flow_eligibility",
1696        })
1697    }
1698
1699    // ── #454 — cairn typed-FCALL additions (4) ─────────────────
1700
1701    /// Per-execution budget spend with tenant-open dimensions.
1702    ///
1703    /// Cairn #454 Q1/Q2: takes an open-set `BTreeMap<String, u64>` of
1704    /// deltas (distinct from the fixed-shape
1705    /// [`Self::report_usage`]/[`Self::report_usage_admin`] which use
1706    /// [`UsageDimensions`]). Return shape reuses [`ReportUsageResult`]
1707    /// — same `Ok` / `SoftBreach` / `HardBreach` / `AlreadyApplied`
1708    /// variants cairn's UI already branches on.
1709    ///
1710    /// The default impl returns [`EngineError::Unavailable`] so the
1711    /// trait remains additive for backends that have not landed the
1712    /// #454 body yet.
1713    #[cfg(feature = "core")]
1714    async fn record_spend(
1715        &self,
1716        _args: RecordSpendArgs,
1717    ) -> Result<ReportUsageResult, EngineError> {
1718        Err(EngineError::Unavailable {
1719            op: "record_spend",
1720        })
1721    }
1722
1723    /// Per-execution budget attribution release.
1724    ///
1725    /// Called on execution termination to reverse this execution's
1726    /// contribution to a budget counter. Per cairn #454 clarification
1727    /// this is **per-execution**, not a whole-budget flush — the
1728    /// budget persists across executions.
1729    ///
1730    /// The default impl returns [`EngineError::Unavailable`].
1731    #[cfg(feature = "core")]
1732    async fn release_budget(
1733        &self,
1734        _args: ReleaseBudgetArgs,
1735    ) -> Result<(), EngineError> {
1736        Err(EngineError::Unavailable {
1737            op: "release_budget",
1738        })
1739    }
1740
1741    /// Operator-driven approval-signal delivery.
1742    ///
1743    /// Pre-shaped variant of [`Self::deliver_signal`] where the caller
1744    /// **does not carry the waitpoint token**. The backend reads the
1745    /// token from `ff_waitpoint_pending`, HMAC-verifies server-side,
1746    /// and dispatches. Operator API never sees the token bytes.
1747    ///
1748    /// Cairn #454 Q3 — `signal_name` is a flat string (conventional
1749    /// values `"approved"` / `"rejected"`); audit metadata lives in
1750    /// cairn's audit log, not on the FF surface.
1751    ///
1752    /// The default impl returns [`EngineError::Unavailable`].
1753    #[cfg(feature = "core")]
1754    async fn deliver_approval_signal(
1755        &self,
1756        _args: DeliverApprovalSignalArgs,
1757    ) -> Result<DeliverSignalResult, EngineError> {
1758        Err(EngineError::Unavailable {
1759            op: "deliver_approval_signal",
1760        })
1761    }
1762
1763    /// Backend-atomic `issue_claim_grant` + `claim_execution`.
1764    ///
1765    /// Cairn #454 Q4 — composing the two primitives caller-side risks
1766    /// leaking a grant if `claim_execution` fails after
1767    /// `issue_claim_grant` succeeded. This method's contract is that
1768    /// the composition is backend-atomic: Valkey fuses them in one
1769    /// FCALL; PG/SQLite fuse them in one tx.
1770    ///
1771    /// The default impl **must not** be a chained call — it returns
1772    /// [`EngineError::Unavailable`] so consumers cannot accidentally
1773    /// use a non-atomic fallback.
1774    #[cfg(feature = "core")]
1775    async fn issue_grant_and_claim(
1776        &self,
1777        _args: IssueGrantAndClaimArgs,
1778    ) -> Result<ClaimGrantOutcome, EngineError> {
1779        Err(EngineError::Unavailable {
1780            op: "issue_grant_and_claim",
1781        })
1782    }
1783
1784    // ── RFC-017 Stage A — Budget + quota admin (5) ─────────────
1785
1786    /// Create a budget definition (row 6).
1787    #[cfg(feature = "core")]
1788    async fn create_budget(
1789        &self,
1790        _args: CreateBudgetArgs,
1791    ) -> Result<CreateBudgetResult, EngineError> {
1792        Err(EngineError::Unavailable {
1793            op: "create_budget",
1794        })
1795    }
1796
1797    /// Reset a budget's usage counters (row 10).
1798    #[cfg(feature = "core")]
1799    async fn reset_budget(
1800        &self,
1801        _args: ResetBudgetArgs,
1802    ) -> Result<ResetBudgetResult, EngineError> {
1803        Err(EngineError::Unavailable { op: "reset_budget" })
1804    }
1805
1806    /// Create a quota policy (row 7).
1807    #[cfg(feature = "core")]
1808    async fn create_quota_policy(
1809        &self,
1810        _args: CreateQuotaPolicyArgs,
1811    ) -> Result<CreateQuotaPolicyResult, EngineError> {
1812        Err(EngineError::Unavailable {
1813            op: "create_quota_policy",
1814        })
1815    }
1816
1817    /// Read-only budget status for operator visibility (row 8).
1818    #[cfg(feature = "core")]
1819    async fn get_budget_status(
1820        &self,
1821        _id: &BudgetId,
1822    ) -> Result<BudgetStatus, EngineError> {
1823        Err(EngineError::Unavailable {
1824            op: "get_budget_status",
1825        })
1826    }
1827
1828    /// Admin-path `report_usage` (row 9 + RFC-017 §5 round-1 F4).
1829    /// Distinct from the existing [`Self::report_usage`] which takes
1830    /// a worker handle — the admin path has no lease context.
1831    #[cfg(feature = "core")]
1832    async fn report_usage_admin(
1833        &self,
1834        _budget: &BudgetId,
1835        _args: ReportUsageAdminArgs,
1836    ) -> Result<ReportUsageResult, EngineError> {
1837        Err(EngineError::Unavailable {
1838            op: "report_usage_admin",
1839        })
1840    }
1841
1842    // ── RFC-017 Stage A — Read + diagnostics (3) ───────────────
1843
1844    /// Fetch the stored result payload for a completed execution
1845    /// (row 4). Returns `Ok(None)` when the execution is missing, not
1846    /// yet complete, or its payload was trimmed by retention policy.
1847    async fn get_execution_result(
1848        &self,
1849        _id: &ExecutionId,
1850    ) -> Result<Option<Vec<u8>>, EngineError> {
1851        Err(EngineError::Unavailable {
1852            op: "get_execution_result",
1853        })
1854    }
1855
1856    /// List the pending-or-active waitpoints for an execution, cursor
1857    /// paginated (row 5 / §8). Stage A preserves the existing
1858    /// `PendingWaitpointInfo` shape; Stage D ships the §8 HMAC
1859    /// sanitisation + `(token_kid, token_fingerprint)` schema.
1860    #[cfg(feature = "core")]
1861    async fn list_pending_waitpoints(
1862        &self,
1863        _args: ListPendingWaitpointsArgs,
1864    ) -> Result<ListPendingWaitpointsResult, EngineError> {
1865        Err(EngineError::Unavailable {
1866            op: "list_pending_waitpoints",
1867        })
1868    }
1869
1870    /// Backend-level reachability probe (row 1). Valkey: `PING`;
1871    /// Postgres: `SELECT 1`.
1872    async fn ping(&self) -> Result<(), EngineError> {
1873        Err(EngineError::Unavailable { op: "ping" })
1874    }
1875
1876    // ── RFC-017 Stage A — Scheduling (1) ───────────────────────
1877
1878    /// Scheduler-routed claim entrypoint (row 18, RFC-017 §7). Valkey
1879    /// forwards to its `ff_scheduler::Scheduler` cursor; Postgres
1880    /// forwards to `PostgresScheduler`'s `FOR UPDATE SKIP LOCKED`
1881    /// path.
1882    ///
1883    /// Backends that carry an embedded scheduler (e.g. `ValkeyBackend`
1884    /// constructed via `with_embedded_scheduler`, or `PostgresBackend`
1885    /// with its `with_scanners` sibling) route the claim through it.
1886    /// Backends without a wired scheduler return
1887    /// [`EngineError::Unavailable`]. HTTP consumers use
1888    /// `FlowFabricWorker::claim_via_server` instead.
1889    #[cfg(feature = "core")]
1890    async fn claim_for_worker(
1891        &self,
1892        _args: ClaimForWorkerArgs,
1893    ) -> Result<ClaimForWorkerOutcome, EngineError> {
1894        Err(EngineError::Unavailable {
1895            op: "claim_for_worker",
1896        })
1897    }
1898
1899    // ── Cross-cutting (RFC-017 Stage B trait-lift) ──────────────
1900
1901    /// Static observability label identifying the backend family in
1902    /// logs + metrics (RFC-017 §5.4 + §9 Stage B). Default impl
1903    /// returns `"unknown"` so legacy `impl EngineBackend` blocks that
1904    /// have not upgraded keep compiling; every in-tree backend
1905    /// overrides — `ValkeyBackend` → `"valkey"`, `PostgresBackend` →
1906    /// `"postgres"`.
1907    fn backend_label(&self) -> &'static str {
1908        "unknown"
1909    }
1910
1911    /// Backend downcast escape hatch (v0.12 PR-7a transitional).
1912    ///
1913    /// Scanner supervisors in `ff-engine` still dispatch through a
1914    /// concrete `ferriskey::Client`; to keep the engine's public
1915    /// boundary backend-agnostic (`Arc<dyn EngineBackend>`) while the
1916    /// scanner internals remain Valkey-shaped, the engine downcasts
1917    /// via this method and reaches in for the embedded client. Every
1918    /// backend that wants to be consumed by `Engine::start_with_completions`
1919    /// overrides this to return `self` as `&dyn Any`; the default
1920    /// returns a placeholder so a stray `downcast_ref` fails cleanly
1921    /// rather than risking unsound behaviour.
1922    ///
1923    /// v0.13 (PR-7b) will trait-ify individual scanners onto
1924    /// `EngineBackend` and retire `ff-engine`'s dependence on this
1925    /// downcast path. The method itself will remain on the trait
1926    /// (likely deprecated) rather than be removed — removing a
1927    /// public trait method is a breaking change for external
1928    /// `impl EngineBackend` blocks.
1929    fn as_any(&self) -> &(dyn std::any::Any + 'static) {
1930        // Placeholder so the default does not expose `Self` for
1931        // downcast. Backends override to return `self`.
1932        &()
1933    }
1934
1935    /// RFC-018 Stage A: snapshot of this backend's identity + the
1936    /// flat `Supports` surface it can actually service. Consumers use
1937    /// this at startup to gate UI features / choose between alternative
1938    /// code paths before dispatching. See
1939    /// `rfcs/RFC-018-backend-capability-discovery.md` for the full
1940    /// discovery contract and the four owner-adjudicated open
1941    /// questions (granularity: coarse; version: struct; sync; no
1942    /// event stream).
1943    ///
1944    /// Default: returns a value tagged `family = "unknown"` with every
1945    /// `supports.*` bool `false`, so pre-RFC-018 out-of-tree backends
1946    /// keep compiling and consumers treat "all false" as "dispatch
1947    /// and catch [`EngineError::Unavailable`]" (pre-RFC-018 behaviour).
1948    /// Concrete in-tree backends (`ValkeyBackend`, `PostgresBackend`)
1949    /// override to populate a real value.
1950    ///
1951    /// Sync (no `.await`): backend-static info should not require a
1952    /// probe on every query. Dynamic probes happen once at
1953    /// `connect*` time and cache the result.
1954    fn capabilities(&self) -> crate::capability::Capabilities {
1955        crate::capability::Capabilities::new(
1956            crate::capability::BackendIdentity::new(
1957                "unknown",
1958                crate::capability::Version::new(0, 0, 0),
1959                "unknown",
1960            ),
1961            crate::capability::Supports::none(),
1962        )
1963    }
1964
1965    /// Issue #281: run one-time backend-specific boot preparation.
1966    ///
1967    /// Intended to run ONCE per deployment startup — NOT per request.
1968    /// Idempotent and safe for consumers to call on every application
1969    /// boot; backends that have nothing to do return
1970    /// [`PrepareOutcome::NoOp`] without side effects.
1971    ///
1972    /// Per-backend behaviour:
1973    ///
1974    /// * **Valkey** — issues `FUNCTION LOAD REPLACE` for the
1975    ///   `flowfabric` Lua library (with bounded retry on transient
1976    ///   transport faults; permanent compile errors surface as
1977    ///   [`EngineError::Transport`] without retry). Returns
1978    ///   [`PrepareOutcome::Applied`] carrying
1979    ///   `"FUNCTION LOAD (flowfabric lib v<N>)"`.
1980    /// * **Postgres** — returns [`PrepareOutcome::NoOp`]. Schema
1981    ///   migrations are applied out-of-band per
1982    ///   `rfcs/drafts/v0.7-migration-master.md §Q12`; the backend
1983    ///   runs a schema-version check at connect time and refuses to
1984    ///   start on mismatch, so no boot-side prepare work remains.
1985    /// * **Default impl** — returns [`PrepareOutcome::NoOp`] so
1986    ///   out-of-tree backends without preparation work compile
1987    ///   without boilerplate.
1988    ///
1989    /// # Relationship to the in-tree boot path
1990    ///
1991    /// `ValkeyBackend::initialize_deployment` (called from
1992    /// `Server::start_with_metrics`) already invokes
1993    /// [`ensure_library`](ff_script::loader::ensure_library) inline as
1994    /// its step 4; that path is unchanged. `prepare()` exists as a
1995    /// **trait-surface entry point** so consumers that construct an
1996    /// `Arc<dyn EngineBackend>` outside of `Server` (e.g.
1997    /// cairn-fabric's boot path at `cairn-fabric/src/boot.rs`) can
1998    /// run the same preparation without reaching into
1999    /// backend-specific modules. The overlap is intentional: calling
2000    /// both `prepare()` and `initialize_deployment` is safe because
2001    /// `FUNCTION LOAD REPLACE` is idempotent under the version
2002    /// check.
2003    ///
2004    /// # Layer forwarding
2005    ///
2006    /// Layer impls (`HookedBackend`, ff-sdk layers) do NOT forward
2007    /// `prepare` today — consistent with `backend_label` / `ping` /
2008    /// `shutdown_prepare`. Consumers that wrap a backend in layers
2009    /// MUST call `prepare()` on the raw backend before wrapping, or
2010    /// accept the default [`PrepareOutcome::NoOp`].
2011    async fn prepare(&self) -> Result<PrepareOutcome, EngineError> {
2012        Ok(PrepareOutcome::NoOp)
2013    }
2014
2015    /// Drain-before-shutdown hook (RFC-017 §5.4). The server calls
2016    /// this before draining its own background tasks so backend-
2017    /// scoped primitives (Valkey stream semaphore, Postgres sqlx
2018    /// pool, …) can close their gates and await in-flight work up to
2019    /// `grace`.
2020    ///
2021    /// Default impl returns `Ok(())` — a no-op backend has nothing
2022    /// backend-scoped to drain. Concrete backends whose data plane
2023    /// owns resources (connection pools, semaphores, listeners)
2024    /// override with a real body.
2025    async fn shutdown_prepare(&self, _grace: Duration) -> Result<(), EngineError> {
2026        Ok(())
2027    }
2028
2029    // ── RFC-017 Stage E2 — `Server::client` removal (header + reads) ───
2030
2031    /// RFC-017 Stage E2: the "header" portion of `cancel_flow` — run the
2032    /// atomic flow-state flip (Valkey: `ff_cancel_flow` FCALL; Postgres:
2033    /// `cancel_flow_once` tx), decode policy + membership, and surface
2034    /// the `flow_already_terminal` idempotency branch as a first-class
2035    /// [`CancelFlowHeader::AlreadyTerminal`] so the Server can build
2036    /// the wire [`CancelFlowResult`] without reaching for a raw
2037    /// `Client`. Separate from the existing
2038    /// [`EngineBackend::cancel_flow`] entry point (which takes the
2039    /// enum-typed `(policy, wait)` split and returns the wait-collapsed
2040    /// `CancelFlowResult`) because the Server owns its own
2041    /// wait-dispatch + member-cancel machinery via
2042    /// [`EngineBackend::cancel_execution`] + backlog ack.
2043    ///
2044    /// Default impl returns [`EngineError::Unavailable`] so un-migrated
2045    /// backends surface the miss loudly.
2046    #[cfg(feature = "core")]
2047    async fn cancel_flow_header(
2048        &self,
2049        _args: CancelFlowArgs,
2050    ) -> Result<crate::contracts::CancelFlowHeader, EngineError> {
2051        Err(EngineError::Unavailable {
2052            op: "cancel_flow_header",
2053        })
2054    }
2055
2056    /// RFC-017 Stage E2: best-effort acknowledgement that one member of
2057    /// a `cancel_all` flow has completed its per-member cancel. Drains
2058    /// the member from the flow's `pending_cancels` set and, if empty,
2059    /// removes the flow from the partition-level `cancel_backlog`
2060    /// (Valkey: `ff_ack_cancel_member` FCALL; Postgres: table write —
2061    /// default `Unavailable` until Wave 9).
2062    ///
2063    /// Failures are swallowed by the caller — the cancel-backlog
2064    /// reconciler is the authoritative drain — but a typed error here
2065    /// lets the caller log a backend-scoped context string.
2066    #[cfg(feature = "core")]
2067    async fn ack_cancel_member(
2068        &self,
2069        _flow_id: &FlowId,
2070        _execution_id: &ExecutionId,
2071    ) -> Result<(), EngineError> {
2072        Err(EngineError::Unavailable {
2073            op: "ack_cancel_member",
2074        })
2075    }
2076
2077    /// RFC-017 Stage E2: full-shape execution read used by the
2078    /// `GET /v1/executions/{id}` HTTP route. Returns the legacy
2079    /// [`ExecutionInfo`] wire shape (not the decoupled
2080    /// [`ExecutionSnapshot`]) so the existing HTTP response bytes stay
2081    /// identical across the migration.
2082    ///
2083    /// `Ok(None)` ⇒ no such execution. Default `Unavailable` because
2084    /// the Valkey HGETALL-and-parse is backend-specific.
2085    #[cfg(feature = "core")]
2086    async fn read_execution_info(
2087        &self,
2088        _id: &ExecutionId,
2089    ) -> Result<Option<ExecutionInfo>, EngineError> {
2090        Err(EngineError::Unavailable {
2091            op: "read_execution_info",
2092        })
2093    }
2094
2095    /// RFC-017 Stage E2: narrow `public_state` read used by the
2096    /// `GET /v1/executions/{id}/state` HTTP route. Returns `Ok(None)`
2097    /// when the execution is missing. Default `Unavailable`.
2098    #[cfg(feature = "core")]
2099    async fn read_execution_state(
2100        &self,
2101        _id: &ExecutionId,
2102    ) -> Result<Option<PublicState>, EngineError> {
2103        Err(EngineError::Unavailable {
2104            op: "read_execution_state",
2105        })
2106    }
2107
2108    // ── RFC-019 Stage A/B/C — Stream-cursor subscriptions ─────────
2109    //
2110    // Four owner-adjudicated families (RFC-019 §Open Questions #5):
2111    // `lease_history`, `completion`, `signal_delivery`,
2112    // `instance_tags`. Stage C (this crate) promotes each family to
2113    // a typed event enum; consumers `match` on variants instead of
2114    // parsing a backend-shaped byte blob.
2115    //
2116    // Each method returns a family-specific subscription alias (see
2117    // [`crate::stream_events`]). All defaults return
2118    // `EngineError::Unavailable` per RFC-017 trait-growth conventions.
2119
2120    /// Subscribe to lease lifecycle events (acquired / renewed /
2121    /// expired / reclaimed / revoked) for the partition this backend
2122    /// is configured with.
2123    ///
2124    /// Cross-partition fan-out is consumer-side merge: subscribe
2125    /// per-partition backend instance and interleave on the read
2126    /// side. Yields
2127    /// `Err(EngineError::StreamDisconnected { cursor })` on backend
2128    /// disconnect; resume by calling this method again with the
2129    /// returned cursor.
2130    ///
2131    /// `filter` (#282): when `filter.instance_tag` is `Some((k, v))`,
2132    /// only events whose execution carries tag `k = v` are yielded
2133    /// (matching the [`crate::backend::ScannerFilter`] surface from
2134    /// #122). Pass `&ScannerFilter::default()` for unfiltered
2135    /// behaviour. Filtering happens inside the backend stream; the
2136    /// [`crate::stream_events::LeaseHistorySubscription`] return type
2137    /// is unchanged.
2138    async fn subscribe_lease_history(
2139        &self,
2140        _cursor: crate::stream_subscribe::StreamCursor,
2141        _filter: &crate::backend::ScannerFilter,
2142    ) -> Result<crate::stream_events::LeaseHistorySubscription, EngineError> {
2143        Err(EngineError::Unavailable {
2144            op: "subscribe_lease_history",
2145        })
2146    }
2147
2148    /// Subscribe to completion events (terminal state transitions).
2149    ///
2150    /// - **Postgres**: wraps the `ff_completion_event` outbox +
2151    ///   LISTEN/NOTIFY machinery. Durable via event-id cursor.
2152    /// - **Valkey**: wraps the RESP3 `ff:dag:completions` pubsub
2153    ///   subscriber. Pubsub is at-most-once over the live
2154    ///   subscription window; the cursor is always the empty
2155    ///   sentinel. If you need at-least-once replay with durable
2156    ///   cursor resume, use the Postgres backend (see
2157    ///   `docs/POSTGRES_PARITY_MATRIX.md` row `subscribe_completion`).
2158    ///
2159    /// `filter` (#282): see [`Self::subscribe_lease_history`]. Valkey
2160    /// reuses the `subscribe_completions_filtered` per-event HGET
2161    /// gate; Postgres filters inline against the outbox's denormalised
2162    /// `instance_tag` column.
2163    async fn subscribe_completion(
2164        &self,
2165        _cursor: crate::stream_subscribe::StreamCursor,
2166        _filter: &crate::backend::ScannerFilter,
2167    ) -> Result<crate::stream_events::CompletionSubscription, EngineError> {
2168        Err(EngineError::Unavailable {
2169            op: "subscribe_completion",
2170        })
2171    }
2172
2173    /// Subscribe to signal-delivery events (satisfied / buffered /
2174    /// deduped).
2175    ///
2176    /// `filter` (#282): see [`Self::subscribe_lease_history`].
2177    async fn subscribe_signal_delivery(
2178        &self,
2179        _cursor: crate::stream_subscribe::StreamCursor,
2180        _filter: &crate::backend::ScannerFilter,
2181    ) -> Result<crate::stream_events::SignalDeliverySubscription, EngineError> {
2182        Err(EngineError::Unavailable {
2183            op: "subscribe_signal_delivery",
2184        })
2185    }
2186
2187    /// Subscribe to instance-tag events (tag attached / cleared).
2188    ///
2189    /// Producer wiring is deferred per #311 audit ("no concrete
2190    /// demand"); the trait method exists for API uniformity across
2191    /// the four families. Backends currently return
2192    /// `EngineError::Unavailable`.
2193    async fn subscribe_instance_tags(
2194        &self,
2195        _cursor: crate::stream_subscribe::StreamCursor,
2196    ) -> Result<crate::stream_events::InstanceTagSubscription, EngineError> {
2197        Err(EngineError::Unavailable {
2198            op: "subscribe_instance_tags",
2199        })
2200    }
2201
2202    // ── PR-7b Cluster 1 — Foundation scanner operations ─────────
2203    //
2204    // Per-execution write hooks invoked by the engine scanner loop.
2205    // Scanner bodies discover candidate executions via partition
2206    // indices (Valkey ZSETs / PG index scans) and, for each candidate,
2207    // call through to one of the methods below to perform the atomic
2208    // state-flip. Defaults return `EngineError::Unavailable { op }`;
2209    // Valkey impls wrap the corresponding `ff_*` FCALL; Postgres impls
2210    // run a single-row tx mirroring the Lua semantic.
2211
2212    /// Lease-expiry scanner hook — mark an expired lease as reclaimable
2213    /// so another worker can redeem the execution. Atomic per call.
2214    ///
2215    /// Valkey: `FCALL ff_mark_lease_expired_if_due`.
2216    /// Postgres: single-row tx on `ff_attempt` + `ff_exec_core` (see
2217    /// `ff_backend_postgres::reconcilers::lease_expiry`).
2218    async fn mark_lease_expired_if_due(
2219        &self,
2220        _partition: crate::partition::Partition,
2221        _execution_id: &ExecutionId,
2222    ) -> Result<(), EngineError> {
2223        Err(EngineError::Unavailable {
2224            op: "mark_lease_expired_if_due",
2225        })
2226    }
2227
2228    /// Delayed-promoter scanner hook — promote a delayed execution to
2229    /// `eligible_now` once its `delay_until` has passed.
2230    ///
2231    /// Valkey: `FCALL ff_promote_delayed`.
2232    /// Postgres: Wave 9 schema scope (no current impl).
2233    async fn promote_delayed(
2234        &self,
2235        _partition: crate::partition::Partition,
2236        _lane: &LaneId,
2237        _execution_id: &ExecutionId,
2238        _now_ms: TimestampMs,
2239    ) -> Result<(), EngineError> {
2240        Err(EngineError::Unavailable {
2241            op: "promote_delayed",
2242        })
2243    }
2244
2245    /// Pending-waitpoint-expiry scanner hook — close a pending
2246    /// waitpoint whose deadline has passed (wake the suspended
2247    /// execution with a timeout signal).
2248    ///
2249    /// Valkey: `FCALL ff_close_waitpoint`.
2250    /// Postgres: Wave 9 schema scope (no current impl).
2251    async fn close_waitpoint(
2252        &self,
2253        _partition: crate::partition::Partition,
2254        _execution_id: &ExecutionId,
2255        _waitpoint_id: &str,
2256        _now_ms: TimestampMs,
2257    ) -> Result<(), EngineError> {
2258        Err(EngineError::Unavailable {
2259            op: "close_waitpoint",
2260        })
2261    }
2262
2263    /// Shared hook for the attempt-timeout and execution-deadline
2264    /// scanners — terminate or retry an execution whose wall-clock
2265    /// budget has elapsed. `phase` discriminates which of the two
2266    /// scanner paths is calling so the backend can preserve diagnostic
2267    /// breadcrumbs without forking the surface.
2268    ///
2269    /// Valkey: `FCALL ff_expire_execution` (with `phase` passed through
2270    /// as an ARGV discriminator).
2271    /// Postgres: single-row tx mirror of the Lua semantic for
2272    /// `AttemptTimeout`; `ExecutionDeadline` is Wave 9 schema scope.
2273    async fn expire_execution(
2274        &self,
2275        _partition: crate::partition::Partition,
2276        _execution_id: &ExecutionId,
2277        _phase: ExpirePhase,
2278        _now_ms: TimestampMs,
2279    ) -> Result<(), EngineError> {
2280        Err(EngineError::Unavailable {
2281            op: "expire_execution",
2282        })
2283    }
2284
2285    /// Suspension-timeout scanner hook — expire a suspended execution
2286    /// whose suspension deadline has passed (wake with timeout).
2287    ///
2288    /// Valkey: `FCALL ff_expire_suspension`.
2289    /// Postgres: single-row tx on `ff_suspend` + `ff_exec_core`.
2290    async fn expire_suspension(
2291        &self,
2292        _partition: crate::partition::Partition,
2293        _execution_id: &ExecutionId,
2294        _now_ms: TimestampMs,
2295    ) -> Result<(), EngineError> {
2296        Err(EngineError::Unavailable {
2297            op: "expire_suspension",
2298        })
2299    }
2300
2301    // ── PR-7b Cluster 2 — Reconciler scanner operations ─────────
2302    //
    // Per-execution write hook invoked by the `unblock` scanner. This section
    // also hosts the RFC-016 sibling-cancel drain / reconcile hooks used by
    // the edge-cancel dispatcher and reconciler. The other two cluster-2
    // scanners (`budget_reset`, `dependency_reconciler`) route through
    // `reset_budget` + `resolve_dependency`, which are already part of the
    // RFC-017 service-layer surface and live above.
2307
2308    /// Unblock-scanner hook — move an execution from a blocked ZSET back
2309    /// to `eligible_now` once its blocking condition has cleared (budget
2310    /// under limit, quota window drained, or a capable worker has come
2311    /// online). `expected_blocking_reason` discriminates which of the
2312    /// `blocked:{budget,quota,route}` sets the execution is leaving and
2313    /// also fences against a stale unblock (Lua rejects if the core's
2314    /// `blocking_reason` no longer matches).
2315    ///
2316    /// # Backend status
2317    ///
2318    /// - **Valkey:** wraps `ff_unblock_execution` (RFC-010 §6). Atomic
2319    ///   single-slot FCALL on `{p:N}`.
2320    /// - **Postgres:** `Unavailable` (structural). PG does not persist a
2321    ///   per-reason `blocked:{budget,quota,route}` index — scheduler
2322    ///   eligibility is re-evaluated live via SQL predicates on
2323    ///   `ff_exec_core` + budget / quota tables (see
2324    ///   `ff_backend_postgres::scheduler`). Nothing to reconcile, so the
2325    ///   engine's PG scanner loop does not run this path; callers on
2326    ///   a PG deployment receive the typed `Unavailable` per PR-7b/final
2327    ///   contract.
2328    /// - **SQLite:** `Unavailable` for the same reason as Postgres.
2329    async fn unblock_execution(
2330        &self,
2331        _partition: crate::partition::Partition,
2332        _lane_id: &LaneId,
2333        _execution_id: &ExecutionId,
2334        _expected_blocking_reason: &str,
2335        _now_ms: TimestampMs,
2336    ) -> Result<(), EngineError> {
2337        Err(EngineError::Unavailable {
2338            op: "unblock_execution",
2339        })
2340    }
2341
2342    /// RFC-016 Stage C — sibling-cancel group drain.
2343    ///
2344    /// After `edge_cancel_dispatcher` has issued
2345    /// [`EngineBackend::cancel_execution`] against every listed
2346    /// sibling of a `(flow_id, downstream_eid)` group, this trait
2347    /// method atomically removes the tuple from the partition-local
2348    /// pending-cancel-groups index and clears the per-group flag +
2349    /// members state.
2350    ///
2351    /// Valkey: `FCALL ff_drain_sibling_cancel_group` (SREM +
2352    /// HDEL in one Lua unit).
2353    /// Postgres: `Unavailable` — PG's Wave-6b
2354    /// `reconcilers::edge_cancel_dispatcher::dispatcher_tick`
2355    /// owns the equivalent row drain inside its own per-group
2356    /// transaction; the Valkey-shaped per-tuple call is not used on
2357    /// the PG scanner path.
2358    /// SQLite: `Unavailable` (mirrors PG; RFC-023 scope).
2359    #[cfg(feature = "core")]
2360    async fn drain_sibling_cancel_group(
2361        &self,
2362        _flow_partition: crate::partition::Partition,
2363        _flow_id: &FlowId,
2364        _downstream_eid: &ExecutionId,
2365    ) -> Result<(), EngineError> {
2366        Err(EngineError::Unavailable {
2367            op: "drain_sibling_cancel_group",
2368        })
2369    }
2370
2371    /// RFC-016 Stage D — sibling-cancel group reconcile (Invariant
2372    /// Q6 safety net).
2373    ///
2374    /// Re-examines a `(flow_id, downstream_eid)` tuple in the
2375    /// partition-local pending-cancel-groups index and returns one
2376    /// of three atomic dispositions — see
2377    /// [`SiblingCancelReconcileAction`] — so the crash-recovery
2378    /// reconciler can drain stale or completed tuples without
2379    /// fighting the Stage-C dispatcher.
2380    ///
2381    /// Valkey: `FCALL ff_reconcile_sibling_cancel_group`.
2382    /// Postgres: `Unavailable` — PG's
2383    /// `reconcilers::edge_cancel_reconciler::reconciler_tick`
2384    /// owns the row-level reconcile inside its own batched tx.
2385    /// SQLite: `Unavailable` (mirrors PG).
2386    #[cfg(feature = "core")]
2387    async fn reconcile_sibling_cancel_group(
2388        &self,
2389        _flow_partition: crate::partition::Partition,
2390        _flow_id: &FlowId,
2391        _downstream_eid: &ExecutionId,
2392    ) -> Result<SiblingCancelReconcileAction, EngineError> {
2393        Err(EngineError::Unavailable {
2394            op: "reconcile_sibling_cancel_group",
2395        })
2396    }
2397
2398    // ── PR-7b Cluster 2b-A — Tally-recompute reconciler scanners ─────
2399    //
2400    // Coarse trait methods: each runs a full scan-and-fix pass over
2401    // one partition. Unlike cluster-2's per-candidate write hooks,
2402    // these scanners do multi-round-trip discovery (SSCAN / HMGET /
2403    // HGETALL / ZSCORE / ZREMRANGEBYSCORE) interleaved with conditional
2404    // writes. Moving the whole pass into the trait impl achieves the
2405    // PR-7b/final contract ("scanner trait leaks no `ferriskey::Client`")
2406    // without atomising operations that are inherently not atomic.
2407    //
2408    // Backend status (all three methods):
2409    //
2410    // - **Valkey:** real scan-loop in the trait impl; identical command
2411    //   shape to the pre-routing scanner path.
2412    // - **Postgres:** default `Unavailable`. PG mutates budget / quota
2413    //   counters and scheduling state inside SERIALIZABLE transactions,
2414    //   and the scheduler evaluates eligibility via live SQL predicates
2415    //   on `ff_exec_core` + budget / quota tables — no persistent index
2416    //   or counter hash that can drift, hence nothing to reconcile. The
2417    //   engine's PG scanner supervisor does not run these paths.
2418    // - **SQLite:** same as Postgres (RFC-023: SQLite runs its own
2419    //   scanner supervisor; these FF-owned tally-recompute scanners
2420    //   are not registered).
2421
2422    /// Index-reconciler pass — walks `ff:idx:{p:N}:all_executions`
2423    /// and verifies each execution appears in the correct scheduling
2424    /// sorted set (`eligible` / `delayed` / `blocked:*` / `active` /
2425    /// `suspended` / `terminal`) for its current `(lifecycle_phase,
2426    /// eligibility_state, ownership_state)` triple. Phase 1 is
2427    /// log-only; auto-fix is deferred to a later phase (RFC-010 §6.14).
2428    async fn reconcile_execution_index(
2429        &self,
2430        _partition: crate::partition::Partition,
2431        _lanes: &[LaneId],
2432        _filter: &crate::backend::ScannerFilter,
2433    ) -> Result<ReconcileCounts, EngineError> {
2434        Err(EngineError::Unavailable {
2435            op: "reconcile_execution_index",
2436        })
2437    }
2438
2439    /// Budget-reconciler pass — walks `ff:budget:{b:M}:policies_idx`,
2440    /// reads each budget's definition / usage / limits, and reconciles
2441    /// the `breached_at` marker against hard limits. Resetting budgets
2442    /// (non-zero `reset_interval_ms`) are skipped — they are handled
2443    /// by the `budget_reset` scanner (cluster 2). Drops index entries
2444    /// for budgets whose definition hash has been deleted (retention
2445    /// purge / manual). RFC-008 §Budget Reconciliation, RFC-010 §6.5.
2446    async fn reconcile_budget_counters(
2447        &self,
2448        _partition: crate::partition::Partition,
2449        _now_ms: TimestampMs,
2450    ) -> Result<ReconcileCounts, EngineError> {
2451        Err(EngineError::Unavailable {
2452            op: "reconcile_budget_counters",
2453        })
2454    }
2455
2456    /// Quota-reconciler pass — walks `ff:quota:{q:M}:policies_idx`,
2457    /// trims expired entries from rate-limit sliding windows, and
2458    /// recomputes each policy's concurrency counter by walking its
2459    /// `admitted_set` and pruning entries whose admission guard key
2460    /// has TTLed out. RFC-008 §Quota Reconciliation, RFC-010 §6.6.
2461    async fn reconcile_quota_counters(
2462        &self,
2463        _partition: crate::partition::Partition,
2464        _now_ms: TimestampMs,
2465    ) -> Result<ReconcileCounts, EngineError> {
2466        Err(EngineError::Unavailable {
2467            op: "reconcile_quota_counters",
2468        })
2469    }
2470
2471    // ── PR-7b Cluster 2b-B — Projection + retention scanners ──────
2472    //
2473    // Two additional non-FCALL scanners routed through the trait so
2474    // engine-side scanners stop touching `ferriskey::Client` directly.
2475    // Both ride on aggregate reads + derived writes; neither is a thin
2476    // single-FCALL passthrough.
2477
2478    /// Flow-projector scanner hook — sample flow members, derive an
2479    /// aggregate `public_flow_state` + per-state counts, and write them
2480    /// to the flow summary projection. Returns `Ok(true)` when the
2481    /// summary was updated, `Ok(false)` when the flow had no members
2482    /// or the index entry was defensively pruned (core missing).
2483    ///
2484    /// The derived `public_flow_state` written here is distinct from
2485    /// `ff_flow_core.public_flow_state` — the former is a rollup
2486    /// dashboard field, the latter is the authoritative mutation-guard
2487    /// state owned by `create_flow` / `cancel_flow`. See
2488    /// `ff_engine::scanner::flow_projector` module doc for the
2489    /// two-sources contract.
2490    ///
2491    /// # Backend status
2492    ///
2493    /// - **Valkey:** lifts the pre-PR-7b Rust-composed
2494    ///   SCARD + SRANDMEMBER + per-member HGET + HSET summary pattern.
2495    ///   No new Lua function; the aggregation is inherently
2496    ///   multi-round-trip (cross-partition member reads) and atomicity
2497    ///   is neither required nor achievable against 256 partitions.
2498    /// - **Postgres:** SELECT aggregates from `ff_exec_core` grouped by
2499    ///   `public_state` for the flow's members, INSERT ... ON CONFLICT
2500    ///   DO UPDATE into `ff_flow_summary` (migration 0019). One query
2501    ///   per flow; partition-local aggregation.
2502    /// - **SQLite:** `Unavailable` per RFC-023 Phase 3.5 (flow summary
2503    ///   projection is a shared-deployment dashboard feature; local
2504    ///   single-tenant SQLite deployments don't need it).
2505    #[cfg(feature = "core")]
2506    async fn project_flow_summary(
2507        &self,
2508        _partition: crate::partition::Partition,
2509        _flow_id: &FlowId,
2510        _now_ms: TimestampMs,
2511    ) -> Result<bool, EngineError> {
2512        Err(EngineError::Unavailable {
2513            op: "project_flow_summary",
2514        })
2515    }
2516
2517    /// Retention-trimmer scanner hook — delete terminal executions and
2518    /// all their subordinate keys/rows once they are older than the
2519    /// configured retention window. Returns the number of executions
2520    /// actually purged in this call (so the scanner can loop when it
2521    /// hits `batch_size`).
2522    ///
2523    /// Retention trimming is inherently a scan-and-delete loop over
2524    /// time; the trait surface exists to remove engine-side Valkey
2525    /// coupling, **not** to atomise the operation into a single
2526    /// round-trip. Implementations may issue multiple round-trips
2527    /// (e.g. per-execution cascade across sibling tables); the trait
2528    /// contract is "make progress, bounded by batch_size".
2529    ///
2530    /// # Backend status
2531    ///
2532    /// - **Valkey:** lifts the pre-PR-7b ZRANGEBYSCORE + per-exec
2533    ///   cascade-delete loop verbatim. Cluster-safe (all keys for one
2534    ///   execution live on the same `{p:N}` slot).
2535    /// - **Postgres:** `DELETE FROM ff_exec_core` for terminal rows
2536    ///   past the cutoff plus explicit cascade DELETEs on every
2537    ///   execution-scoped sibling table (no FK CASCADE in the schema).
2538    ///   Single transaction per batch.
2539    /// - **SQLite:** `Unavailable` per RFC-023 Phase 3.5. Single-tenant
2540    ///   local deployments typically manage their own DB lifecycle and
2541    ///   do not need a retention scanner.
2542    #[cfg(feature = "core")]
2543    async fn trim_retention(
2544        &self,
2545        _partition: crate::partition::Partition,
2546        _lane_id: &LaneId,
2547        _retention_ms: u64,
2548        _now_ms: TimestampMs,
2549        _batch_size: u32,
2550        _filter: &crate::backend::ScannerFilter,
2551    ) -> Result<u32, EngineError> {
2552        Err(EngineError::Unavailable {
2553            op: "trim_retention",
2554        })
2555    }
2556
2557    // ── PR-7b Wave 0a: exec_core field read primitive ──
2558
2559    /// Point-read N fields from the `exec_core` hash for a given
2560    /// execution. Returns a map of field-name → Option<String>
2561    /// (None for fields absent or stored as NULL). Scanner call
2562    /// sites formerly issuing raw `HGET`/`HMGET` on `ExecKeyContext::core()`
2563    /// route through this trait method (cairn #436 / PR-7b Wave 0a).
2564    ///
2565    /// Field values are coerced to String at the trait boundary for
2566    /// wire compatibility with the Valkey HGET shape. Consumers parse
2567    /// specific fields (`lane_id`, `current_attempt_index`, etc.) from
2568    /// the returned strings as needed.
2569    ///
2570    /// - **Valkey:** `HMGET exec_core_key f1 f2 ...`, zipping names to values.
2571    /// - **Postgres:** `SELECT` against `ff_exec_core` with dynamic column
2572    ///   extraction; fields in `raw_fields` JSONB are extracted via `->>`.
2573    /// - **SQLite:** equivalent with `json_extract(raw_fields, '$.field')`.
2574    ///
2575    /// Default body returns `Unavailable` so non-v0.13 backends remain
2576    /// compile-compatible.
2577    #[cfg(feature = "core")]
2578    async fn read_exec_core_fields(
2579        &self,
2580        _partition: crate::partition::Partition,
2581        _execution_id: &crate::types::ExecutionId,
2582        _fields: &[&str],
2583    ) -> Result<std::collections::HashMap<String, Option<String>>, EngineError> {
2584        Err(EngineError::Unavailable {
2585            op: "read_exec_core_fields",
2586        })
2587    }
2588
2589    // ── PR-7b Wave 0a: backend clock primitive ──
2590
2591    /// Returns the backend's current wall-clock epoch milliseconds.
2592    ///
2593    /// Used by 15 scanners to compute "due" thresholds before issuing
2594    /// per-partition due-scans. Previously a Valkey-only helper in
2595    /// `ff-engine`'s `scanner::lease_expiry` issuing `TIME`; this
2596    /// trait method is the backend-agnostic replacement (cairn #436 /
2597    /// PR-7b Wave 0a).
2598    ///
2599    /// Every in-tree backend overrides. The default falls back to
2600    /// `SystemTime::now()` so out-of-tree `EngineBackend` impls (e.g.
2601    /// cairn mocks, test doubles) stay source-compatible across v0.12
2602    /// → v0.13.
2603    ///
2604    /// - **Valkey:** `TIME` command.
2605    /// - **Postgres:** `SELECT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint`
2606    ///   (not `now()` — `now()` is the transaction start timestamp,
2607    ///   which is stale under any long-running tx; scanners need the
2608    ///   true wall-clock read).
2609    /// - **SQLite:** `SELECT CAST((julianday('now') - 2440587.5) * 86400000 AS INTEGER)`.
2610    #[cfg(feature = "core")]
2611    async fn server_time_ms(&self) -> Result<u64, EngineError> {
2612        use std::time::{SystemTime, UNIX_EPOCH};
2613        let d = SystemTime::now()
2614            .duration_since(UNIX_EPOCH)
2615            .map_err(|e| EngineError::Transport {
2616                backend: "system-clock",
2617                source: format!("server_time_ms default: {e}").into(),
2618            })?;
2619        Ok(d.as_millis() as u64)
2620    }
2621}
2622
/// Result of one RFC-016 Stage D
/// [`EngineBackend::reconcile_sibling_cancel_group`] call.
///
/// The cardinality is deliberately fixed at three, which keeps the
/// `ff_sibling_cancel_reconcile_total{action}` metric label a closed
/// set.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SiblingCancelReconcileAction {
    /// The pending tuple was stale (flag cleared, or the edgegroup is
    /// absent). It was SREM'd / DELETE'd without touching the group.
    SremmedStale,
    /// The flag is still set, but every listed sibling is already
    /// terminal, which indicates an interrupted drain. The flag was
    /// cleared and the tuple drained.
    CompletedDrain,
    /// The flag is set and at least one sibling is non-terminal. The
    /// dispatcher owns this tuple, so it was left untouched.
    NoOp,
}

impl SiblingCancelReconcileAction {
    /// Short observability label. It matches the Lua and PG action
    /// strings exactly.
    pub fn as_str(&self) -> &'static str {
        match *self {
            SiblingCancelReconcileAction::SremmedStale => "sremmed_stale",
            SiblingCancelReconcileAction::CompletedDrain => "completed_drain",
            SiblingCancelReconcileAction::NoOp => "no_op",
        }
    }
}
2653
/// Counters produced by one reconciler scan pass over a single
/// partition.
///
/// Scanners add these into the engine's per-pass metrics exactly as
/// they did before PR-7b/2b-A.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ReconcileCounts {
    /// Candidates where the pass detected a correctable condition
    /// (drift, breach flip, stale guard). For passes that auto-fix,
    /// these are the items corrected. Zero when the partition is
    /// healthy.
    pub processed: u32,
    /// Candidates the pass could not handle because of transport or
    /// parse errors. Non-zero values surface through scanner metrics
    /// and should be investigated.
    pub errors: u32,
}
2670
/// Identifies the scanner that called [`EngineBackend::expire_execution`].
///
/// The attempt-timeout and execution-deadline scanners share a single
/// trait method, because the underlying state flip is identical for
/// both: terminate, or retry per the retry policy.
///
/// The distinction is purely diagnostic: it records which deadline
/// elapsed. It is passed through to the backend so that one Lua / SQL
/// path can log or tag accordingly.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExpirePhase {
    /// Called from the `attempt_timeout` scanner: the per-attempt
    /// wall-clock budget has elapsed.
    AttemptTimeout,
    /// Called from the `execution_deadline` scanner: the wall-clock
    /// deadline for the whole execution has elapsed.
    ExecutionDeadline,
}

impl ExpirePhase {
    /// Short string tag, suitable as a Lua ARGV value or a Postgres
    /// breadcrumb.
    pub fn as_str(&self) -> &'static str {
        match *self {
            ExpirePhase::AttemptTimeout => "attempt_timeout",
            ExpirePhase::ExecutionDeadline => "execution_deadline",
        }
    }
}
2697
2698/// Object-safety assertion: `dyn EngineBackend` compiles iff every
2699/// method is dyn-compatible. Kept as a compile-time guard so a future
2700/// trait change that accidentally breaks dyn-safety fails the build
2701/// at this site rather than at every downstream `Arc<dyn
2702/// EngineBackend>` use.
2703#[allow(dead_code)]
2704fn _assert_dyn_compatible(_: &dyn EngineBackend) {}
2705
/// Pause between successive `describe_flow` polls in
/// [`wait_for_flow_cancellation`]. Short enough that a local single-node
/// cancel cascade shows up as `cancelled` within one or two polls. Long
/// enough that a `WaitIndefinite` caller does not hammer `describe_flow`
/// on a live cluster.
const CANCEL_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(100);

/// Safety cap applied to [`CancelFlowWait::WaitIndefinite`] (five
/// minutes). A reconciler cascade that has not converged by then is
/// wedged, and handing back `Timeout` beats blocking forever. RFC-012
/// §3.1.1 expects real cascades to settle within
/// `reconciler_interval + grace`, orders of magnitude below this cap.
const CANCEL_WAIT_INDEFINITE_CEILING: Duration = Duration::from_secs(5 * 60);
2719
2720/// Poll `backend.describe_flow(flow_id)` until `public_flow_state` is
2721/// `"cancelled"` or `deadline` elapses.
2722///
2723/// Shared by every backend's `cancel_flow` trait impl that honours
2724/// [`CancelFlowWait::WaitTimeout`] / [`CancelFlowWait::WaitIndefinite`].
2725/// The underlying `cancel_flow` FCALL / SQL transaction flips the
2726/// flow-level state synchronously; member cancellations dispatch
2727/// asynchronously via the reconciler, which also flips
2728/// `public_flow_state` to `cancelled` once the cascade completes. This
2729/// helper waits for that terminal flip.
2730///
2731/// Returns:
2732/// * `Ok(())` once `public_flow_state = "cancelled"` is observed.
2733/// * `Err(EngineError::Timeout { op: "cancel_flow", elapsed })` when
2734///   `deadline` elapses first. `elapsed` is the wait budget (the
2735///   requested timeout), not wall-clock precision.
2736/// * `Err(e)` if `describe_flow` itself errors (propagated).
2737pub async fn wait_for_flow_cancellation<B: EngineBackend + ?Sized>(
2738    backend: &B,
2739    flow_id: &crate::types::FlowId,
2740    deadline: Duration,
2741) -> Result<(), EngineError> {
2742    let start = std::time::Instant::now();
2743    loop {
2744        match backend.describe_flow(flow_id).await? {
2745            Some(snap) if snap.public_flow_state == "cancelled" => return Ok(()),
2746            // `None` (flow removed) is also terminal from the caller's
2747            // perspective — nothing left to wait on.
2748            None => return Ok(()),
2749            Some(_) => {}
2750        }
2751        if start.elapsed() >= deadline {
2752            return Err(EngineError::Timeout {
2753                op: "cancel_flow",
2754                elapsed: deadline,
2755            });
2756        }
2757        tokio::time::sleep(CANCEL_WAIT_POLL_INTERVAL).await;
2758    }
2759}
2760
2761/// Convert a [`CancelFlowWait`] into the deadline passed to
2762/// [`wait_for_flow_cancellation`]. `NoWait` returns `None` — the caller
2763/// must skip the wait entirely.
2764pub fn cancel_flow_wait_deadline(wait: CancelFlowWait) -> Option<Duration> {
2765    // `CancelFlowWait` is `#[non_exhaustive]`; this match lives in the
2766    // defining crate so the exhaustiveness check keeps the compiler
2767    // honest. Future variants must be wired here explicitly.
2768    match wait {
2769        CancelFlowWait::NoWait => None,
2770        CancelFlowWait::WaitTimeout(d) => Some(d),
2771        CancelFlowWait::WaitIndefinite => Some(CANCEL_WAIT_INDEFINITE_CEILING),
2772    }
2773}
2774
2775/// Validate a caller-namespaced tag key against the regex
2776/// `^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$`.
2777///
2778/// The Rust trait-side check is **stricter than the Valkey Lua
2779/// contracts** (`ff_set_execution_tags` / `ff_set_flow_tags`), which
2780/// only check `^[a-z][a-z0-9_]*%.[^.]` — namespace prefix + first
2781/// suffix char, with the rest of the suffix unvalidated. Every
2782/// backend-side impl (`ff-backend-{valkey,postgres,sqlite}`) calls
2783/// this helper **before** the wire hop so the effective parity
2784/// contract is this full-key regex; Valkey's Lua is an additional,
2785/// more permissive server-side guard, not the parity-of-record.
2786///
2787/// A key passes iff:
2788///
2789/// * it begins with an ASCII lowercase letter;
2790/// * all characters up to the first `.` are lowercase alnum or `_`;
2791/// * the first `.` is followed by at least one non-`.` character
2792///   (so `cairn.` and `cairn..x` fail — the `<field>` must be non-empty).
2793///
2794/// Shared entry point for [`EngineBackend::set_execution_tag`] /
2795/// [`EngineBackend::set_flow_tag`] / [`EngineBackend::get_execution_tag`] /
2796/// [`EngineBackend::get_flow_tag`] so every backend rejects the same
2797/// keyspace. On rejection, returns
2798/// [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
2799/// with the offending key in `detail`.
2800///
2801/// `#[allow(clippy::result_large_err)]` — `EngineError` is the uniform
2802/// error across the whole `EngineBackend` trait surface (see trait method
2803/// signatures above); boxing it here alone would introduce a
2804/// gratuitous signature deviation. Clippy 1.95 flags free functions but
2805/// not trait methods; this function mirrors the trait-method convention.
2806#[allow(clippy::result_large_err)]
2807pub fn validate_tag_key(key: &str) -> Result<(), EngineError> {
2808    use crate::engine_error::ValidationKind;
2809
2810    let bad = || EngineError::Validation {
2811        kind: ValidationKind::InvalidInput,
2812        detail: format!(
2813            "invalid tag key: {key:?} (must match ^[a-z][a-z0-9_]*\\.[a-z0-9_][a-z0-9_.]*$)"
2814        ),
2815    };
2816
2817    let mut chars = key.chars();
2818    let first = chars.next().ok_or_else(bad)?;
2819    if !first.is_ascii_lowercase() {
2820        return Err(bad());
2821    }
2822    // Phase 1: consume the namespace prefix up to (but not including) the first `.`.
2823    // Chars must be `[a-z0-9_]`.
2824    let mut saw_dot = false;
2825    for c in chars.by_ref() {
2826        if c == '.' {
2827            saw_dot = true;
2828            break;
2829        }
2830        if !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') {
2831            return Err(bad());
2832        }
2833    }
2834    if !saw_dot {
2835        return Err(bad());
2836    }
2837    // Phase 2: the first char after the first `.` must exist and be a
2838    // non-dot member char `[a-z0-9_]` (matches Valkey's Lua regex
2839    // `^[a-z][a-z0-9_]*%.[^.]` — a second consecutive dot is rejected).
2840    let second = chars.next().ok_or_else(bad)?;
2841    if !(second.is_ascii_lowercase() || second.is_ascii_digit() || second == '_') {
2842        return Err(bad());
2843    }
2844    // Phase 3 (Finding 2 tightening): every remaining char MUST be
2845    // `[a-z0-9_.]`. Before this, the suffix was unvalidated beyond
2846    // its first char — so `cairn.foo bar`, `cairn.Foo`,
2847    // `cairn.foo"bar`, `cairn.foo-bar` all passed trait-side validation
2848    // even though they would break SQLite JSON-path quoting, the Lua
2849    // HSET field-name conventions, or consumer grep patterns. Valkey's
2850    // Lua prefix regex is identically permissive on the suffix; this
2851    // tightens both layers from the Rust side. Dots in the suffix
2852    // remain legal (`app.sub.field` is valid) to preserve the existing
2853    // accepted shape.
2854    for c in chars {
2855        if !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_' || c == '.') {
2856            return Err(bad());
2857        }
2858    }
2859    Ok(())
2860}
2861
2862#[cfg(test)]
2863mod tests {
2864    use super::*;
2865
    /// Zero-state backend stub used to exercise the trait's default
    /// method impls without pulling in a real transport. It covers
    /// `capabilities()` and, under the `core` feature, the
    /// `Unavailable`-returning defaults such as `resolve_dependency`,
    /// `cascade_completion`, `unblock_execution`, `trim_retention` and
    /// the service-layer FCALL methods.
    ///
    /// Every method this stub implements panics via `unreachable!()`,
    /// except `read_execution_context`, `read_current_attempt_index` and
    /// `read_total_attempt_count`. Those three return empty / zero values.
    struct DefaultBackend;
2871
    // Only the methods below are implemented. Everything else (e.g.
    // `capabilities()`, `resolve_dependency`, `cascade_completion`) is
    // inherited from the trait's default impls, and those inherited
    // defaults are what the tests in this module exercise.
    #[async_trait]
    impl EngineBackend for DefaultBackend {
        async fn claim(
            &self,
            _lane: &LaneId,
            _capabilities: &CapabilitySet,
            _policy: ClaimPolicy,
        ) -> Result<Option<Handle>, EngineError> {
            unreachable!()
        }
        async fn renew(&self, _handle: &Handle) -> Result<LeaseRenewal, EngineError> {
            unreachable!()
        }
        async fn progress(
            &self,
            _handle: &Handle,
            _percent: Option<u8>,
            _message: Option<String>,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn append_frame(
            &self,
            _handle: &Handle,
            _frame: Frame,
        ) -> Result<AppendFrameOutcome, EngineError> {
            unreachable!()
        }
        async fn complete(
            &self,
            _handle: &Handle,
            _payload: Option<Vec<u8>>,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn fail(
            &self,
            _handle: &Handle,
            _reason: FailureReason,
            _classification: FailureClass,
        ) -> Result<FailOutcome, EngineError> {
            unreachable!()
        }
        async fn cancel(&self, _handle: &Handle, _reason: &str) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn suspend(
            &self,
            _handle: &Handle,
            _args: SuspendArgs,
        ) -> Result<SuspendOutcome, EngineError> {
            unreachable!()
        }
        async fn create_waitpoint(
            &self,
            _handle: &Handle,
            _waitpoint_key: &str,
            _expires_in: Duration,
        ) -> Result<PendingWaitpoint, EngineError> {
            unreachable!()
        }
        async fn observe_signals(
            &self,
            _handle: &Handle,
        ) -> Result<Vec<ResumeSignal>, EngineError> {
            unreachable!()
        }
        async fn claim_from_resume_grant(
            &self,
            _token: ResumeToken,
        ) -> Result<Option<Handle>, EngineError> {
            unreachable!()
        }
        async fn delay(
            &self,
            _handle: &Handle,
            _delay_until: TimestampMs,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn wait_children(&self, _handle: &Handle) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn describe_execution(
            &self,
            _id: &ExecutionId,
        ) -> Result<Option<ExecutionSnapshot>, EngineError> {
            unreachable!()
        }
        // NOTE(review): unlike the surrounding `unreachable!()` stubs,
        // these three read methods return benign empty / zero values.
        // Presumably some default impl or test path can reach them;
        // confirm before turning them into `unreachable!()`.
        async fn read_execution_context(
            &self,
            _execution_id: &ExecutionId,
        ) -> Result<ExecutionContext, EngineError> {
            Ok(ExecutionContext::new(
                Vec::new(),
                String::new(),
                std::collections::HashMap::new(),
            ))
        }
        async fn read_current_attempt_index(
            &self,
            _execution_id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError> {
            Ok(AttemptIndex::new(0))
        }
        async fn read_total_attempt_count(
            &self,
            _execution_id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError> {
            Ok(AttemptIndex::new(0))
        }
        async fn describe_flow(
            &self,
            _id: &FlowId,
        ) -> Result<Option<FlowSnapshot>, EngineError> {
            unreachable!()
        }
        // `core`-gated stubs: each cfg mirrors the trait's own gating.
        // Implementing a method the trait does not declare (feature off)
        // would fail to compile.
        #[cfg(feature = "core")]
        async fn list_edges(
            &self,
            _flow_id: &FlowId,
            _direction: EdgeDirection,
        ) -> Result<Vec<EdgeSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn describe_edge(
            &self,
            _flow_id: &FlowId,
            _edge_id: &EdgeId,
        ) -> Result<Option<EdgeSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn resolve_execution_flow_id(
            &self,
            _eid: &ExecutionId,
        ) -> Result<Option<FlowId>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_flows(
            &self,
            _partition: PartitionKey,
            _cursor: Option<FlowId>,
            _limit: usize,
        ) -> Result<ListFlowsPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_lanes(
            &self,
            _cursor: Option<LaneId>,
            _limit: usize,
        ) -> Result<ListLanesPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_suspended(
            &self,
            _partition: PartitionKey,
            _cursor: Option<ExecutionId>,
            _limit: usize,
        ) -> Result<ListSuspendedPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_executions(
            &self,
            _partition: PartitionKey,
            _cursor: Option<ExecutionId>,
            _limit: usize,
        ) -> Result<ListExecutionsPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn deliver_signal(
            &self,
            _args: DeliverSignalArgs,
        ) -> Result<DeliverSignalResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn claim_resumed_execution(
            &self,
            _args: ClaimResumedExecutionArgs,
        ) -> Result<ClaimResumedExecutionResult, EngineError> {
            unreachable!()
        }
        async fn cancel_flow(
            &self,
            _id: &FlowId,
            _policy: CancelFlowPolicy,
            _wait: CancelFlowWait,
        ) -> Result<CancelFlowResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn set_edge_group_policy(
            &self,
            _flow_id: &FlowId,
            _downstream_execution_id: &ExecutionId,
            _policy: crate::contracts::EdgeDependencyPolicy,
        ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
            unreachable!()
        }
        async fn report_usage(
            &self,
            _handle: &Handle,
            _budget: &BudgetId,
            _dimensions: crate::backend::UsageDimensions,
        ) -> Result<ReportUsageResult, EngineError> {
            unreachable!()
        }
        // `streaming`-gated stubs, gated the same way as the `core`
        // ones above.
        #[cfg(feature = "streaming")]
        async fn read_stream(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
            _from: StreamCursor,
            _to: StreamCursor,
            _count_limit: u64,
        ) -> Result<StreamFrames, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn tail_stream(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
            _after: StreamCursor,
            _block_ms: u64,
            _count_limit: u64,
            _visibility: TailVisibility,
        ) -> Result<StreamFrames, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn read_summary(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
        ) -> Result<Option<SummaryDocument>, EngineError> {
            unreachable!()
        }
    }
3118
3119    /// The default `capabilities()` impl returns a value tagged
3120    /// `family = "unknown"` with every `supports.*` bool false, so
3121    /// pre-RFC-018 out-of-tree backends keep compiling and consumers
3122    /// can distinguish "backend predates RFC-018" from "backend
3123    /// reports concrete bools." Every concrete in-tree backend
3124    /// overrides.
3125    #[test]
3126    fn default_capabilities_is_unknown_family_all_false() {
3127        let b = DefaultBackend;
3128        let caps = b.capabilities();
3129        assert_eq!(caps.identity.family, "unknown");
3130        assert_eq!(
3131            caps.identity.version,
3132            crate::capability::Version::new(0, 0, 0)
3133        );
3134        assert_eq!(caps.identity.rfc017_stage, "unknown");
3135        // Every field false on the default (matches `Supports::none()`).
3136        assert_eq!(caps.supports, crate::capability::Supports::none());
3137    }
3138
3139    // ── resolve_dependency (PR-7b Step 0) ──
3140
3141    /// Default impl returns `Unavailable { op: "resolve_dependency" }`
3142    /// so pre-migration backends surface a typed Unsupported-grade
3143    /// error rather than panic. Both cluster 2's dependency_reconciler
3144    /// and cluster 4's completion_listener route through this method;
3145    /// the default is the safety net for non-Valkey deployments.
3146    #[cfg(feature = "core")]
3147    #[tokio::test]
3148    async fn default_resolve_dependency_is_unavailable() {
3149        use crate::contracts::ResolveDependencyArgs;
3150        use crate::partition::{Partition, PartitionFamily};
3151        use crate::types::{AttemptIndex, EdgeId, FlowId, LaneId};
3152
3153        let b = DefaultBackend;
3154        let partition = Partition {
3155            family: PartitionFamily::Flow,
3156            index: 0,
3157        };
3158        let args = ResolveDependencyArgs::new(
3159            partition,
3160            FlowId::parse("11111111-1111-1111-1111-111111111111").unwrap(),
3161            ExecutionId::parse("{fp:0}:22222222-2222-2222-2222-222222222222").unwrap(),
3162            ExecutionId::parse("{fp:0}:33333333-3333-3333-3333-333333333333").unwrap(),
3163            EdgeId::parse("44444444-4444-4444-4444-444444444444").unwrap(),
3164            LaneId::new("default"),
3165            AttemptIndex::new(0),
3166            "success".to_owned(),
3167            TimestampMs::now(),
3168        );
3169        match b.resolve_dependency(args).await {
3170            Err(EngineError::Unavailable { op }) => {
3171                assert_eq!(op, "resolve_dependency");
3172            }
3173            other => panic!("expected Unavailable, got {other:?}"),
3174        }
3175    }
3176
3177    // ── cascade_completion (PR-7b Cluster 4) ──
3178
3179    /// Default impl returns `Unavailable { op: "cascade_completion" }`
3180    /// so pre-migration / non-core builds surface a typed error rather
3181    /// than panic. The push-based completion listener routes through
3182    /// this method; the default is the safety net for deployments
3183    /// whose backend doesn't cascade (e.g. SQLite in the current
3184    /// Wave 9 scope).
3185    #[cfg(feature = "core")]
3186    #[tokio::test]
3187    async fn default_cascade_completion_is_unavailable() {
3188        use crate::backend::CompletionPayload;
3189
3190        let b = DefaultBackend;
3191        let eid = ExecutionId::parse(
3192            "{fp:0}:66666666-6666-6666-6666-666666666666",
3193        )
3194        .unwrap();
3195        let payload = CompletionPayload::new(eid, "success", None, TimestampMs::now());
3196        match b.cascade_completion(&payload).await {
3197            Err(EngineError::Unavailable { op }) => {
3198                assert_eq!(op, "cascade_completion");
3199            }
3200            other => panic!("expected Unavailable, got {other:?}"),
3201        }
3202    }
3203
3204    // ── unblock_execution (PR-7b Cluster 2) ──
3205
3206    /// Default impl returns `Unavailable { op: "unblock_execution" }`
3207    /// so non-Valkey deployments get a typed `Unavailable` rather than
3208    /// a panic. The engine scanner loop on PG/SQLite skips this path
3209    /// entirely (scheduler eligibility is re-evaluated live via SQL
3210    /// predicates), but a stray direct call must still fail gracefully.
3211    #[cfg(feature = "core")]
3212    #[tokio::test]
3213    async fn default_unblock_execution_is_unavailable() {
3214        use crate::partition::{Partition, PartitionFamily};
3215
3216        let b = DefaultBackend;
3217        let partition = Partition {
3218            family: PartitionFamily::Execution,
3219            index: 0,
3220        };
3221        let eid = ExecutionId::parse(
3222            "{fp:0}:55555555-5555-5555-5555-555555555555",
3223        )
3224        .unwrap();
3225        let lane = LaneId::new("default");
3226        match b
3227            .unblock_execution(
3228                partition,
3229                &lane,
3230                &eid,
3231                "waiting_for_budget",
3232                TimestampMs::from_millis(0),
3233            )
3234            .await
3235        {
3236            Err(EngineError::Unavailable { op }) => {
3237                assert_eq!(op, "unblock_execution");
3238            }
3239            other => panic!("expected Unavailable, got {other:?}"),
3240        }
3241    }
3242
3243    // ── project_flow_summary (PR-7b Cluster 2b-B) ──
3244
3245    /// Default impl returns `Unavailable { op: "project_flow_summary" }`
3246    /// so non-Valkey deployments get a typed `Unavailable` rather than
3247    /// a panic. SQLite rides the default per RFC-023 Phase 3.5.
3248    #[cfg(feature = "core")]
3249    #[tokio::test]
3250    async fn default_project_flow_summary_is_unavailable() {
3251        use crate::partition::{Partition, PartitionFamily};
3252        use crate::types::FlowId;
3253
3254        let b = DefaultBackend;
3255        let partition = Partition {
3256            family: PartitionFamily::Flow,
3257            index: 0,
3258        };
3259        let fid = FlowId::parse("11111111-1111-1111-1111-111111111111").unwrap();
3260        match b
3261            .project_flow_summary(partition, &fid, TimestampMs::from_millis(0))
3262            .await
3263        {
3264            Err(EngineError::Unavailable { op }) => {
3265                assert_eq!(op, "project_flow_summary");
3266            }
3267            other => panic!("expected Unavailable, got {other:?}"),
3268        }
3269    }
3270
3271    // ── trim_retention (PR-7b Cluster 2b-B) ──
3272
3273    /// Default impl returns `Unavailable { op: "trim_retention" }` so
3274    /// non-Valkey deployments get a typed `Unavailable` rather than a
3275    /// panic. SQLite rides the default per RFC-023 Phase 3.5.
3276    #[cfg(feature = "core")]
3277    #[tokio::test]
3278    async fn default_trim_retention_is_unavailable() {
3279        use crate::partition::{Partition, PartitionFamily};
3280
3281        let b = DefaultBackend;
3282        let partition = Partition {
3283            family: PartitionFamily::Execution,
3284            index: 0,
3285        };
3286        let lane = LaneId::new("default");
3287        match b
3288            .trim_retention(
3289                partition,
3290                &lane,
3291                60_000,
3292                TimestampMs::from_millis(0),
3293                20,
3294                &crate::backend::ScannerFilter::NOOP,
3295            )
3296            .await
3297        {
3298            Err(EngineError::Unavailable { op }) => {
3299                assert_eq!(op, "trim_retention");
3300            }
3301            other => panic!("expected Unavailable, got {other:?}"),
3302        }
3303    }
3304
3305    // ── read_exec_core_fields (PR-7b Wave 0a) ──
3306
3307    /// Default impl returns `Unavailable { op: "read_exec_core_fields" }`
3308    /// so out-of-tree backends that pre-date v0.13 keep compiling while
3309    /// surfacing a typed error on call. Empty-field input short-circuits
3310    /// to `Ok(empty map)` in all in-tree impls; the default is triggered
3311    /// only when the trait method itself is un-overridden.
3312    #[cfg(feature = "core")]
3313    #[tokio::test]
3314    async fn default_read_exec_core_fields_is_unavailable() {
3315        use crate::partition::{Partition, PartitionFamily};
3316
3317        let b = DefaultBackend;
3318        let partition = Partition {
3319            family: PartitionFamily::Execution,
3320            index: 0,
3321        };
3322        let eid = ExecutionId::parse(
3323            "{fp:0}:66666666-6666-6666-6666-666666666666",
3324        )
3325        .unwrap();
3326        match b
3327            .read_exec_core_fields(partition, &eid, &["lane_id"])
3328            .await
3329        {
3330            Err(EngineError::Unavailable { op }) => {
3331                assert_eq!(op, "read_exec_core_fields");
3332            }
3333            other => panic!("expected Unavailable, got {other:?}"),
3334        }
3335    }
3336
3337    // ── validate_tag_key (issue #433) ──
3338
3339    #[test]
3340    fn validate_tag_key_accepts_valid() {
3341        for k in [
3342            "cairn.session_id",
3343            "cairn.project",
3344            "a.b",
3345            "a1_2.x",
3346            "app.sub.field",
3347            "x.y_z",
3348        ] {
3349            validate_tag_key(k).unwrap_or_else(|e| panic!("{k:?} should pass: {e:?}"));
3350        }
3351    }
3352
3353    #[test]
3354    fn validate_tag_key_rejects_invalid() {
3355        for k in [
3356            "",                // empty
3357            "Cairn.x",         // uppercase first
3358            "1cairn.x",        // leading digit
3359            "cairn",           // no dot
3360            "cairn.",          // empty suffix
3361            "cairn..x",        // dot immediately after first dot
3362            ".cairn",          // leading dot
3363            "cair n.x",        // space before dot
3364            "ca-irn.x",        // hyphen in prefix
3365            // Finding 2 tightening — suffix now fully validated.
3366            "cairn.Foo",       // uppercase in suffix
3367            "cairn.foo bar",   // space in suffix
3368            "cairn.foo\"bar",  // double-quote in suffix (would break SQLite JSON-path quoting)
3369            "cairn.foo-bar",   // hyphen in suffix
3370            "cairn.foo\\bar",  // backslash in suffix
3371        ] {
3372            let err = validate_tag_key(k)
3373                .err()
3374                .unwrap_or_else(|| panic!("{k:?} should fail"));
3375            match err {
3376                EngineError::Validation {
3377                    kind: crate::engine_error::ValidationKind::InvalidInput,
3378                    ..
3379                } => {}
3380                other => panic!("{k:?}: unexpected err {other:?}"),
3381            }
3382        }
3383    }
3384
3385    // ── cairn #389: service-layer FCALL trait-method defaults ──
3386    //
3387    // Each method MUST return `EngineError::Unavailable { op: "<name>" }`
3388    // on backends that haven't overridden it, so out-of-tree backends
3389    // keep compiling and consumers get a terminal-classified error
3390    // rather than a panic. Mirrors the precedent used by
3391    // `issue_reclaim_grant` / `reclaim_execution` / `suspend_by_triple`.
3392    //
3393    // Feature-gated on `core` because the methods under test only
3394    // exist on the trait under that feature — matches the precedent
3395    // of `default_resolve_dependency_is_unavailable` above.
3396
3397    #[cfg(feature = "core")]
3398    #[tokio::test]
3399    async fn default_complete_execution_is_unavailable() {
3400        use crate::contracts::CompleteExecutionArgs;
3401        use crate::types::{ExecutionId, FlowId};
3402        let b = DefaultBackend;
3403        let config = crate::partition::PartitionConfig::default();
3404        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3405        let args = CompleteExecutionArgs {
3406            execution_id: eid,
3407            fence: None,
3408            attempt_index: AttemptIndex::new(0),
3409            result_payload: None,
3410            result_encoding: None,
3411            source: crate::types::CancelSource::default(),
3412            now: TimestampMs::from_millis(0),
3413        };
3414        match b.complete_execution(args).await.unwrap_err() {
3415            EngineError::Unavailable { op } => assert_eq!(op, "complete_execution"),
3416            other => panic!("expected Unavailable, got {other:?}"),
3417        }
3418    }
3419
3420    #[cfg(feature = "core")]
3421    #[tokio::test]
3422    async fn default_fail_execution_is_unavailable() {
3423        use crate::contracts::FailExecutionArgs;
3424        use crate::types::{ExecutionId, FlowId};
3425        let b = DefaultBackend;
3426        let config = crate::partition::PartitionConfig::default();
3427        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3428        let args = FailExecutionArgs {
3429            execution_id: eid,
3430            fence: None,
3431            attempt_index: AttemptIndex::new(0),
3432            failure_reason: String::new(),
3433            failure_category: String::new(),
3434            retry_policy_json: String::new(),
3435            next_attempt_policy_json: String::new(),
3436            source: crate::types::CancelSource::default(),
3437        };
3438        match b.fail_execution(args).await.unwrap_err() {
3439            EngineError::Unavailable { op } => assert_eq!(op, "fail_execution"),
3440            other => panic!("expected Unavailable, got {other:?}"),
3441        }
3442    }
3443
3444    #[cfg(feature = "core")]
3445    #[tokio::test]
3446    async fn default_renew_lease_is_unavailable() {
3447        use crate::contracts::RenewLeaseArgs;
3448        use crate::types::{ExecutionId, FlowId};
3449        let b = DefaultBackend;
3450        let config = crate::partition::PartitionConfig::default();
3451        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3452        let args = RenewLeaseArgs {
3453            execution_id: eid,
3454            attempt_index: AttemptIndex::new(0),
3455            fence: None,
3456            lease_ttl_ms: 1_000,
3457            lease_history_grace_ms: 60_000,
3458        };
3459        match b.renew_lease(args).await.unwrap_err() {
3460            EngineError::Unavailable { op } => assert_eq!(op, "renew_lease"),
3461            other => panic!("expected Unavailable, got {other:?}"),
3462        }
3463    }
3464
3465    #[cfg(feature = "core")]
3466    #[tokio::test]
3467    async fn default_resume_execution_is_unavailable() {
3468        use crate::contracts::ResumeExecutionArgs;
3469        use crate::types::{ExecutionId, FlowId};
3470        let b = DefaultBackend;
3471        let config = crate::partition::PartitionConfig::default();
3472        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3473        let args = ResumeExecutionArgs {
3474            execution_id: eid,
3475            trigger_type: "signal".to_owned(),
3476            resume_delay_ms: 0,
3477        };
3478        match b.resume_execution(args).await.unwrap_err() {
3479            EngineError::Unavailable { op } => assert_eq!(op, "resume_execution"),
3480            other => panic!("expected Unavailable, got {other:?}"),
3481        }
3482    }
3483
3484    #[cfg(feature = "core")]
3485    #[tokio::test]
3486    async fn default_check_admission_is_unavailable() {
3487        use crate::contracts::CheckAdmissionArgs;
3488        use crate::types::{ExecutionId, FlowId, QuotaPolicyId};
3489        let b = DefaultBackend;
3490        let config = crate::partition::PartitionConfig::default();
3491        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3492        let args = CheckAdmissionArgs {
3493            execution_id: eid,
3494            now: TimestampMs::from_millis(0),
3495            window_seconds: 60,
3496            rate_limit: 10,
3497            concurrency_cap: 1,
3498            jitter_ms: None,
3499        };
3500        let qid = QuotaPolicyId::new();
3501        match b
3502            .check_admission(&qid, "default", args)
3503            .await
3504            .unwrap_err()
3505        {
3506            EngineError::Unavailable { op } => assert_eq!(op, "check_admission"),
3507            other => panic!("expected Unavailable, got {other:?}"),
3508        }
3509    }
3510
3511    #[cfg(feature = "core")]
3512    #[tokio::test]
3513    async fn default_evaluate_flow_eligibility_is_unavailable() {
3514        use crate::contracts::EvaluateFlowEligibilityArgs;
3515        use crate::types::{ExecutionId, FlowId};
3516        let b = DefaultBackend;
3517        let config = crate::partition::PartitionConfig::default();
3518        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3519        let args = EvaluateFlowEligibilityArgs { execution_id: eid };
3520        match b.evaluate_flow_eligibility(args).await.unwrap_err() {
3521            EngineError::Unavailable { op } => assert_eq!(op, "evaluate_flow_eligibility"),
3522            other => panic!("expected Unavailable, got {other:?}"),
3523        }
3524    }
3525
3526    // ── Cluster 2b-A tally-recompute reconcilers ──
3527
3528    #[cfg(feature = "core")]
3529    #[tokio::test]
3530    async fn default_reconcile_execution_index_is_unavailable() {
3531        use crate::backend::ScannerFilter;
3532        use crate::partition::{Partition, PartitionFamily};
3533        let b = DefaultBackend;
3534        let partition = Partition { family: PartitionFamily::Execution, index: 0 };
3535        let lanes = [LaneId::new("default")];
3536        let filter = ScannerFilter::default();
3537        match b
3538            .reconcile_execution_index(partition, &lanes, &filter)
3539            .await
3540            .unwrap_err()
3541        {
3542            EngineError::Unavailable { op } => assert_eq!(op, "reconcile_execution_index"),
3543            other => panic!("expected Unavailable, got {other:?}"),
3544        }
3545    }
3546
3547    #[cfg(feature = "core")]
3548    #[tokio::test]
3549    async fn default_reconcile_budget_counters_is_unavailable() {
3550        use crate::partition::{Partition, PartitionFamily};
3551        let b = DefaultBackend;
3552        let partition = Partition { family: PartitionFamily::Budget, index: 0 };
3553        match b
3554            .reconcile_budget_counters(partition, TimestampMs::from_millis(0))
3555            .await
3556            .unwrap_err()
3557        {
3558            EngineError::Unavailable { op } => assert_eq!(op, "reconcile_budget_counters"),
3559            other => panic!("expected Unavailable, got {other:?}"),
3560        }
3561    }
3562
3563    #[cfg(feature = "core")]
3564    #[tokio::test]
3565    async fn default_reconcile_quota_counters_is_unavailable() {
3566        use crate::partition::{Partition, PartitionFamily};
3567        let b = DefaultBackend;
3568        let partition = Partition { family: PartitionFamily::Quota, index: 0 };
3569        match b
3570            .reconcile_quota_counters(partition, TimestampMs::from_millis(0))
3571            .await
3572            .unwrap_err()
3573        {
3574            EngineError::Unavailable { op } => assert_eq!(op, "reconcile_quota_counters"),
3575            other => panic!("expected Unavailable, got {other:?}"),
3576        }
3577    }
3578
3579    // ── #454 trait-additions default-impl tests (4) ────────────
3580
3581    #[cfg(feature = "core")]
3582    #[tokio::test]
3583    async fn default_record_spend_is_unavailable() {
3584        use crate::contracts::RecordSpendArgs;
3585        use crate::types::{BudgetId, ExecutionId, FlowId};
3586        let b = DefaultBackend;
3587        let config = crate::partition::PartitionConfig::default();
3588        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3589        let args = RecordSpendArgs::new(
3590            BudgetId::new(),
3591            eid,
3592            std::collections::BTreeMap::new(),
3593            "k",
3594        );
3595        match b.record_spend(args).await.unwrap_err() {
3596            EngineError::Unavailable { op } => assert_eq!(op, "record_spend"),
3597            other => panic!("expected Unavailable, got {other:?}"),
3598        }
3599    }
3600
3601    #[cfg(feature = "core")]
3602    #[tokio::test]
3603    async fn default_release_budget_is_unavailable() {
3604        use crate::contracts::ReleaseBudgetArgs;
3605        use crate::types::{BudgetId, ExecutionId, FlowId};
3606        let b = DefaultBackend;
3607        let config = crate::partition::PartitionConfig::default();
3608        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3609        let args = ReleaseBudgetArgs::new(BudgetId::new(), eid);
3610        match b.release_budget(args).await.unwrap_err() {
3611            EngineError::Unavailable { op } => assert_eq!(op, "release_budget"),
3612            other => panic!("expected Unavailable, got {other:?}"),
3613        }
3614    }
3615
3616    #[cfg(feature = "core")]
3617    #[tokio::test]
3618    async fn default_deliver_approval_signal_is_unavailable() {
3619        use crate::contracts::DeliverApprovalSignalArgs;
3620        use crate::types::{ExecutionId, FlowId, LaneId, WaitpointId};
3621        let b = DefaultBackend;
3622        let config = crate::partition::PartitionConfig::default();
3623        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3624        let args = DeliverApprovalSignalArgs::new(
3625            eid,
3626            LaneId::new("default"),
3627            WaitpointId::new(),
3628            "approved",
3629            "sfx",
3630            1_000,
3631            Some(100),
3632            Some(10),
3633        );
3634        match b.deliver_approval_signal(args).await.unwrap_err() {
3635            EngineError::Unavailable { op } => assert_eq!(op, "deliver_approval_signal"),
3636            other => panic!("expected Unavailable, got {other:?}"),
3637        }
3638    }
3639
3640    #[cfg(feature = "core")]
3641    #[tokio::test]
3642    async fn default_issue_grant_and_claim_is_unavailable() {
3643        use crate::contracts::IssueGrantAndClaimArgs;
3644        use crate::types::{ExecutionId, FlowId, LaneId};
3645        let b = DefaultBackend;
3646        let config = crate::partition::PartitionConfig::default();
3647        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3648        let args = IssueGrantAndClaimArgs::new(eid, LaneId::new("default"), 30_000);
3649        match b.issue_grant_and_claim(args).await.unwrap_err() {
3650            EngineError::Unavailable { op } => assert_eq!(op, "issue_grant_and_claim"),
3651            other => panic!("expected Unavailable, got {other:?}"),
3652        }
3653    }
3654}