Skip to main content

ff_core/
engine_backend.rs

1//! The `EngineBackend` trait — abstracting FlowFabric's write surface.
2//!
3//! **RFC-012 Stage 1a:** this is the trait landing. The
4//! Valkey-backed impl lives in `ff-backend-valkey`; future backends
5//! (Postgres) add a sibling crate with their own impl. ff-sdk's
6//! `FlowFabricWorker` gains `connect_with(backend)` /
7//! `backend(&self)` accessors so consumers that want to bring their
8//! own backend (tests, future non-Valkey deployments) can hand one
9//! in. The hot-path migration of `ClaimedTask` / `FlowFabricWorker`
10//! to forward through the trait lands across Stages 1b-1d.
11//!
12//! # Object safety
13//!
14//! `EngineBackend` is object-safe: all methods are `async fn` behind
15//! `#[async_trait]` and take `&self`. Consumers can hold
16//! `Arc<dyn EngineBackend>` for heterogeneous-backend deployments.
17//! The trait is `Send + Sync + 'static` per RFC-012 §4.1; every impl
18//! must honour that bound.
19//!
20//! # Error surface
21//!
22//! Every method returns [`Result<_, EngineError>`]. `EngineError`'s
23//! `Transport` variant carries a boxed `dyn Error + Send + Sync`;
24//! Valkey-backed transport faults box a
25//! `ff_script::error::ScriptError` (downcast via
26//! `ff_script::engine_error_ext::transport_script_ref`). Other
27//! backends box their native error type and set the `backend` tag
28//! accordingly.
29//!
30//! # Atomicity contract
31//!
32//! Per-op state transitions MUST be atomic (RFC-012 §3.4). On Valkey
33//! this is the single-FCALL-per-op property; on Postgres it is the
34//! per-transaction property. A backend that cannot honour atomicity
35//! for a given op either MUST NOT implement `EngineBackend` or MUST
36//! return `EngineError::Unavailable { op }` for the affected method.
37//!
38//! # Replay semantics
39//!
40//! `complete`, `fail`, `cancel`, `suspend`, `delay`, `wait_children`
41//! are idempotent under replay — calling twice with the same handle
42//! and args returns the same outcome (success on first call, typed
43//! `State` / `Contention` on subsequent calls where the fence triple
44//! no longer matches a live lease).
45
46use std::time::Duration;
47
48use async_trait::async_trait;
49
50use crate::backend::{
51    AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy,
52    FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal, PendingWaitpoint,
53    PrepareOutcome, ResumeSignal, ResumeToken,
54};
55// `SummaryDocument` and `TailVisibility` are referenced only inside
56// `#[cfg(feature = "streaming")]` trait methods below, so the imports
57// must be gated to avoid an unused-imports warning on the non-streaming
58// build.
59#[cfg(feature = "streaming")]
60use crate::backend::{SummaryDocument, TailVisibility};
61use crate::contracts::{
62    CancelFlowResult, ExecutionContext, ExecutionSnapshot, FlowSnapshot, IssueReclaimGrantArgs,
63    IssueReclaimGrantOutcome, ReclaimExecutionArgs, ReclaimExecutionOutcome, ReportUsageResult,
64    RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
65    SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
66};
67#[cfg(feature = "core")]
68use crate::contracts::{
69    AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
70    ApplyDependencyToChildResult, BudgetStatus, CancelExecutionArgs, CancelExecutionResult,
71    CancelFlowArgs, ChangePriorityArgs, ChangePriorityResult, ClaimExecutionArgs,
72    ClaimExecutionResult, ClaimForWorkerArgs, ClaimForWorkerOutcome, ClaimResumedExecutionArgs,
73    ClaimResumedExecutionResult,
74    BlockExecutionForAdmissionArgs, BlockExecutionForAdmissionOutcome, BlockRouteArgs,
75    BlockRouteOutcome, CheckAdmissionArgs, CheckAdmissionResult,
76    ClaimGrantOutcome,
77    CompleteExecutionArgs, CompleteExecutionResult, CreateBudgetArgs, CreateBudgetResult,
78    CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
79    CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
80    DeliverApprovalSignalArgs, DeliverSignalArgs, DeliverSignalResult, EdgeDirection, EdgeSnapshot,
81    EvaluateFlowEligibilityArgs, EvaluateFlowEligibilityResult, ExecutionInfo,
82    FailExecutionArgs, FailExecutionResult,
83    IssueClaimGrantArgs, IssueClaimGrantOutcome, IssueGrantAndClaimArgs,
84    BudgetUsageAndLimits, QuotaPolicyLimits, RecordSpendArgs, ReleaseAdmissionArgs,
85    ReleaseAdmissionResult,
86    ReleaseBudgetArgs, ScanEligibleArgs,
87    ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
88    ListPendingWaitpointsResult, ListSuspendedPage, RenewLeaseArgs, RenewLeaseResult,
89    ReplayExecutionArgs, ReplayExecutionResult,
90    ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult, ResumeExecutionArgs,
91    ResumeExecutionResult, RevokeLeaseArgs, RevokeLeaseResult,
92    StageDependencyEdgeArgs, StageDependencyEdgeResult,
93};
94// RFC-025 worker registry.
95#[cfg(feature = "core")]
96use crate::contracts::{
97    HeartbeatWorkerArgs, HeartbeatWorkerOutcome, ListWorkersArgs, ListWorkersResult,
98    MarkWorkerDeadArgs, MarkWorkerDeadOutcome, RegisterWorkerArgs, RegisterWorkerOutcome,
99};
100#[cfg(feature = "suspension")]
101use crate::contracts::{ListExpiredLeasesArgs, ListExpiredLeasesResult};
102#[cfg(feature = "core")]
103use crate::state::PublicState;
104#[cfg(feature = "core")]
105use crate::partition::PartitionKey;
106#[cfg(feature = "streaming")]
107use crate::contracts::{StreamCursor, StreamFrames};
108use crate::engine_error::EngineError;
109#[cfg(feature = "core")]
110use crate::types::EdgeId;
111#[cfg(feature = "core")]
112use crate::types::WaitpointId;
113use crate::types::{AttemptIndex, BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
114
115/// The engine write surface — a single trait a backend implementation
116/// honours to serve a `FlowFabricWorker`.
117///
118/// See RFC-012 §3.1 for the inventory rationale and §3.3 for the
119/// type-level shape. The RFC-012 inventory was 16 methods (Round-7 added
120/// `create_waitpoint`; `append_frame` return widened; `report_usage` return
121/// replaced — RFC-012 §R7). Issue #150 added the trigger-surface pair
122/// (`deliver_signal` / `claim_resumed_execution`); later work added more.
123///
124/// # Note on `complete` payload shape
125///
126/// The RFC §3.3 sketch uses `Option<Bytes>`; the Stage 1a trait uses
127/// `Option<Vec<u8>>` to match the existing
128/// `ff_sdk::ClaimedTask::complete` signature and avoid adding a
129/// `bytes` public-type dep for zero consumer benefit. Round-4 §7.17
130/// resolved the payload container debate to `Box<[u8]>` in the
131/// public type (see `HandleOpaque`); `Option<Vec<u8>>` is the
132/// zero-churn choice consistent with today's code. Consumers that
133/// need `&[u8]` can borrow via `.as_deref()` on the Option.
134#[async_trait]
135pub trait EngineBackend: Send + Sync + 'static {
136    // ── Claim + lifecycle ──
137
138    /// Fresh-work claim. Returns `Ok(None)` when no work is currently
139    /// available; `Err` only on transport or input-validation faults.
140    async fn claim(
141        &self,
142        lane: &LaneId,
143        capabilities: &CapabilitySet,
144        policy: ClaimPolicy,
145    ) -> Result<Option<Handle>, EngineError>;
146
147    /// Renew a held lease. Returns the updated expiry + epoch on
148    /// success; typed `State::StaleLease` / `State::LeaseExpired`
149    /// when the lease has been stolen or timed out.
150    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError>;
151
152    /// Numeric-progress heartbeat.
153    ///
154    /// Writes scalar `progress_percent` / `progress_message` fields on
155    /// `exec_core`; each call overwrites the previous value. This does
156    /// NOT append to the output stream — stream-frame producers must use
157    /// [`append_frame`](Self::append_frame) instead.
158    async fn progress(
159        &self,
160        handle: &Handle,
161        percent: Option<u8>,
162        message: Option<String>,
163    ) -> Result<(), EngineError>;
164
165    /// Append one stream frame. Distinct from [`progress`](Self::progress)
166    /// per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry
167    /// id and post-append frame count (RFC-012 §R7.2.1).
168    ///
169    /// Stream-frame producers (arbitrary `frame_type` + payload, consumed
170    /// via the read/tail surfaces) MUST use this method rather than
171    /// [`progress`](Self::progress); the latter updates scalar fields on
172    /// `exec_core` and is invisible to stream consumers.
173    async fn append_frame(
174        &self,
175        handle: &Handle,
176        frame: Frame,
177    ) -> Result<AppendFrameOutcome, EngineError>;
178
179    /// Terminal success. Borrows `handle` (round-4 M-D2) so callers
180    /// can retry under `EngineError::Transport` without losing the
181    /// cookie. Payload is `Option<Vec<u8>>` per the note above.
182    async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError>;
183
184    /// Terminal failure with classification. Returns [`FailOutcome`]
185    /// so the caller learns whether a retry was scheduled.
186    async fn fail(
187        &self,
188        handle: &Handle,
189        reason: FailureReason,
190        classification: FailureClass,
191    ) -> Result<FailOutcome, EngineError>;
192
193    /// Cooperative cancel by the worker holding the lease.
194    async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError>;
195
196    /// Suspend the execution awaiting a typed resume condition
197    /// (RFC-013 Stage 1d).
198    ///
199    /// Borrows `handle` (round-4 M-D2). Terminal-looking behaviour is
200    /// expressed through [`SuspendOutcome`]:
201    ///
202    /// * [`SuspendOutcome::Suspended`] — the pre-suspend handle is
203    ///   logically invalidated; the fresh `HandleKind::Suspended`
204    ///   handle inside the variant supersedes it. Runtime enforcement
205    ///   via the fence triple: subsequent ops against the stale handle
206    ///   surface as `Contention(LeaseConflict)`.
207    /// * [`SuspendOutcome::AlreadySatisfied`] — buffered signals on a
208    ///   pending waitpoint already matched the resume condition at
209    ///   suspension time. The lease is NOT released; the caller's
210    ///   pre-suspend handle remains valid.
211    ///
212    /// See RFC-013 §2 for the type shapes, §3 for the replay /
213    /// idempotency contract, §4 for the error taxonomy.
214    async fn suspend(
215        &self,
216        handle: &Handle,
217        args: SuspendArgs,
218    ) -> Result<SuspendOutcome, EngineError>;
219
220    /// Suspend by execution id + lease fence triple, for service-layer
221    /// callers that hold a run record / lease-claim descriptor but no
222    /// worker [`Handle`] (cairn issue #322).
223    ///
224    /// Semantics mirror [`Self::suspend`] exactly — the same
225    /// [`SuspendArgs`] validation, the same [`SuspendOutcome`]
226    /// lifecycle, the same RFC-013 §3 dedup / replay contract. The
227    /// only difference is the fencing source: instead of the
228    /// `(lease_id, lease_epoch, attempt_id)` fields embedded in a
229    /// `Handle`, the backend fences against the triple passed directly.
230    /// Attempt-index, lane, and worker-instance metadata that
231    /// [`Self::suspend`] reads from the handle payload are recovered
232    /// from the backend's authoritative execution record (Valkey:
233    /// `exec_core` HGETs; Postgres: `ff_attempt` row lookup).
234    ///
235    /// The default impl returns [`EngineError::Unavailable`] so
236    /// existing backend impls remain non-breaking. Production backends
237    /// (Valkey, Postgres) override.
238    async fn suspend_by_triple(
239        &self,
240        exec_id: ExecutionId,
241        triple: LeaseFence,
242        args: SuspendArgs,
243    ) -> Result<SuspendOutcome, EngineError> {
244        let _ = (exec_id, triple, args);
245        Err(EngineError::Unavailable {
246            op: "suspend_by_triple",
247        })
248    }
249
250    /// Issue a pending waitpoint for future signal delivery.
251    ///
252    /// Waitpoints have two states in the Valkey wire contract:
253    /// **pending** (token issued, not yet backing a suspension) and
254    /// **active** (bound to a suspension). This method creates a
255    /// waitpoint in the **pending** state. A later `suspend` call
256    /// transitions a pending waitpoint to active (see Lua
257    /// `use_pending_waitpoint` ARGV flag at
258    /// `flowfabric.lua:3603,3641,3690`) — or, if buffered signals
259    /// already satisfy its condition, the suspend call returns
260    /// `SuspendOutcome::AlreadySatisfied` and the waitpoint activates
261    /// without ever releasing the lease.
262    ///
263    /// Pending-waitpoint expiry is a first-class terminal error on
264    /// the wire (`PendingWaitpointExpired` at
265    /// `ff-script/src/error.rs:170,403-408`). The attempt retains its
266    /// lease while the waitpoint is pending; signals delivered to
267    /// this waitpoint are buffered server-side (RFC-012 §R7.2.2).
268    async fn create_waitpoint(
269        &self,
270        handle: &Handle,
271        waitpoint_key: &str,
272        expires_in: Duration,
273    ) -> Result<PendingWaitpoint, EngineError>;
274
275    /// Read the HMAC token stored on a waitpoint record, keyed by
276    /// `(partition, waitpoint_id)`.
277    ///
278    /// Returns `Ok(Some(token))` when the waitpoint exists and carries
279    /// a token, `Ok(None)` when the waitpoint does not exist or no
280    /// token field is present. A missing waitpoint is not an error —
281    /// signals can legitimately arrive after a waitpoint has been
282    /// consumed or expired, and the signal-bridge authenticates on the
283    /// presence of a matching token, not on the waitpoint's liveness.
284    ///
285    /// # Use case
286    ///
287    /// Control-plane signal delivery (cairn signal-bridge): at
288    /// signal-resume time the bridge reads the token off the
289    /// waitpoint hash / row to authenticate the resume request it
290    /// subsequently issues. Previously implemented as direct
291    /// `ferriskey::Client::hget(waitpoint_key, "waitpoint_token")` —
292    /// Valkey-only. This method routes the read through the trait so
293    /// the pattern works on Postgres + SQLite as well.
294    ///
295    /// # Per-backend shape
296    ///
297    /// * **Valkey** — `HGET ff:wp:<tag>:<waitpoint_id> waitpoint_token`
298    ///   on the waitpoint's partition. Empty string / missing field
299    ///   maps to `None`.
300    /// * **Postgres** — `SELECT token FROM ff_waitpoint_pending
301    ///   WHERE partition_key = $1 AND waitpoint_id = $2 LIMIT 1`.
302    ///   Row-absent → `None`; empty `token` → `None`.
303    /// * **SQLite** — same shape as Postgres.
304    ///
305    /// The `partition` argument is the opaque [`PartitionKey`]
306    /// produced by FlowFabric (typically extracted from the
307    /// `Handle` / `ResumeToken` the waitpoint was minted against).
308    ///
309    /// # Default impl
310    ///
311    /// Returns [`EngineError::Unavailable`] with
312    /// `op = "read_waitpoint_token"` so out-of-tree backends and
313    /// in-tree backends not yet overriding this method continue to
314    /// compile. Mirrors the precedent used by
315    /// [`Self::issue_reclaim_grant`] / [`Self::reclaim_execution`].
316    #[cfg(feature = "core")]
317    async fn read_waitpoint_token(
318        &self,
319        _partition: PartitionKey,
320        _waitpoint_id: &WaitpointId,
321    ) -> Result<Option<String>, EngineError> {
322        Err(EngineError::Unavailable {
323            op: "read_waitpoint_token",
324        })
325    }
326
327    /// Non-mutating observation of signals that satisfied the handle's
328    /// resume condition.
329    async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError>;
330
331    /// Consume a resume grant (via [`ResumeToken`]) to mint a
332    /// resumed-kind handle. Routes to `ff_claim_resumed_execution` on
333    /// Valkey / the epoch-bump reconciler on PG/SQLite. Returns
334    /// `Ok(None)` when the grant's target execution is no longer
335    /// resumable (already reclaimed, terminal, etc.).
336    ///
337    /// **Renamed from `claim_from_reclaim` (RFC-024 PR-B+C).** The
338    /// pre-rename name advertised "reclaim" but the semantic has
339    /// always been resume-after-suspend. The new lease-reclaim path
340    /// lives on [`Self::reclaim_execution`].
341    async fn claim_from_resume_grant(
342        &self,
343        token: ResumeToken,
344    ) -> Result<Option<Handle>, EngineError>;
345
346    /// Issue a lease-reclaim grant (RFC-024 §3.2). Admits executions
347    /// in `lease_expired_reclaimable` or `lease_revoked` state to the
348    /// reclaim path; the returned [`IssueReclaimGrantOutcome::Granted`]
349    /// carries a [`crate::contracts::ReclaimGrant`] which is then fed
350    /// to [`Self::reclaim_execution`] to mint a fresh attempt.
351    ///
352    /// Default impl returns [`EngineError::Unavailable`] — PR-D (PG),
353    /// PR-E (SQLite), and PR-F (Valkey) override with real bodies.
354    async fn issue_reclaim_grant(
355        &self,
356        _args: IssueReclaimGrantArgs,
357    ) -> Result<IssueReclaimGrantOutcome, EngineError> {
358        Err(EngineError::Unavailable {
359            op: "issue_reclaim_grant",
360        })
361    }
362
363    /// Consume a [`crate::contracts::ReclaimGrant`] to mint a fresh
364    /// attempt for a previously lease-expired / lease-revoked
365    /// execution (RFC-024 §3.2). Creates a new attempt row, bumps the
366    /// execution's `lease_reclaim_count`, and mints a
367    /// [`crate::backend::HandleKind::Reclaimed`] handle.
368    ///
369    /// Default impl returns [`EngineError::Unavailable`] — PR-D (PG),
370    /// PR-E (SQLite), and PR-F (Valkey) override with real bodies.
371    async fn reclaim_execution(
372        &self,
373        _args: ReclaimExecutionArgs,
374    ) -> Result<ReclaimExecutionOutcome, EngineError> {
375        Err(EngineError::Unavailable {
376            op: "reclaim_execution",
377        })
378    }
379
380    // Round-5 amendment: lease-releasing peers of `suspend`.
381
382    /// Park the execution until `delay_until`, releasing the lease.
383    async fn delay(&self, handle: &Handle, delay_until: TimestampMs) -> Result<(), EngineError>;
384
385    /// Mark the execution as waiting for its child flow to complete,
386    /// releasing the lease.
387    async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError>;
388
389    // ── Read / admin ──
390
391    /// Snapshot an execution by id. `Ok(None)` ⇒ no such execution.
392    async fn describe_execution(
393        &self,
394        id: &ExecutionId,
395    ) -> Result<Option<ExecutionSnapshot>, EngineError>;
396
397    /// Point-read of the execution-scoped `(input_payload,
398    /// execution_kind, tags)` bundle used by the SDK worker when
399    /// assembling a `ClaimedTask` (see `ff_sdk::ClaimedTask`) after a
400    /// successful claim.
401    ///
402    /// No default impl — every `EngineBackend` must answer this
403    /// explicitly. Distinct from [`Self::describe_execution`]
404    /// (read-model projection) because the SDK needs the raw payload
405    /// bytes + kind + tags immediately post-claim, and the snapshot
406    /// projection deliberately omits the payload bytes.
407    ///
408    /// Per-backend shape:
409    ///
410    /// * **Valkey** — pipelined `GET :payload` + `HGETALL :core`
411    ///   + `HGETALL :tags` on the execution's partition (same pattern
412    ///   as [`Self::describe_execution`]).
413    /// * **Postgres** — single `SELECT payload, raw_fields` on
414    ///   `ff_exec_core` keyed by `(partition_key, execution_id)`;
415    ///   `execution_kind` + `tags` live in `raw_fields` JSONB.
416    /// * **SQLite** — identical shape to Postgres.
417    ///
418    /// Returns [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
419    /// when the execution does not exist — the SDK worker only calls
420    /// this after a successful claim, so a missing row is a loud
421    /// storage-tier invariant violation rather than a routine `Ok(None)`.
422    async fn read_execution_context(
423        &self,
424        execution_id: &ExecutionId,
425    ) -> Result<ExecutionContext, EngineError>;
426
427    /// Point-read of the execution's current attempt-index **pointer**
428    /// — the index of the currently-leased attempt row.
429    ///
430    /// Distinct from [`Self::read_total_attempt_count`]: this method
431    /// names the attempt that *already exists* (pointer), whereas
432    /// `read_total_attempt_count` is the monotonic claim counter used
433    /// to compute the next fresh attempt index. See the sibling's
434    /// rustdoc for the retry-path scenario that motivates the split.
435    ///
436    /// Used on the SDK worker's `claim_from_resume_grant` path —
437    /// specifically the private `claim_resumed_execution` helper —
438    /// immediately before dispatching [`Self::claim_resumed_execution`].
439    /// The returned index is fed into
440    /// [`ClaimResumedExecutionArgs::current_attempt_index`](crate::contracts::ClaimResumedExecutionArgs)
441    /// so the backend's script / transaction targets the correct
442    /// existing attempt row (KEYS[6] on Valkey; `ff_attempt` PK tuple
443    /// on PG/SQLite).
444    ///
445    /// Per-backend shape:
446    ///
447    /// * **Valkey** — `HGET {exec}:core current_attempt_index` on the
448    ///   execution's partition. Single command. Both the
449    ///   **missing-field** case (`exec_core` present but
450    ///   `current_attempt_index` absent or empty-string, i.e. pre-claim
451    ///   state) **and** the **missing-row** case (no `exec_core` hash
452    ///   at all) read back as `AttemptIndex(0)`. This preserves the
453    ///   pre-PR-3 inline-`HGET` semantic and is safe because Valkey's
454    ///   happy path requires `exec_core` to exist before this method
455    ///   is reached — the SDK only calls `read_current_attempt_index`
456    ///   post-grant, and grant issuance is gated on `exec_core`
457    ///   presence. A genuinely absent row would surface as the proper
458    ///   business-logic error (`NotAResumedExecution` /
459    ///   `ExecutionNotLeaseable`) on the downstream FCALL.
460    /// * **Postgres** — `SELECT attempt_index FROM ff_exec_core
461    ///   WHERE partition_key = $1 AND execution_id = $2`. The column
462    ///   is `NOT NULL DEFAULT 0` so a pre-claim row reads back as `0`
463    ///   (matching Valkey's missing-field case). **Missing row**
464    ///   surfaces as [`EngineError::Validation { kind:
465    ///   ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
466    ///   — diverges from Valkey's missing-row `→ 0` mapping.
467    /// * **SQLite** — `SELECT attempt_index FROM ff_exec_core
468    ///   WHERE partition_key = ? AND execution_id = ?`; identical
469    ///   semantics to Postgres (missing-row → `InvalidInput`).
470    ///
471    /// **Cross-backend asymmetry on missing row is intentional.** The
472    /// SDK happy path never observes it (grant issuance on Valkey
473    /// requires `exec_core`, and PG/SQLite currently return
474    /// `Unavailable` from `claim_from_grant` per
475    /// `project_claim_from_grant_pg_sqlite_gap.md`). Consumers writing
476    /// backend-agnostic tooling against this method directly must
477    /// treat the missing-row case as backend-dependent — match on
478    /// `InvalidInput` for PG/SQLite, and treat an unexpected `0` as
479    /// the Valkey equivalent signal.
480    ///
481    /// The default impl returns [`EngineError::Unavailable`] so the
482    /// trait addition is non-breaking for out-of-tree backends (same
483    /// precedent as [`Self::read_execution_context`] landing in v0.12
484    /// PR-1).
485    async fn read_current_attempt_index(
486        &self,
487        _execution_id: &ExecutionId,
488    ) -> Result<AttemptIndex, EngineError> {
489        Err(EngineError::Unavailable {
490            op: "read_current_attempt_index",
491        })
492    }
493
494    /// Point-read of the execution's **total attempt counter** — the
495    /// monotonic count of claims that have ever fired against this
496    /// execution (including the in-flight one once claimed).
497    ///
498    /// Used on the SDK worker's `claim_from_grant` / `claim_execution`
499    /// path — the next attempt-index for a fresh claim is this
500    /// counter's current value (so `1` on the first retry, after
501    /// attempt `0` failed terminally). This is semantically distinct
502    /// from [`Self::read_current_attempt_index`], which is a *pointer*
503    /// at the currently-leased attempt row and is only meaningful on
504    /// the `claim_from_resume_grant` path (where a live attempt already
505    /// exists and we want to re-seat its lease rather than mint a new
506    /// attempt row).
507    ///
508    /// Reading the pointer on the `claim_from_grant` path was a live
509    /// bug: on the retry-of-a-retry scenario the pointer still named
510    /// the *previous* terminal-failed attempt, so the newly-minted
511    /// attempt collided with it (Valkey KEYS[6]) or mis-targeted the
512    /// PG/SQLite `ff_attempt` PK tuple. This method fixes that by
513    /// reading the counter that Lua 5920 / PG `ff_claim_execution` /
514    /// SQLite `claim_impl` all already consult when computing the
515    /// next attempt index.
516    ///
517    /// Per-backend shape:
518    ///
519    /// * **Valkey** — `HGET {exec}:core total_attempt_count` on the
520    ///   execution's partition. Single command; pre-claim read (field
521    ///   absent or empty) maps to `0`.
522    /// * **Postgres** — `SELECT raw_fields->>'total_attempt_count'
523    ///   FROM ff_exec_core WHERE (partition_key, execution_id) = ...`.
524    ///   The field lives in the JSONB `raw_fields` bag rather than a
525    ///   dedicated column (mirrors how `create_execution_impl` seeds
526    ///   it on row creation). Missing row → `InvalidInput`; missing
527    ///   field → `0`.
528    /// * **SQLite** — `SELECT CAST(json_extract(raw_fields,
529    ///   '$.total_attempt_count') AS INTEGER) FROM ff_exec_core
530    ///   WHERE ...`. Same JSON-in-`raw_fields` shape as PG; uses the
531    ///   same `json_extract` idiom already employed in
532    ///   `ff-backend-sqlite/src/queries/operator.rs` for replay_count.
533    ///
534    /// The default impl returns [`EngineError::Unavailable`] so the
535    /// trait addition is non-breaking for out-of-tree backends (same
536    /// precedent as [`Self::read_current_attempt_index`] landing in
537    /// v0.12 PR-3).
538    async fn read_total_attempt_count(
539        &self,
540        _execution_id: &ExecutionId,
541    ) -> Result<AttemptIndex, EngineError> {
542        Err(EngineError::Unavailable {
543            op: "read_total_attempt_count",
544        })
545    }
546
547    /// Snapshot a flow by id. `Ok(None)` ⇒ no such flow.
548    async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError>;
549
550    // ── Namespaced tag point-writes / reads (issue #433) ──
551
552    /// Set a single namespaced tag on an execution. Tag `key` MUST match
553    /// the reserved caller-namespace pattern `^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$` —
554    /// i.e. `<caller>.<field>` — or the call returns
555    /// [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
556    /// with the offending key in `detail`. `value` is arbitrary UTF-8.
557    ///
558    /// The namespace prefix is carried inline in `key` (e.g.
559    /// `"cairn.session_id"`) — there is no separate `namespace` arg.
560    /// This matches the existing `ff_set_execution_tags` wire shape and
561    /// the flow-tag projection in [`ExecutionSnapshot::tags`].
562    ///
563    /// Validation is performed by each overriding backend impl via
564    /// [`validate_tag_key`] **before** the wire hop so PG / SQLite /
565    /// Valkey reject the same set of keys. The default trait impl
566    /// returns [`EngineError::Unavailable`] without running validation
567    /// — there is no meaningful storage to validate against on an
568    /// unsupported backend, and surfacing `Unavailable` before
569    /// `Validation` matches the precedence used elsewhere on the trait.
570    /// Backends MAY additionally validate on the storage tier (Valkey's
571    /// Lua path does, with a more permissive prefix-only check).
572    ///
573    /// Per-backend shape:
574    ///
575    /// * **Valkey** — `ff_set_execution_tags` FCALL with a single
576    ///   `{key → value}` pair. Routes through the existing Lua
577    ///   contract (no new wire format).
578    /// * **Postgres** — `UPDATE ff_exec_core SET raw_fields = jsonb_set(
579    ///   coalesce(raw_fields, '{}'::jsonb), '{tags,<key>}', to_jsonb($value))
580    ///   WHERE (partition_key, execution_id) = ...`. Same storage shape
581    ///   read by [`Self::describe_execution`] / [`Self::read_execution_context`].
582    /// * **SQLite** — `UPDATE ff_exec_core SET raw_fields = json_set(
583    ///   coalesce(raw_fields, '{}'), '$.tags."<key>"', $value) WHERE ...`.
584    ///   The key is quoted in the JSON path so dots inside the
585    ///   namespaced key (e.g. `cairn.session_id`) are treated as a
586    ///   single literal member name rather than JSON-path separators —
587    ///   yielding the same flat `raw_fields.tags` shape as PG.
588    ///
589    /// Missing execution surfaces as
590    /// [`EngineError::NotFound { entity: "execution" }`](crate::engine_error::EngineError::NotFound)
591    /// — matches the Valkey FCALL's `execution_not_found` mapping and
592    /// the existing `ScriptError::ExecutionNotFound` → `EngineError`
593    /// conversion (`ff_script::engine_error_ext`).
594    ///
595    /// The default impl returns [`EngineError::Unavailable`] so the
596    /// trait addition is non-breaking for out-of-tree backends.
597    async fn set_execution_tag(
598        &self,
599        _execution_id: &ExecutionId,
600        _key: &str,
601        _value: &str,
602    ) -> Result<(), EngineError> {
603        Err(EngineError::Unavailable {
604            op: "set_execution_tag",
605        })
606    }
607
608    /// Set a single namespaced tag on a flow. Same namespace rule as
609    /// [`Self::set_execution_tag`]: `key` MUST match
610    /// `^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$`.
611    ///
612    /// Per-backend shape:
613    ///
614    /// * **Valkey** — `ff_set_flow_tags` FCALL with a single pair.
615    ///   Tags land on the dedicated `ff:flow:{fp:N}:<flow_id>:tags`
616    ///   hash, not on the `flow_core` hash (diverges from the
617    ///   execution shape — execution tags live on `ff:exec:...:tags`
618    ///   by the same split). **Lazy migration on first write**: the
619    ///   Lua (`ff_script::flowfabric.lua`, `ff_set_flow_tags`) scans
620    ///   `flow_core` once per flow for pre-58.4 inline namespaced
621    ///   fields (anything matching `^[a-z][a-z0-9_]*\.`), HSETs them
622    ///   onto `:tags`, HDELs them from `flow_core`, and stamps
623    ///   `tags_migrated=1` on `flow_core` so subsequent calls
624    ///   short-circuit to O(1). This heals flows created before
625    ///   RFC-058.4 landed; well-formed flows pay the migration cost
626    ///   only on their very first tag write. Callers MUST read tags
627    ///   via [`Self::get_flow_tag`] (`HGET :tags <key>`) — direct
628    ///   `HGETALL` against `flow_core` will not see post-migration
629    ///   values.
630    ///
631    ///   **Cross-backend parity caveat on `describe_flow`**: the
632    ///   pre-existing `ValkeyBackend::describe_flow` /
633    ///   `FlowSnapshot::tags` read path snapshots `flow_core` fields
634    ///   only and does NOT today merge the `:tags` sub-hash, whereas
635    ///   Postgres `describe_flow` DOES surface flow tags via
636    ///   `ff_backend_postgres::flow::extract_tags` (which reads them
637    ///   off `raw_fields` — the same store `set_flow_tag` writes on
638    ///   PG). Trait consumers MUST NOT assume a tag written here
639    ///   will be visible via `describe_flow` on every backend: on
640    ///   Valkey, callers that need the full tag set should
641    ///   complement the snapshot with per-key [`Self::get_flow_tag`]
642    ///   reads. Extending Valkey `describe_flow` to merge `:tags`
643    ///   is additive and out of scope for this trait addition.
644    /// * **Postgres** — `UPDATE ff_flow_core SET raw_fields =
645    ///   jsonb_set(..., '{<key>}', ...)` — flow tags are stored as
646    ///   top-level `raw_fields` keys (matches
647    ///   `ff_backend_postgres::flow::extract_tags`). No `tags` nesting
648    ///   on flows, which diverges from the execution shape.
649    /// * **SQLite** — mirrors PG: `UPDATE ff_flow_core SET raw_fields =
650    ///   json_set(..., '$."<key>"', $value) WHERE ...`. The key is
651    ///   quoted so the dotted namespaced key lands as a single flat
652    ///   top-level member of `raw_fields`.
653    ///
654    /// Missing flow surfaces as
655    /// [`EngineError::NotFound { entity: "flow" }`](crate::engine_error::EngineError::NotFound)
656    /// (matches the Valkey FCALL's `flow_not_found` mapping).
657    ///
658    /// The default impl returns [`EngineError::Unavailable`].
659    async fn set_flow_tag(
660        &self,
661        _flow_id: &FlowId,
662        _key: &str,
663        _value: &str,
664    ) -> Result<(), EngineError> {
665        Err(EngineError::Unavailable {
666            op: "set_flow_tag",
667        })
668    }
669
670    /// Read a single namespaced execution tag. Returns `Ok(None)` when
671    /// the tag is absent **or** the execution row does not exist —
672    /// the two cases are not distinguished on the read path. Callers
673    /// that need to distinguish should call [`Self::describe_execution`]
674    /// first (an `Ok(None)` from that method proves the execution is
675    /// absent). This matches Valkey's native `HGET` semantics and
676    /// keeps the read path at a single round-trip on every backend.
677    ///
678    /// `key` must pass [`validate_tag_key`] — a malformed key can
679    /// never be present in storage so the call short-circuits with
680    /// [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
681    /// rather than round-tripping.
682    ///
683    /// Per-backend shape:
684    ///
685    /// * **Valkey** — `HGET :tags <key>` on the execution's partition.
686    /// * **Postgres** — `SELECT raw_fields->'tags'->><key> FROM ff_exec_core
687    ///   WHERE ...` with `fetch_optional` → missing row collapses to `None`.
688    /// * **SQLite** — `SELECT json_extract(raw_fields, '$.tags."<key>"')
689    ///   FROM ff_exec_core WHERE ...` with the same collapse. The key is
690    ///   quoted in the JSON path so dotted namespaced keys resolve to
691    ///   the flat literal member written by `set_execution_tag`.
692    ///
693    /// The default impl returns [`EngineError::Unavailable`].
694    async fn get_execution_tag(
695        &self,
696        _execution_id: &ExecutionId,
697        _key: &str,
698    ) -> Result<Option<String>, EngineError> {
699        Err(EngineError::Unavailable {
700            op: "get_execution_tag",
701        })
702    }
703
704    /// Read an execution's `namespace` scalar. Returns `Ok(None)` when
705    /// the row is absent or the field is unset. Dedicated point-read
706    /// used by the scanner per-candidate filter (`should_skip_candidate`)
707    /// to preserve the 1-HGET cost contract documented in
708    /// `ff_engine::scanner::should_skip_candidate` — `describe_execution`
709    /// is heavier (HGETALL / full snapshot) and unnecessary when only
710    /// the namespace scalar is needed.
711    ///
712    /// Per-backend shape:
713    ///
714    /// * **Valkey** — `HGET :core namespace` on the execution's partition
715    ///   (single field read on the already-hot exec_core hash).
716    /// * **Postgres** — `SELECT raw_fields->>'namespace' FROM ff_exec_core
717    ///   WHERE partition_key = $1 AND execution_id = $2`.
718    /// * **SQLite** — `SELECT json_extract(raw_fields, '$.namespace')
719    ///   FROM ff_exec_core WHERE ...`.
720    ///
721    /// The default impl returns [`EngineError::Unavailable`].
722    async fn get_execution_namespace(
723        &self,
724        _execution_id: &ExecutionId,
725    ) -> Result<Option<String>, EngineError> {
726        Err(EngineError::Unavailable {
727            op: "get_execution_namespace",
728        })
729    }
730
731    /// Read a single namespaced flow tag. Returns `Ok(None)` when
732    /// the tag is absent **or** the flow row does not exist (same
733    /// collapse semantics as [`Self::get_execution_tag`]). Symmetry
734    /// partner — consumers like cairn read `cairn.session_id` off
735    /// flows for archival.
736    ///
737    /// `key` must pass [`validate_tag_key`].
738    ///
739    /// Per-backend shape:
740    ///
741    /// * **Valkey** — `HGET :tags <key>` on the flow's partition.
742    /// * **Postgres** — `SELECT raw_fields->><key> FROM ff_flow_core
743    ///   WHERE ...` (top-level `raw_fields` key, matches the flow-tag
744    ///   storage shape).
745    /// * **SQLite** — `SELECT json_extract(raw_fields, '$."<key>"')
746    ///   FROM ff_flow_core WHERE ...` (quoted key — see
747    ///   `set_flow_tag`).
748    ///
749    /// The default impl returns [`EngineError::Unavailable`].
750    async fn get_flow_tag(
751        &self,
752        _flow_id: &FlowId,
753        _key: &str,
754    ) -> Result<Option<String>, EngineError> {
755        Err(EngineError::Unavailable {
756            op: "get_flow_tag",
757        })
758    }
759
760    /// List dependency edges adjacent to an execution. Read-only; the
761    /// backend resolves the subject execution's flow, reads the
762    /// direction-specific adjacency SET, and decodes each member's
763    /// flow-scoped `edge:<edge_id>` hash.
764    ///
765    /// Returns an empty `Vec` when the subject has no edges on the
766    /// requested side — including standalone executions (no owning
767    /// flow). Ordering is unspecified: the underlying adjacency SET
768    /// is an unordered SMEMBERS read. Callers that need deterministic
769    /// order should sort by [`EdgeSnapshot::edge_id`] /
770    /// [`EdgeSnapshot::created_at`] themselves.
771    ///
772    /// Parse failures on the edge hash surface as
773    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
774    /// — unknown fields, missing required fields, endpoint mismatches
775    /// against the adjacency SET all fail loud rather than silently
776    /// returning partial results.
777    ///
778    /// Gated on the `core` feature — edge reads are part of the
779    /// minimal engine surface a Postgres-style backend must honour.
780    ///
781    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
782    #[cfg(feature = "core")]
783    async fn list_edges(
784        &self,
785        _flow_id: &FlowId,
786        _direction: EdgeDirection,
787    ) -> Result<Vec<EdgeSnapshot>, EngineError> {
788        Err(EngineError::Unavailable { op: "list_edges" })
789    }
790
791    /// Snapshot a single dependency edge by its owning flow + edge id.
792    ///
793    /// `Ok(None)` when the edge hash is absent (never staged, or
794    /// staged under a different flow than `flow_id`). Parse failures
795    /// on a present edge hash surface as
796    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
797    /// — the stored `flow_id` field is cross-checked against the
798    /// caller's expected `flow_id` so a wrong-key read fails loud
799    /// rather than returning an unrelated edge.
800    ///
801    /// Gated on the `core` feature — single-edge reads are part of
802    /// the minimal snapshot surface an alternate backend must honour
803    /// alongside [`Self::describe_execution`] / [`Self::describe_flow`]
804    /// / [`Self::list_edges`].
805    ///
806    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
807    #[cfg(feature = "core")]
808    async fn describe_edge(
809        &self,
810        _flow_id: &FlowId,
811        _edge_id: &EdgeId,
812    ) -> Result<Option<EdgeSnapshot>, EngineError> {
813        Err(EngineError::Unavailable {
814            op: "describe_edge",
815        })
816    }
817
818    /// Resolve an execution's owning flow id, if any.
819    ///
820    /// `Ok(None)` when the execution's core record is absent or has
821    /// no associated flow (standalone execution). A present-but-
822    /// malformed `flow_id` field surfaces as
823    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`].
824    ///
825    /// Gated on the `core` feature. Used by ff-sdk's
826    /// `list_outgoing_edges` / `list_incoming_edges` to pivot from a
827    /// consumer-supplied `ExecutionId` to the `FlowId` required by
828    /// [`Self::list_edges`]. A Valkey backend serves this with a
829    /// single `HGET exec_core flow_id`; a Postgres backend serves it
830    /// with the equivalent single-column row lookup.
831    ///
832    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
833    #[cfg(feature = "core")]
834    async fn resolve_execution_flow_id(
835        &self,
836        _eid: &ExecutionId,
837    ) -> Result<Option<FlowId>, EngineError> {
838        Err(EngineError::Unavailable {
839            op: "resolve_execution_flow_id",
840        })
841    }
842
843    /// List flows on a partition with cursor-based pagination (issue
844    /// #185).
845    ///
846    /// Returns a [`ListFlowsPage`] of [`FlowSummary`](crate::contracts::FlowSummary)
847    /// rows ordered by `flow_id` (UUID byte-lexicographic). `cursor`
848    /// is `None` for the first page; callers forward the returned
849    /// `next_cursor` verbatim to continue iteration, and the listing
850    /// is exhausted when `next_cursor` is `None`. `limit` is the
851    /// maximum number of rows to return on this page — implementations
852    /// MAY return fewer (end of partition) but MUST NOT exceed it.
853    ///
854    /// Ordering rationale: flow ids are UUIDs, and both Valkey
855    /// (sort after-the-fact) and Postgres (`ORDER BY flow_id`) can
856    /// agree on byte-lexicographic order — the same order
857    /// `FlowId::to_string()` produces for canonical hyphenated UUIDs.
858    /// Mapping to `cursor > flow_id` keeps the contract backend-
859    /// independent.
860    ///
861    /// # Postgres implementation pattern
862    ///
863    /// A Postgres-backed implementation serves this directly with
864    ///
865    /// ```sql
866    /// SELECT flow_id, created_at_ms, public_flow_state
867    ///   FROM ff_flow
868    ///  WHERE partition_key = $1
869    ///    AND ($2::uuid IS NULL OR flow_id > $2)
870    ///  ORDER BY flow_id
871    ///  LIMIT $3 + 1;
872    /// ```
873    ///
874    /// — reading one extra row to decide whether `next_cursor` should
875    /// be set to the last row's `flow_id`. The Valkey implementation
876    /// maintains the `ff:idx:{fp:N}:flow_index` SET and performs the
877    /// sort + slice client-side (SMEMBERS then sort-by-UUID-bytes),
878    /// pipelining `HGETALL flow_core` for each row on the page.
879    ///
880    /// Gated on the `core` feature — flow listing is part of the
881    /// minimal engine surface a Postgres-style backend must honour.
882    #[cfg(feature = "core")]
883    async fn list_flows(
884        &self,
885        _partition: PartitionKey,
886        _cursor: Option<FlowId>,
887        _limit: usize,
888    ) -> Result<ListFlowsPage, EngineError> {
889        Err(EngineError::Unavailable { op: "list_flows" })
890    }
891
892    /// Enumerate registered lanes with cursor-based pagination.
893    ///
894    /// Lanes are global (not partition-scoped) — the backend serves
895    /// this from its lane registry and does NOT accept a
896    /// [`crate::partition::Partition`] argument. Results are sorted
897    /// by [`LaneId`] name so the ordering is stable across calls and
898    /// cursors address a deterministic position in the sort.
899    ///
900    /// * `cursor` — exclusive lower bound. `None` starts from the
901    ///   first lane. To continue a walk, pass the previous page's
902    ///   [`ListLanesPage::next_cursor`].
903    /// * `limit` — hard cap on the number of lanes returned in the
904    ///   page. Backends MAY round this down when the registry size
905    ///   is smaller; they MUST NOT return more than `limit`.
906    ///
907    /// [`ListLanesPage::next_cursor`] is `Some(last_lane_in_page)`
908    /// iff at least one more lane exists after the returned page,
909    /// and `None` on the final page. Callers loop until `next_cursor`
910    /// is `None` to read the full registry.
911    ///
912    /// Gated on the `core` feature — lane enumeration is part of the
913    /// minimal snapshot surface an alternate backend must honour
914    /// alongside [`Self::describe_flow`] / [`Self::list_edges`].
915    #[cfg(feature = "core")]
916    async fn list_lanes(
917        &self,
918        _cursor: Option<LaneId>,
919        _limit: usize,
920    ) -> Result<ListLanesPage, EngineError> {
921        Err(EngineError::Unavailable { op: "list_lanes" })
922    }
923
924    /// List suspended executions in one partition, cursor-paginated,
925    /// with each entry's suspension `reason_code` populated (issue
926    /// #183).
927    ///
928    /// Consumer-facing "what's blocked on what?" panels (ff-board's
929    /// suspended-executions view, operator CLIs) need the reason in
930    /// the list response so the UI does not round-trip per row to
931    /// `describe_execution` for a field it knows it needs. `reason`
932    /// on [`SuspendedExecutionEntry`] carries the free-form
933    /// `suspension:current.reason_code` field — see the type rustdoc
934    /// for the String-not-enum rationale.
935    ///
936    /// `cursor` is opaque to callers; pass `None` to start a fresh
937    /// scan and feed the returned [`ListSuspendedPage::next_cursor`]
938    /// back in on subsequent pages until it comes back `None`.
939    /// `limit` bounds the `entries` count; backends MAY return fewer
940    /// when the partition is exhausted.
941    ///
942    /// Ordering is by ascending `suspended_at_ms` (the per-lane
943    /// suspended ZSET score == `timeout_at` or the no-timeout
944    /// sentinel) with execution id as a lex tiebreak, so cursor
945    /// continuation is deterministic across calls.
946    ///
947    /// Gated on the `core` feature — suspended-list enumeration is
948    /// part of the minimal engine surface a Postgres-style backend
949    /// must honour.
950    #[cfg(feature = "core")]
951    async fn list_suspended(
952        &self,
953        _partition: PartitionKey,
954        _cursor: Option<ExecutionId>,
955        _limit: usize,
956    ) -> Result<ListSuspendedPage, EngineError> {
957        Err(EngineError::Unavailable {
958            op: "list_suspended",
959        })
960    }
961
962    /// Forward-only paginated listing of the executions indexed under
963    /// one partition.
964    ///
965    /// Reads the partition-wide `ff:idx:{p:N}:all_executions` set,
966    /// sorts lexicographically on `ExecutionId`, and returns the page
967    /// of ids strictly greater than `cursor` (or starting from the
968    /// smallest id when `cursor = None`). The returned
969    /// [`ListExecutionsPage::next_cursor`] is the last id on the page
970    /// iff at least one more id exists past it; `None` signals
971    /// end-of-stream.
972    ///
973    /// `limit` is the maximum number of ids returned on this page. A
974    /// `limit` of `0` returns an empty page with `next_cursor = None`.
975    /// Backends MAY cap `limit` internally (Valkey: 1000) and return
976    /// fewer ids than requested; callers continue paginating until
977    /// `next_cursor == None`.
978    ///
979    /// Ordering is stable under concurrent inserts for already-emitted
980    /// ids (an id less-than-or-equal-to the caller's cursor is never
981    /// re-emitted in later pages) but new inserts past the cursor WILL
982    /// appear in subsequent pages — consistent with forward-only
983    /// cursor semantics.
984    ///
985    /// Gated on the `core` feature — partition-scoped listing is part
986    /// of the minimal engine surface every backend must honour.
987    #[cfg(feature = "core")]
988    async fn list_executions(
989        &self,
990        _partition: PartitionKey,
991        _cursor: Option<ExecutionId>,
992        _limit: usize,
993    ) -> Result<ListExecutionsPage, EngineError> {
994        Err(EngineError::Unavailable {
995            op: "list_executions",
996        })
997    }
998
999    // ── Trigger ops (issue #150) ──
1000
1001    /// Deliver an external signal to a suspended execution's waitpoint.
1002    ///
1003    /// The backend atomically records the signal, evaluates the resume
1004    /// condition, and — when satisfied — transitions the execution
1005    /// from `suspended` to `runnable` (or buffers the signal when the
1006    /// waitpoint is still `pending`). Duplicate delivery — same
1007    /// `idempotency_key` + waitpoint — surfaces as
1008    /// [`DeliverSignalResult::Duplicate`] with the pre-existing
1009    /// `signal_id` rather than mutating state twice.
1010    ///
1011    /// Input validation (HMAC token presence, payload size limits,
1012    /// signal-name shape) is the backend's responsibility; callers
1013    /// pass a fully populated [`DeliverSignalArgs`] and receive typed
1014    /// outcomes or typed errors (`ScriptError::invalid_token`,
1015    /// `ScriptError::token_expired`, `ScriptError::ExecutionNotFound`
1016    /// surfaced via [`EngineError::Transport`] on the Valkey backend).
1017    ///
1018    /// Gated on the `core` feature — signal delivery is part of the
1019    /// minimal trigger surface every backend must honour so ff-server
1020    /// / REST handlers can dispatch against `Arc<dyn EngineBackend>`
1021    /// without knowing which backend is running underneath.
1022    #[cfg(feature = "core")]
1023    async fn deliver_signal(
1024        &self,
1025        _args: DeliverSignalArgs,
1026    ) -> Result<DeliverSignalResult, EngineError> {
1027        Err(EngineError::Unavailable {
1028            op: "deliver_signal",
1029        })
1030    }
1031
1032    /// Claim a resumed execution — a previously-suspended attempt that
1033    /// has cleared its resume condition (e.g. via
1034    /// [`Self::deliver_signal`]) and now needs a worker to pick up the
1035    /// same attempt index.
1036    ///
1037    /// Distinct from [`Self::claim`] (fresh work) and
1038    /// [`Self::claim_from_resume_grant`] (grant-based ownership transfer
1039    /// after a crash): the resumed-claim path re-binds an existing
1040    /// attempt rather than minting a new one. The backend issues a
1041    /// fresh `lease_id` + bumps the `lease_epoch`, preserving
1042    /// `attempt_id` / `attempt_index` so stream frames and progress
1043    /// updates continue on the same attempt.
1044    ///
1045    /// Typed failures surface via `ScriptError` → `EngineError`:
1046    /// `NotAResumedExecution` when the attempt state is not
1047    /// `attempt_interrupted`, `ExecutionNotLeaseable` when the
1048    /// lifecycle phase is not `runnable`, and `InvalidClaimGrant`
1049    /// when the grant key is missing or was already consumed.
1050    ///
1051    /// Gated on the `core` feature — resumed-claim is part of the
1052    /// minimal trigger surface every backend must honour.
1053    #[cfg(feature = "core")]
1054    async fn claim_resumed_execution(
1055        &self,
1056        _args: ClaimResumedExecutionArgs,
1057    ) -> Result<ClaimResumedExecutionResult, EngineError> {
1058        Err(EngineError::Unavailable {
1059            op: "claim_resumed_execution",
1060        })
1061    }
1062
1063    /// Scan a lane's eligible ZSET on one partition for
1064    /// highest-priority executions awaiting a worker (v0.12 PR-5).
1065    ///
1066    /// Lifted from the SDK-side `ZRANGEBYSCORE` inline on
1067    /// `FlowFabricWorker::claim_next` — the scheduler-bypass scanner
1068    /// gated behind `direct-valkey-claim`. The trait method itself is
1069    /// backend-agnostic; consumers that drive the scanner loop
1070    /// (bench harnesses, single-tenant dev) compose it with
1071    /// [`Self::issue_claim_grant`] + [`Self::claim_execution`] to
1072    /// replicate the pre-PR-5 `claim_next` body.
1073    ///
1074    /// # Backend coverage
1075    ///
1076    /// * **Valkey** — `ZRANGEBYSCORE eligible_zset -inf +inf LIMIT 0 <limit>`
1077    ///   on the lane's partition-scoped eligible key. Single
1078    ///   command; no script round-trip. Wire shape is byte-for-byte
1079    ///   identical to the pre-PR SDK inline call so bench traces
1080    ///   match pre-PR without new `#[tracing::instrument]` span names.
1081    /// * **Postgres / SQLite** — use the `Err(Unavailable)` default.
1082    ///   PG/SQLite consumers drive work through the scheduler-routed
1083    ///   [`Self::claim_for_worker`] path instead of the scanner
1084    ///   primitives exposed here; lifting the scheduler itself onto
1085    ///   the trait is RFC-024 follow-up scope. See
1086    ///   `project_claim_from_grant_pg_sqlite_gap.md` for motivation.
1087    ///
1088    /// Default impl returns [`EngineError::Unavailable`] so the trait
1089    /// addition is non-breaking for out-of-tree backends. Same
1090    /// precedent as [`Self::claim_execution`] landing in v0.12 PR-4.
1091    #[cfg(feature = "core")]
1092    async fn scan_eligible_executions(
1093        &self,
1094        _args: ScanEligibleArgs,
1095    ) -> Result<Vec<ExecutionId>, EngineError> {
1096        Err(EngineError::Unavailable {
1097            op: "scan_eligible_executions",
1098        })
1099    }
1100
1101    /// Issue a claim grant — the scheduler's admission write — for a
1102    /// single execution on a single lane (v0.12 PR-5).
1103    ///
1104    /// Lifted from the SDK-side `ff_issue_claim_grant` inline helper
1105    /// on `FlowFabricWorker::claim_next`. The backend atomically
1106    /// writes the grant hash, appends to the per-worker grant index,
1107    /// and removes the execution from the lane's eligible ZSET.
1108    ///
1109    /// Typed rejects surface via [`EngineError::Validation`]:
1110    /// `CapabilityMismatch` when the worker's capabilities do not
1111    /// cover the execution's `required_capabilities`, `InvalidInput`
1112    /// for malformed args. Transport faults surface via
1113    /// [`EngineError::Transport`].
1114    ///
1115    /// # Backend coverage
1116    ///
1117    /// * **Valkey** — one `ff_issue_claim_grant` FCALL. KEYS/ARGV
1118    ///   shape is byte-for-byte identical to the pre-PR SDK inline
1119    ///   call; bench traces match pre-PR.
1120    /// * **Postgres / SQLite** — `Err(Unavailable)` default; use
1121    ///   [`Self::claim_for_worker`] instead. See
1122    ///   [`Self::scan_eligible_executions`] for the cross-link
1123    ///   rationale.
1124    #[cfg(feature = "core")]
1125    async fn issue_claim_grant(
1126        &self,
1127        _args: IssueClaimGrantArgs,
1128    ) -> Result<IssueClaimGrantOutcome, EngineError> {
1129        Err(EngineError::Unavailable {
1130            op: "issue_claim_grant",
1131        })
1132    }
1133
1134    /// Move an execution from a lane's eligible ZSET into its
1135    /// blocked_route ZSET (v0.12 PR-5).
1136    ///
1137    /// Lifted from the SDK-side `ff_block_execution_for_admission`
1138    /// inline helper on `FlowFabricWorker::claim_next`. Called after
1139    /// a [`Self::issue_claim_grant`] `CapabilityMismatch` reject —
1140    /// without a block step, the inline scanner would re-pick the
1141    /// same top-of-ZSET every tick (parity with
1142    /// `ff-scheduler::Scheduler::block_candidate`).
1143    ///
1144    /// The engine's unblock scanner periodically promotes
1145    /// blocked_route back to eligible once a worker with matching
1146    /// caps registers.
1147    ///
1148    /// # Backend coverage
1149    ///
1150    /// * **Valkey** — one `ff_block_execution_for_admission` FCALL.
1151    /// * **Postgres / SQLite** — `Err(Unavailable)` default; the
1152    ///   scheduler-routed [`Self::claim_for_worker`] path handles
1153    ///   admission rejects server-side.
1154    #[cfg(feature = "core")]
1155    async fn block_route(
1156        &self,
1157        _args: BlockRouteArgs,
1158    ) -> Result<BlockRouteOutcome, EngineError> {
1159        Err(EngineError::Unavailable { op: "block_route" })
1160    }
1161
1162    /// Consume a scheduler-issued claim grant to mint a fresh attempt.
1163    ///
1164    /// The SDK's grant-consumer path — paired with `FlowFabricWorker::claim_from_grant`
1165    /// in `ff-sdk` — routes through this method. The scheduler has
1166    /// already validated budget / quota / capabilities and written a
1167    /// grant (Valkey `claim_grant` hash); this call atomically
1168    /// consumes that grant and creates the attempt row, mints
1169    /// `lease_id` + `lease_epoch`, and returns a
1170    /// [`ClaimExecutionResult::Claimed`] carrying the minted lease
1171    /// triple.
1172    ///
1173    /// Distinct from [`Self::claim`] (the scheduler-bypass scanner
1174    /// used by the `direct-valkey-claim` feature) — this method
1175    /// assumes the grant already exists and skips capability / ZSET
1176    /// scanning. The Valkey impl fires exactly one `ff_claim_execution`
1177    /// FCALL.
1178    ///
1179    /// Typed failures surface via `ScriptError` → `EngineError`:
1180    /// `UseClaimResumedExecution` when the attempt is actually
1181    /// `attempt_interrupted` (caller should retry via
1182    /// [`Self::claim_resumed_execution`] — see `ContentionKind` at
1183    /// `ff_core::engine_error`), `InvalidClaimGrant` when the grant is
1184    /// missing / consumed / worker-mismatched, `CapabilityMismatch`
1185    /// when the execution's `required_capabilities` drifted after
1186    /// grant issuance.
1187    ///
1188    /// # Backend coverage
1189    ///
1190    /// * **Valkey** — implemented in `ff-backend-valkey` (one
1191    ///   `ff_claim_execution` FCALL).
1192    /// * **Postgres / SQLite** — use the `Err(Unavailable)` default in
1193    ///   this PR. Grants on PG / SQLite today flow through
1194    ///   `PostgresScheduler::claim_for_worker` (a sibling struct, not
1195    ///   an `EngineBackend` method); wiring the default-over-trait
1196    ///   behaviour into a PG / SQLite `claim_execution` impl lands
1197    ///   with a future RFC-024 grant-consumer extension.
1198    ///
1199    /// The default impl returns [`EngineError::Unavailable`] so the
1200    /// trait addition is non-breaking for out-of-tree backends. Same
1201    /// precedent as [`Self::read_current_attempt_index`] landing in
1202    /// v0.12 PR-3.
1203    #[cfg(feature = "core")]
1204    async fn claim_execution(
1205        &self,
1206        _args: ClaimExecutionArgs,
1207    ) -> Result<ClaimExecutionResult, EngineError> {
1208        Err(EngineError::Unavailable {
1209            op: "claim_execution",
1210        })
1211    }
1212
1213    /// Operator-initiated cancellation of a flow and (optionally) its
1214    /// member executions. See RFC-012 §3.1.1 for the policy /wait
1215    /// matrix.
1216    async fn cancel_flow(
1217        &self,
1218        id: &FlowId,
1219        policy: CancelFlowPolicy,
1220        wait: CancelFlowWait,
1221    ) -> Result<CancelFlowResult, EngineError>;
1222
1223    /// RFC-016 Stage A: set the inbound-edge-group policy for a
1224    /// downstream execution. Must be called before the first
1225    /// `add_dependency(... -> downstream_execution_id)` — the backend
1226    /// rejects with [`EngineError::Conflict`] if edges have already
1227    /// been staged for this group.
1228    ///
1229    /// Stage A honours only
1230    /// [`EdgeDependencyPolicy::AllOf`](crate::contracts::EdgeDependencyPolicy::AllOf);
1231    /// the `AnyOf` / `Quorum` variants return
1232    /// [`EngineError::Validation`] with
1233    /// `detail = "stage A supports AllOf only; AnyOf/Quorum land in stage B"`
1234    /// until Stage B's resolver lands.
1235    #[cfg(feature = "core")]
1236    async fn set_edge_group_policy(
1237        &self,
1238        _flow_id: &FlowId,
1239        _downstream_execution_id: &ExecutionId,
1240        _policy: crate::contracts::EdgeDependencyPolicy,
1241    ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
1242        Err(EngineError::Unavailable {
1243            op: "set_edge_group_policy",
1244        })
1245    }
1246
1247    // ── HMAC secret rotation (v0.7 migration-master Q4) ──
1248
1249    /// Rotate the waitpoint HMAC signing kid **cluster-wide**.
1250    ///
1251    /// **v0.7 migration-master Q4 (adjudicated 2026-04-24).**
1252    /// Additive trait surface so Valkey and Postgres backends can
1253    /// both expose the "rotate everywhere" semantic under one name.
1254    ///
1255    /// * Valkey impl fans out an `ff_rotate_waitpoint_hmac_secret`
1256    ///   FCALL per execution partition. `entries.len() == num_flow_partitions`
1257    ///   and per-partition failures are surfaced as inner `Err`
1258    ///   entries — the call as a whole does not fail when one
1259    ///   partition's FCALL fails, matching
1260    ///   [`ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions`]'s
1261    ///   partial-success contract.
1262    /// * Postgres impl (Wave 4) writes one row to
1263    ///   `ff_waitpoint_hmac(kid, secret, rotated_at)` and returns a
1264    ///   single-entry vec with `partition = 0`.
1265    ///
1266    /// The default impl returns
1267    /// [`EngineError::Unavailable`] with
1268    /// `op = "rotate_waitpoint_hmac_secret_all"` so backends that
1269    /// haven't implemented the method surface the miss loudly rather
1270    /// than silently no-op'ing. Both concrete backends override.
1271    async fn rotate_waitpoint_hmac_secret_all(
1272        &self,
1273        _args: RotateWaitpointHmacSecretAllArgs,
1274    ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
1275        Err(EngineError::Unavailable {
1276            op: "rotate_waitpoint_hmac_secret_all",
1277        })
1278    }
1279
1280    /// Seed the initial waitpoint HMAC secret for a fresh deployment
1281    /// (issue #280).
1282    ///
1283    /// **Idempotent.** If a `current_kid` (Valkey per-partition) or
1284    /// an active kid row (Postgres) already exists with the given
1285    /// `kid`, the method returns
1286    /// [`SeedOutcome::AlreadySeeded`] without overwriting, reporting
1287    /// whether the stored secret matches the caller-supplied one via
1288    /// `same_secret`. Callers (cairn boot, operator tooling) invoke
1289    /// this on every boot and let the backend decide whether to
1290    /// install — removing the client-side "check then HSET" race that
1291    /// cairn's raw-HSET boot path silently tolerated.
1292    ///
1293    /// For rotation of an already-seeded secret, use
1294    /// [`Self::rotate_waitpoint_hmac_secret_all`] instead; seed is
1295    /// install-only.
1296    ///
1297    /// The default impl returns [`EngineError::Unavailable`] with
1298    /// `op = "seed_waitpoint_hmac_secret"` so backends that haven't
1299    /// implemented the method surface the miss loudly.
1300    async fn seed_waitpoint_hmac_secret(
1301        &self,
1302        _args: SeedWaitpointHmacSecretArgs,
1303    ) -> Result<SeedOutcome, EngineError> {
1304        Err(EngineError::Unavailable {
1305            op: "seed_waitpoint_hmac_secret",
1306        })
1307    }
1308
1309    // ── Budget ──
1310
    /// Report usage against a budget and check limits. Returns the
    /// typed [`ReportUsageResult`] variant; backends enforce
    /// idempotency via the caller-supplied
    /// [`UsageDimensions::dedup_key`] (RFC-012 §R7.2.3 — replaces
    /// the pre-Round-7 `AdmissionDecision` return).
    ///
    /// This is a required method: unlike most of the trait surface it
    /// has no default `Unavailable` body, so every backend must
    /// implement it.
    ///
    /// * `handle` — the worker [`Handle`] the usage is attributed to.
    ///   For the lease-free operator path, see
    ///   [`Self::report_usage_admin`].
    /// * `budget` — the budget the usage is charged against.
    /// * `dimensions` — the usage being reported; its `dedup_key` is what
    ///   makes a replayed report idempotent.
    async fn report_usage(
        &self,
        handle: &Handle,
        budget: &BudgetId,
        dimensions: crate::backend::UsageDimensions,
    ) -> Result<ReportUsageResult, EngineError>;
1322
1323    // ── Stream reads (RFC-012 Stage 1c tranche-4; issue #87) ──
1324
1325    /// Read frames from a completed or in-flight attempt's stream.
1326    ///
1327    /// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start`
1328    /// / `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
1329    /// `StreamCursor::At("<id>")` reads from a concrete entry id.
1330    ///
1331    /// Input validation (count_limit bounds, cursor shape) is the
1332    /// caller's responsibility — SDK-side wrappers in
1333    /// [`ff-sdk`](https://docs.rs/ff-sdk) enforce bounds before
1334    /// forwarding. Backends MAY additionally reject out-of-range
1335    /// input via [`EngineError::Validation`].
1336    ///
1337    /// Gated on the `streaming` feature — stream reads are part of
1338    /// the stream-subset surface a backend without XREAD-like
1339    /// primitives may omit.
1340    #[cfg(feature = "streaming")]
1341    async fn read_stream(
1342        &self,
1343        _execution_id: &ExecutionId,
1344        _attempt_index: AttemptIndex,
1345        _from: StreamCursor,
1346        _to: StreamCursor,
1347        _count_limit: u64,
1348    ) -> Result<StreamFrames, EngineError> {
1349        Err(EngineError::Unavailable { op: "read_stream" })
1350    }
1351
1352    /// Tail a live attempt's stream.
1353    ///
1354    /// `after` is an exclusive [`StreamCursor`] — entries with id
1355    /// strictly greater than `after` are returned. `StreamCursor::Start`
1356    /// / `StreamCursor::End` are NOT accepted here; callers MUST pass
1357    /// a concrete id (or `StreamCursor::from_beginning()`). The SDK
1358    /// wrapper rejects the open markers before reaching the backend.
1359    ///
1360    /// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up
1361    /// to that many ms for a new entry.
1362    ///
1363    /// `visibility` (RFC-015 §6.1) filters the returned entries by
1364    /// their stored [`StreamMode`](crate::backend::StreamMode)
1365    /// `mode` field. Default
1366    /// [`TailVisibility::All`](crate::backend::TailVisibility::All)
1367    /// preserves v1 behaviour.
1368    ///
1369    /// Gated on the `streaming` feature — see [`read_stream`](Self::read_stream).
1370    #[cfg(feature = "streaming")]
1371    async fn tail_stream(
1372        &self,
1373        _execution_id: &ExecutionId,
1374        _attempt_index: AttemptIndex,
1375        _after: StreamCursor,
1376        _block_ms: u64,
1377        _count_limit: u64,
1378        _visibility: TailVisibility,
1379    ) -> Result<StreamFrames, EngineError> {
1380        Err(EngineError::Unavailable { op: "tail_stream" })
1381    }
1382
1383    /// Read the rolling summary document for an attempt (RFC-015 §6.3).
1384    ///
1385    /// Returns `Ok(None)` when no [`StreamMode::DurableSummary`](crate::backend::StreamMode::DurableSummary)
1386    /// frame has ever been appended for the attempt. Non-blocking Hash
1387    /// read; safe to call from any consumer without holding the lease.
1388    ///
1389    /// Gated on the `streaming` feature — summary reads are part of
1390    /// the stream-subset surface.
1391    #[cfg(feature = "streaming")]
1392    async fn read_summary(
1393        &self,
1394        _execution_id: &ExecutionId,
1395        _attempt_index: AttemptIndex,
1396    ) -> Result<Option<SummaryDocument>, EngineError> {
1397        Err(EngineError::Unavailable {
1398            op: "read_summary",
1399        })
1400    }
1401
1402    // ── RFC-017 Stage A — Ingress (5) ──────────────────────────
1403    //
1404    // Every method in this block has a default impl returning
1405    // `EngineError::Unavailable { op }` per RFC-017 §5.3. Concrete
1406    // backends override each method with a real body. A missing
1407    // override surfaces as a loud typed error at the call site rather
1408    // than a silent no-op.
1409
1410    /// Create an execution. Ingress row 6 (RFC-017 §4). Wraps
1411    /// `ff_create_execution` on Valkey; `INSERT INTO ff_execution ...`
1412    /// on Postgres. The `idempotency_key` + backend-side default
1413    /// `dedup_ttl_ms = 86400000` make duplicate submissions idempotent.
1414    #[cfg(feature = "core")]
1415    async fn create_execution(
1416        &self,
1417        _args: CreateExecutionArgs,
1418    ) -> Result<CreateExecutionResult, EngineError> {
1419        Err(EngineError::Unavailable {
1420            op: "create_execution",
1421        })
1422    }
1423
1424    /// Create a flow header. Ingress row 5.
1425    #[cfg(feature = "core")]
1426    async fn create_flow(
1427        &self,
1428        _args: CreateFlowArgs,
1429    ) -> Result<CreateFlowResult, EngineError> {
1430        Err(EngineError::Unavailable { op: "create_flow" })
1431    }
1432
1433    /// Atomically add an execution to a flow (single-FCALL co-located
1434    /// commit on Valkey; single-transaction UPSERT on Postgres).
1435    #[cfg(feature = "core")]
1436    async fn add_execution_to_flow(
1437        &self,
1438        _args: AddExecutionToFlowArgs,
1439    ) -> Result<AddExecutionToFlowResult, EngineError> {
1440        Err(EngineError::Unavailable {
1441            op: "add_execution_to_flow",
1442        })
1443    }
1444
1445    /// Stage a dependency edge between flow members. CAS-guarded on
1446    /// `graph_revision` — stale rev returns `Contention(StaleGraphRevision)`.
1447    #[cfg(feature = "core")]
1448    async fn stage_dependency_edge(
1449        &self,
1450        _args: StageDependencyEdgeArgs,
1451    ) -> Result<StageDependencyEdgeResult, EngineError> {
1452        Err(EngineError::Unavailable {
1453            op: "stage_dependency_edge",
1454        })
1455    }
1456
1457    /// Apply a staged dependency edge to its downstream child.
1458    #[cfg(feature = "core")]
1459    async fn apply_dependency_to_child(
1460        &self,
1461        _args: ApplyDependencyToChildArgs,
1462    ) -> Result<ApplyDependencyToChildResult, EngineError> {
1463        Err(EngineError::Unavailable {
1464            op: "apply_dependency_to_child",
1465        })
1466    }
1467
1468    /// Resolve one dependency edge after its upstream reached a
1469    /// terminal outcome — satisfy on "success", mark impossible
1470    /// otherwise. Idempotent (`AlreadyResolved` on replay).
1471    ///
1472    /// PR-7b Step 0 overlap-resolver: lifted here so cluster 2
1473    /// (`scanner/dependency_reconciler`) could trait-route through
1474    /// `Arc<dyn EngineBackend>` without a merge conflict with cluster
1475    /// 4. Cluster 4 (`completion_listener::spawn_dispatch_loop`)
1476    /// ultimately routed through the coarser
1477    /// [`Self::cascade_completion`] (per-payload) instead of looping
1478    /// over this per-edge method, because the Postgres cascade is
1479    /// outbox-driven rather than per-edge — see `cascade_completion`
1480    /// rustdoc "Timing semantics" for details.
1481    ///
1482    /// # Backend status
1483    ///
1484    /// - **Valkey:** wraps `ff_resolve_dependency` (RFC-016 Stage C
1485    ///   signature). Atomic single-slot FCALL.
1486    /// - **Postgres:** `Unavailable`. PG's post-completion cascade is
1487    ///   not per-edge; it runs via
1488    ///   `ff_backend_postgres::dispatch::dispatch_completion(event_id)`
1489    ///   keyed on the `ff_completion_event` outbox row. The Valkey-
1490    ///   shaped per-edge resolve does not map cleanly to that model;
1491    ///   PG's `dependency_reconciler` already calls `dispatch_completion`
1492    ///   directly. The engine's PR-7b/final integration test expects
1493    ///   `Unsupported` logs from Valkey-shaped scanners on a PG
1494    ///   deployment — this surface honours that contract.
1495    /// - **SQLite:** `Unavailable` for the same reason (mirrors PG).
1496    ///
1497    /// The default impl returns [`EngineError::Unavailable`] so a
1498    /// backend that has not been migrated surfaces a typed
1499    /// `Unsupported`-grade error rather than a panic.
1500    #[cfg(feature = "core")]
1501    async fn resolve_dependency(
1502        &self,
1503        _args: crate::contracts::ResolveDependencyArgs,
1504    ) -> Result<crate::contracts::ResolveDependencyOutcome, EngineError> {
1505        Err(EngineError::Unavailable {
1506            op: "resolve_dependency",
1507        })
1508    }
1509
1510    /// Cascade a terminal-execution completion into its downstream
1511    /// edges. Consumed by
1512    /// `ff-engine::completion_listener::spawn_dispatch_loop` (PR-7b
1513    /// Cluster 4) to trait-route the post-completion DAG-promotion
1514    /// path through `Arc<dyn EngineBackend>`.
1515    ///
1516    /// Distinct from [`Self::resolve_dependency`]: that method is
1517    /// per-edge (one `ff_resolve_dependency` FCALL); this method is
1518    /// per-completion and orchestrates the full outgoing-edge walk
1519    /// plus `child_skipped` recursion (Valkey) or outbox-event
1520    /// dispatch (Postgres).
1521    ///
1522    /// # Timing semantics
1523    ///
1524    /// Backends diverge on *when* the caller observes cascade work.
1525    /// See [`CascadeOutcome`] for the full contract; the short form:
1526    ///
1527    /// - **Valkey:** synchronous. FCALL-driven walk completes inline;
1528    ///   `child_skipped` descendants are recursively cascaded up to the
1529    ///   internal `MAX_CASCADE_DEPTH` cap before return.
1530    /// - **Postgres:** asynchronous via the `ff_completion_event`
1531    ///   outbox. The call resolves `payload` to its `event_id`, runs
1532    ///   `ff_backend_postgres::dispatch::dispatch_completion`, and
1533    ///   returns when the outbox row has been claimed + its direct
1534    ///   hops advanced. Further-descendant cascades ride their own
1535    ///   outbox events (emitted by the per-hop tx) — NOT this call.
1536    ///
1537    /// Consumers that depend on synchronous cascade must either target
1538    /// Valkey explicitly or observe PG's `dispatched_at_ms` clearance
1539    /// via the `dependency_reconciler` partial index to verify drain.
1540    ///
1541    /// The default impl returns [`EngineError::Unavailable`] so
1542    /// backends that have not been migrated surface a typed error
1543    /// rather than a panic.
1544    ///
1545    /// [`CascadeOutcome`]: crate::contracts::CascadeOutcome
1546    #[cfg(feature = "core")]
1547    async fn cascade_completion(
1548        &self,
1549        _payload: &crate::backend::CompletionPayload,
1550    ) -> Result<crate::contracts::CascadeOutcome, EngineError> {
1551        Err(EngineError::Unavailable {
1552            op: "cascade_completion",
1553        })
1554    }
1555
1556    // ── RFC-017 Stage A — Operator control (4) ─────────────────
1557
1558    /// Operator-initiated execution cancel (row 2).
1559    #[cfg(feature = "core")]
1560    async fn cancel_execution(
1561        &self,
1562        _args: CancelExecutionArgs,
1563    ) -> Result<CancelExecutionResult, EngineError> {
1564        Err(EngineError::Unavailable {
1565            op: "cancel_execution",
1566        })
1567    }
1568
1569    /// Re-score an execution's eligibility priority (row 17).
1570    #[cfg(feature = "core")]
1571    async fn change_priority(
1572        &self,
1573        _args: ChangePriorityArgs,
1574    ) -> Result<ChangePriorityResult, EngineError> {
1575        Err(EngineError::Unavailable {
1576            op: "change_priority",
1577        })
1578    }
1579
1580    /// Replay a terminal execution (row 22). Variadic KEYS handling
1581    /// (inbound-edge pre-read) is hidden inside the Valkey impl per
1582    /// RFC-017 §4 row 3.
1583    #[cfg(feature = "core")]
1584    async fn replay_execution(
1585        &self,
1586        _args: ReplayExecutionArgs,
1587    ) -> Result<ReplayExecutionResult, EngineError> {
1588        Err(EngineError::Unavailable {
1589            op: "replay_execution",
1590        })
1591    }
1592
1593    /// Operator-initiated lease revoke (row 19).
1594    #[cfg(feature = "core")]
1595    async fn revoke_lease(
1596        &self,
1597        _args: RevokeLeaseArgs,
1598    ) -> Result<RevokeLeaseResult, EngineError> {
1599        Err(EngineError::Unavailable { op: "revoke_lease" })
1600    }
1601
1602    // ── cairn #389 — service-layer typed FCALL surface ────────────
1603    //
1604    // These methods mirror `complete`/`fail`/`renew` (which take a
1605    // worker [`Handle`]) but dispatch against `(execution_id, fence)`
1606    // tuples supplied directly. Service-layer callers (cairn's
1607    // `valkey_control_plane_impl.rs`, future consumers that hold a
1608    // run/lease descriptor without a `Handle`) use these to avoid
1609    // going through the raw `ferriskey::Value` FCALL escape hatch.
1610    //
1611    // Same shape / same precedent as `suspend_by_triple` (cairn #322).
1612    //
1613    // Default impl returns [`EngineError::Unavailable`] so the trait
1614    // addition is non-breaking for out-of-tree backends. The in-tree
1615    // Valkey backend overrides; Postgres + SQLite keep the default
1616    // until follow-up parity work lands (consistent with
1617    // `issue_reclaim_grant` / `reclaim_execution` precedent).
1618
1619    /// Service-layer `complete_execution` — peer of [`Self::complete`]
1620    /// that takes a fence triple instead of a worker [`Handle`]. See
1621    /// the group preamble above for cairn-migration context.
1622    #[cfg(feature = "core")]
1623    async fn complete_execution(
1624        &self,
1625        _args: CompleteExecutionArgs,
1626    ) -> Result<CompleteExecutionResult, EngineError> {
1627        Err(EngineError::Unavailable {
1628            op: "complete_execution",
1629        })
1630    }
1631
1632    /// Service-layer `fail_execution` — peer of [`Self::fail`] that
1633    /// takes a fence triple instead of a worker [`Handle`].
1634    #[cfg(feature = "core")]
1635    async fn fail_execution(
1636        &self,
1637        _args: FailExecutionArgs,
1638    ) -> Result<FailExecutionResult, EngineError> {
1639        Err(EngineError::Unavailable {
1640            op: "fail_execution",
1641        })
1642    }
1643
1644    /// Service-layer `renew_lease` — peer of [`Self::renew`] that
1645    /// takes a fence triple instead of a worker [`Handle`].
1646    #[cfg(feature = "core")]
1647    async fn renew_lease(
1648        &self,
1649        _args: RenewLeaseArgs,
1650    ) -> Result<RenewLeaseResult, EngineError> {
1651        Err(EngineError::Unavailable { op: "renew_lease" })
1652    }
1653
1654    /// Service-layer `resume_execution` — transitions a suspended
1655    /// execution back to runnable. Distinct from
1656    /// [`Self::claim_from_resume_grant`] (which mints a worker handle
1657    /// against an already-eligible resumed execution): this method is
1658    /// the lifecycle transition primitive the control plane calls
1659    /// when an operator / auto-resume policy moves a suspended
1660    /// execution forward.
1661    ///
1662    /// The Valkey impl pre-reads `current_waitpoint_id` + `lane_id`
1663    /// from `exec_core` so callers only need the execution id + the
1664    /// trigger type — same ergonomics as `revoke_lease` reading
1665    /// `current_worker_instance_id` when callers omit it.
1666    #[cfg(feature = "core")]
1667    async fn resume_execution(
1668        &self,
1669        _args: ResumeExecutionArgs,
1670    ) -> Result<ResumeExecutionResult, EngineError> {
1671        Err(EngineError::Unavailable {
1672            op: "resume_execution",
1673        })
1674    }
1675
1676    /// Service-layer `check_admission_and_record` — atomic admission
1677    /// check against a quota policy. Callers supply the policy id +
1678    /// dimension (quota keys live on their own `{q:<policy>}`
1679    /// partition that cannot be derived from `execution_id`, so these
1680    /// travel outside [`CheckAdmissionArgs`]). `dimension` defaults
1681    /// to `"default"` inside the Valkey body when the caller passes
1682    /// an empty string — matches cairn's pre-migration default.
1683    #[cfg(feature = "core")]
1684    async fn check_admission(
1685        &self,
1686        _quota_policy_id: &crate::types::QuotaPolicyId,
1687        _dimension: &str,
1688        _args: CheckAdmissionArgs,
1689    ) -> Result<CheckAdmissionResult, EngineError> {
1690        Err(EngineError::Unavailable {
1691            op: "check_admission",
1692        })
1693    }
1694
1695    /// Generalised admission block — covers budget / quota /
1696    /// capability denial paths. `BlockingReason` selects both the
1697    /// eligibility state written to `exec_core` and the target
1698    /// `blocked_<reason>` lane index. Valkey wraps the existing
1699    /// `ff_block_execution_for_admission` FCALL (KEYS=3); PG/SQLite
1700    /// write the equivalent row transition.
1701    ///
1702    /// Companion to [`Self::block_route`] — `block_route` stays as the
1703    /// capability-mismatch shorthand; this method is the primitive
1704    /// the scheduler reaches for when the reason is known at the
1705    /// call site (budget denial, quota denial, etc.). FF #511 Phase 2b.
1706    #[cfg(feature = "core")]
1707    async fn block_execution_for_admission(
1708        &self,
1709        _args: BlockExecutionForAdmissionArgs,
1710    ) -> Result<BlockExecutionForAdmissionOutcome, EngineError> {
1711        Err(EngineError::Unavailable {
1712            op: "block_execution_for_admission",
1713        })
1714    }
1715
1716    /// FF #511 Phase 3 — typed snapshot of a budget's usage + limits
1717    /// hashes. Replaces the scheduler's Valkey-shaped HGETALL/HGET
1718    /// pattern on `ff:budget:{K}:{id}:limits` + `ff:budget:{K}:{id}:usage`.
1719    /// Returns [`BudgetUsageAndLimits::empty`] when the limits hash
1720    /// is absent ("no limits configured" — not an error).
1721    #[cfg(feature = "core")]
1722    async fn read_budget_usage_and_limits(
1723        &self,
1724        _budget_id: &BudgetId,
1725    ) -> Result<BudgetUsageAndLimits, EngineError> {
1726        Err(EngineError::Unavailable {
1727            op: "read_budget_usage_and_limits",
1728        })
1729    }
1730
1731    /// Read the admission-relevant fields of a quota policy
1732    /// (rate limit, window, concurrency cap, jitter). Replaces the
1733    /// Valkey-shaped 4-HGET pattern on `ff:quota:{K}:def` that
1734    /// `ff_scheduler` used pre-FF #511. Returns `None` when the
1735    /// policy row is absent; absence is a well-defined "no admission
1736    /// configured" signal, not an error.
1737    #[cfg(feature = "core")]
1738    async fn read_quota_policy_limits(
1739        &self,
1740        _quota_policy_id: &crate::types::QuotaPolicyId,
1741    ) -> Result<Option<QuotaPolicyLimits>, EngineError> {
1742        Err(EngineError::Unavailable {
1743            op: "read_quota_policy_limits",
1744        })
1745    }
1746
1747    /// Service-layer `evaluate_flow_eligibility` — read-only check
1748    /// that returns the execution's current eligibility state
1749    /// (`eligible`, `blocked_by_dependencies`, or a backend-specific
1750    /// status string). Called by cairn's dependency-resolution path
1751    /// to decide whether a downstream execution can proceed.
1752    #[cfg(feature = "core")]
1753    async fn evaluate_flow_eligibility(
1754        &self,
1755        _args: EvaluateFlowEligibilityArgs,
1756    ) -> Result<EvaluateFlowEligibilityResult, EngineError> {
1757        Err(EngineError::Unavailable {
1758            op: "evaluate_flow_eligibility",
1759        })
1760    }
1761
1762    // ── #454 — cairn typed-FCALL additions (4) ─────────────────
1763
1764    /// Per-execution budget spend with tenant-open dimensions.
1765    ///
1766    /// Cairn #454 Q1/Q2: takes an open-set `BTreeMap<String, u64>` of
1767    /// deltas (distinct from the fixed-shape
1768    /// [`Self::report_usage`]/[`Self::report_usage_admin`] which use
1769    /// [`UsageDimensions`]). Return shape reuses [`ReportUsageResult`]
1770    /// — same `Ok` / `SoftBreach` / `HardBreach` / `AlreadyApplied`
1771    /// variants cairn's UI already branches on.
1772    ///
1773    /// The default impl returns [`EngineError::Unavailable`] so the
1774    /// trait remains additive for backends that have not landed the
1775    /// #454 body yet.
1776    #[cfg(feature = "core")]
1777    async fn record_spend(
1778        &self,
1779        _args: RecordSpendArgs,
1780    ) -> Result<ReportUsageResult, EngineError> {
1781        Err(EngineError::Unavailable {
1782            op: "record_spend",
1783        })
1784    }
1785
1786    /// Per-execution budget attribution release.
1787    ///
1788    /// Called on execution termination to reverse this execution's
1789    /// contribution to a budget counter. Per cairn #454 clarification
1790    /// this is **per-execution**, not a whole-budget flush — the
1791    /// budget persists across executions.
1792    ///
1793    /// The default impl returns [`EngineError::Unavailable`].
1794    #[cfg(feature = "core")]
1795    async fn release_budget(
1796        &self,
1797        _args: ReleaseBudgetArgs,
1798    ) -> Result<(), EngineError> {
1799        Err(EngineError::Unavailable {
1800            op: "release_budget",
1801        })
1802    }
1803
1804    /// Release a quota-admission slot that was recorded via
1805    /// [`Self::check_admission`] / `ff_check_admission_and_record` but
1806    /// for which `issue_claim_grant` subsequently failed. Idempotent:
1807    /// releasing an already-released slot is a no-op.
1808    ///
1809    /// Valkey wraps the existing `ff_release_admission` FCALL (DEL
1810    /// guard + SREM admitted_set + DECR-if-positive concurrency
1811    /// counter). PG/SQLite write to their admission-tracking rows.
1812    ///
1813    /// Used by `ff_scheduler::Scheduler` on the claim-fail rollback
1814    /// path (FF #511). The default impl returns
1815    /// [`EngineError::Unavailable`].
1816    #[cfg(feature = "core")]
1817    async fn release_admission(
1818        &self,
1819        _args: ReleaseAdmissionArgs,
1820    ) -> Result<ReleaseAdmissionResult, EngineError> {
1821        Err(EngineError::Unavailable {
1822            op: "release_admission",
1823        })
1824    }
1825
1826    /// Operator-driven approval-signal delivery.
1827    ///
1828    /// Pre-shaped variant of [`Self::deliver_signal`] where the caller
1829    /// **does not carry the waitpoint token**. The backend reads the
1830    /// token from `ff_waitpoint_pending`, HMAC-verifies server-side,
1831    /// and dispatches. Operator API never sees the token bytes.
1832    ///
1833    /// Cairn #454 Q3 — `signal_name` is a flat string (conventional
1834    /// values `"approved"` / `"rejected"`); audit metadata lives in
1835    /// cairn's audit log, not on the FF surface.
1836    ///
1837    /// The default impl returns [`EngineError::Unavailable`].
1838    #[cfg(feature = "core")]
1839    async fn deliver_approval_signal(
1840        &self,
1841        _args: DeliverApprovalSignalArgs,
1842    ) -> Result<DeliverSignalResult, EngineError> {
1843        Err(EngineError::Unavailable {
1844            op: "deliver_approval_signal",
1845        })
1846    }
1847
1848    /// Backend-atomic `issue_claim_grant` + `claim_execution`.
1849    ///
1850    /// Cairn #454 Q4 — composing the two primitives caller-side risks
1851    /// leaking a grant if `claim_execution` fails after
1852    /// `issue_claim_grant` succeeded. This method's contract is that
1853    /// the composition is backend-atomic: Valkey fuses them in one
1854    /// FCALL; PG/SQLite fuse them in one tx.
1855    ///
1856    /// The default impl **must not** be a chained call — it returns
1857    /// [`EngineError::Unavailable`] so consumers cannot accidentally
1858    /// use a non-atomic fallback.
1859    #[cfg(feature = "core")]
1860    async fn issue_grant_and_claim(
1861        &self,
1862        _args: IssueGrantAndClaimArgs,
1863    ) -> Result<ClaimGrantOutcome, EngineError> {
1864        Err(EngineError::Unavailable {
1865            op: "issue_grant_and_claim",
1866        })
1867    }
1868
1869    // ── RFC-017 Stage A — Budget + quota admin (5) ─────────────
1870
1871    /// Create a budget definition (row 6).
1872    #[cfg(feature = "core")]
1873    async fn create_budget(
1874        &self,
1875        _args: CreateBudgetArgs,
1876    ) -> Result<CreateBudgetResult, EngineError> {
1877        Err(EngineError::Unavailable {
1878            op: "create_budget",
1879        })
1880    }
1881
1882    /// Reset a budget's usage counters (row 10).
1883    #[cfg(feature = "core")]
1884    async fn reset_budget(
1885        &self,
1886        _args: ResetBudgetArgs,
1887    ) -> Result<ResetBudgetResult, EngineError> {
1888        Err(EngineError::Unavailable { op: "reset_budget" })
1889    }
1890
1891    /// Create a quota policy (row 7).
1892    #[cfg(feature = "core")]
1893    async fn create_quota_policy(
1894        &self,
1895        _args: CreateQuotaPolicyArgs,
1896    ) -> Result<CreateQuotaPolicyResult, EngineError> {
1897        Err(EngineError::Unavailable {
1898            op: "create_quota_policy",
1899        })
1900    }
1901
1902    /// Read-only budget status for operator visibility (row 8).
1903    #[cfg(feature = "core")]
1904    async fn get_budget_status(
1905        &self,
1906        _id: &BudgetId,
1907    ) -> Result<BudgetStatus, EngineError> {
1908        Err(EngineError::Unavailable {
1909            op: "get_budget_status",
1910        })
1911    }
1912
1913    /// Admin-path `report_usage` (row 9 + RFC-017 §5 round-1 F4).
1914    /// Distinct from the existing [`Self::report_usage`] which takes
1915    /// a worker handle — the admin path has no lease context.
1916    #[cfg(feature = "core")]
1917    async fn report_usage_admin(
1918        &self,
1919        _budget: &BudgetId,
1920        _args: ReportUsageAdminArgs,
1921    ) -> Result<ReportUsageResult, EngineError> {
1922        Err(EngineError::Unavailable {
1923            op: "report_usage_admin",
1924        })
1925    }
1926
1927    // ── RFC-017 Stage A — Read + diagnostics (3) ───────────────
1928
1929    /// Fetch the stored result payload for a completed execution
1930    /// (row 4). Returns `Ok(None)` when the execution is missing, not
1931    /// yet complete, or its payload was trimmed by retention policy.
1932    async fn get_execution_result(
1933        &self,
1934        _id: &ExecutionId,
1935    ) -> Result<Option<Vec<u8>>, EngineError> {
1936        Err(EngineError::Unavailable {
1937            op: "get_execution_result",
1938        })
1939    }
1940
1941    /// List the pending-or-active waitpoints for an execution, cursor
1942    /// paginated (row 5 / §8). Stage A preserves the existing
1943    /// `PendingWaitpointInfo` shape; Stage D ships the §8 HMAC
1944    /// sanitisation + `(token_kid, token_fingerprint)` schema.
1945    #[cfg(feature = "core")]
1946    async fn list_pending_waitpoints(
1947        &self,
1948        _args: ListPendingWaitpointsArgs,
1949    ) -> Result<ListPendingWaitpointsResult, EngineError> {
1950        Err(EngineError::Unavailable {
1951            op: "list_pending_waitpoints",
1952        })
1953    }
1954
1955    /// Backend-level reachability probe (row 1). Valkey: `PING`;
1956    /// Postgres: `SELECT 1`.
1957    async fn ping(&self) -> Result<(), EngineError> {
1958        Err(EngineError::Unavailable { op: "ping" })
1959    }
1960
1961    // ── RFC-025 worker registry ────────────────────────────────
1962    //
1963    // Four core primitives + Phase-6 `list_workers` readback. See
1964    // `rfcs/RFC-025-worker-registry.md` for the full design. Default
1965    // impls return `Unavailable` so out-of-tree backends keep
1966    // compiling; every in-tree backend overrides.
1967
1968    /// Register (or idempotently refresh) a worker instance. See RFC-025 §4.
1969    #[cfg(feature = "core")]
1970    async fn register_worker(
1971        &self,
1972        _args: RegisterWorkerArgs,
1973    ) -> Result<RegisterWorkerOutcome, EngineError> {
1974        Err(EngineError::Unavailable {
1975            op: "register_worker",
1976        })
1977    }
1978
1979    /// Refresh the worker-instance liveness TTL.
1980    #[cfg(feature = "core")]
1981    async fn heartbeat_worker(
1982        &self,
1983        _args: HeartbeatWorkerArgs,
1984    ) -> Result<HeartbeatWorkerOutcome, EngineError> {
1985        Err(EngineError::Unavailable {
1986            op: "heartbeat_worker",
1987        })
1988    }
1989
1990    /// Operator-driven worker death (distinct from passive TTL expiry).
1991    #[cfg(feature = "core")]
1992    async fn mark_worker_dead(
1993        &self,
1994        _args: MarkWorkerDeadArgs,
1995    ) -> Result<MarkWorkerDeadOutcome, EngineError> {
1996        Err(EngineError::Unavailable {
1997            op: "mark_worker_dead",
1998        })
1999    }
2000
2001    /// Enumerate expired leases for reclaim-decision tooling.
2002    #[cfg(feature = "suspension")]
2003    async fn list_expired_leases(
2004        &self,
2005        _args: ListExpiredLeasesArgs,
2006    ) -> Result<ListExpiredLeasesResult, EngineError> {
2007        Err(EngineError::Unavailable {
2008            op: "list_expired_leases",
2009        })
2010    }
2011
2012    /// Enumerate live workers (RFC-025 Phase 6, §9.4).
2013    #[cfg(feature = "core")]
2014    async fn list_workers(
2015        &self,
2016        _args: ListWorkersArgs,
2017    ) -> Result<ListWorkersResult, EngineError> {
2018        Err(EngineError::Unavailable {
2019            op: "list_workers",
2020        })
2021    }
2022
2023    // ── RFC-017 Stage A — Scheduling (1) ───────────────────────
2024
2025    /// Scheduler-routed claim entrypoint (row 18, RFC-017 §7). Valkey
2026    /// forwards to its `ff_scheduler::Scheduler` cursor; Postgres
2027    /// forwards to `PostgresScheduler`'s `FOR UPDATE SKIP LOCKED`
2028    /// path.
2029    ///
2030    /// Backends that carry an embedded scheduler (e.g. `ValkeyBackend`
2031    /// constructed via `with_embedded_scheduler`, or `PostgresBackend`
2032    /// with its `with_scanners` sibling) route the claim through it.
2033    /// Backends without a wired scheduler return
2034    /// [`EngineError::Unavailable`]. HTTP consumers use
2035    /// `FlowFabricWorker::claim_via_server` instead.
2036    #[cfg(feature = "core")]
2037    async fn claim_for_worker(
2038        &self,
2039        _args: ClaimForWorkerArgs,
2040    ) -> Result<ClaimForWorkerOutcome, EngineError> {
2041        Err(EngineError::Unavailable {
2042            op: "claim_for_worker",
2043        })
2044    }
2045
2046    // ── Cross-cutting (RFC-017 Stage B trait-lift) ──────────────
2047
2048    /// Static observability label identifying the backend family in
2049    /// logs + metrics (RFC-017 §5.4 + §9 Stage B). Default impl
2050    /// returns `"unknown"` so legacy `impl EngineBackend` blocks that
2051    /// have not upgraded keep compiling; every in-tree backend
2052    /// overrides — `ValkeyBackend` → `"valkey"`, `PostgresBackend` →
2053    /// `"postgres"`.
2054    fn backend_label(&self) -> &'static str {
2055        "unknown"
2056    }
2057
2058    /// Backend downcast escape hatch (v0.12 PR-7a transitional).
2059    ///
2060    /// Scanner supervisors in `ff-engine` still dispatch through a
2061    /// concrete `ferriskey::Client`; to keep the engine's public
2062    /// boundary backend-agnostic (`Arc<dyn EngineBackend>`) while the
2063    /// scanner internals remain Valkey-shaped, the engine downcasts
2064    /// via this method and reaches in for the embedded client. Every
2065    /// backend that wants to be consumed by `Engine::start_with_completions`
2066    /// overrides this to return `self` as `&dyn Any`; the default
2067    /// returns a placeholder so a stray `downcast_ref` fails cleanly
2068    /// rather than risking unsound behaviour.
2069    ///
2070    /// v0.13 (PR-7b) will trait-ify individual scanners onto
2071    /// `EngineBackend` and retire `ff-engine`'s dependence on this
2072    /// downcast path. The method itself will remain on the trait
2073    /// (likely deprecated) rather than be removed — removing a
2074    /// public trait method is a breaking change for external
2075    /// `impl EngineBackend` blocks.
2076    fn as_any(&self) -> &(dyn std::any::Any + 'static) {
2077        // Placeholder so the default does not expose `Self` for
2078        // downcast. Backends override to return `self`.
2079        &()
2080    }
2081
2082    /// RFC-018 Stage A: snapshot of this backend's identity + the
2083    /// flat `Supports` surface it can actually service. Consumers use
2084    /// this at startup to gate UI features / choose between alternative
2085    /// code paths before dispatching. See
2086    /// `rfcs/RFC-018-backend-capability-discovery.md` for the full
2087    /// discovery contract and the four owner-adjudicated open
2088    /// questions (granularity: coarse; version: struct; sync; no
2089    /// event stream).
2090    ///
2091    /// Default: returns a value tagged `family = "unknown"` with every
2092    /// `supports.*` bool `false`, so pre-RFC-018 out-of-tree backends
2093    /// keep compiling and consumers treat "all false" as "dispatch
2094    /// and catch [`EngineError::Unavailable`]" (pre-RFC-018 behaviour).
2095    /// Concrete in-tree backends (`ValkeyBackend`, `PostgresBackend`)
2096    /// override to populate a real value.
2097    ///
2098    /// Sync (no `.await`): backend-static info should not require a
2099    /// probe on every query. Dynamic probes happen once at
2100    /// `connect*` time and cache the result.
2101    fn capabilities(&self) -> crate::capability::Capabilities {
2102        crate::capability::Capabilities::new(
2103            crate::capability::BackendIdentity::new(
2104                "unknown",
2105                crate::capability::Version::new(0, 0, 0),
2106                "unknown",
2107            ),
2108            crate::capability::Supports::none(),
2109        )
2110    }
2111
2112    /// Issue #281: run one-time backend-specific boot preparation.
2113    ///
2114    /// Intended to run ONCE per deployment startup — NOT per request.
2115    /// Idempotent and safe for consumers to call on every application
2116    /// boot; backends that have nothing to do return
2117    /// [`PrepareOutcome::NoOp`] without side effects.
2118    ///
2119    /// Per-backend behaviour:
2120    ///
2121    /// * **Valkey** — issues `FUNCTION LOAD REPLACE` for the
2122    ///   `flowfabric` Lua library (with bounded retry on transient
2123    ///   transport faults; permanent compile errors surface as
2124    ///   [`EngineError::Transport`] without retry). Returns
2125    ///   [`PrepareOutcome::Applied`] carrying
2126    ///   `"FUNCTION LOAD (flowfabric lib v<N>)"`.
2127    /// * **Postgres** — returns [`PrepareOutcome::NoOp`]. Schema
2128    ///   migrations are applied out-of-band per
2129    ///   `rfcs/drafts/v0.7-migration-master.md §Q12`; the backend
2130    ///   runs a schema-version check at connect time and refuses to
2131    ///   start on mismatch, so no boot-side prepare work remains.
2132    /// * **Default impl** — returns [`PrepareOutcome::NoOp`] so
2133    ///   out-of-tree backends without preparation work compile
2134    ///   without boilerplate.
2135    ///
2136    /// # Relationship to the in-tree boot path
2137    ///
2138    /// `ValkeyBackend::initialize_deployment` (called from
2139    /// `Server::start_with_metrics`) already invokes
2140    /// [`ensure_library`](ff_script::loader::ensure_library) inline as
2141    /// its step 4; that path is unchanged. `prepare()` exists as a
2142    /// **trait-surface entry point** so consumers that construct an
2143    /// `Arc<dyn EngineBackend>` outside of `Server` (e.g.
2144    /// cairn-fabric's boot path at `cairn-fabric/src/boot.rs`) can
2145    /// run the same preparation without reaching into
2146    /// backend-specific modules. The overlap is intentional: calling
2147    /// both `prepare()` and `initialize_deployment` is safe because
2148    /// `FUNCTION LOAD REPLACE` is idempotent under the version
2149    /// check.
2150    ///
2151    /// # Layer forwarding
2152    ///
2153    /// Layer impls (`HookedBackend`, ff-sdk layers) do NOT forward
2154    /// `prepare` today — consistent with `backend_label` / `ping` /
2155    /// `shutdown_prepare`. Consumers that wrap a backend in layers
2156    /// MUST call `prepare()` on the raw backend before wrapping, or
2157    /// accept the default [`PrepareOutcome::NoOp`].
2158    async fn prepare(&self) -> Result<PrepareOutcome, EngineError> {
2159        Ok(PrepareOutcome::NoOp)
2160    }
2161
2162    /// Drain-before-shutdown hook (RFC-017 §5.4). The server calls
2163    /// this before draining its own background tasks so backend-
2164    /// scoped primitives (Valkey stream semaphore, Postgres sqlx
2165    /// pool, …) can close their gates and await in-flight work up to
2166    /// `grace`.
2167    ///
2168    /// Default impl returns `Ok(())` — a no-op backend has nothing
2169    /// backend-scoped to drain. Concrete backends whose data plane
2170    /// owns resources (connection pools, semaphores, listeners)
2171    /// override with a real body.
2172    async fn shutdown_prepare(&self, _grace: Duration) -> Result<(), EngineError> {
2173        Ok(())
2174    }
2175
2176    // ── RFC-017 Stage E2 — `Server::client` removal (header + reads) ───
2177
2178    /// RFC-017 Stage E2: the "header" portion of `cancel_flow` — run the
2179    /// atomic flow-state flip (Valkey: `ff_cancel_flow` FCALL; Postgres:
2180    /// `cancel_flow_once` tx), decode policy + membership, and surface
2181    /// the `flow_already_terminal` idempotency branch as a first-class
2182    /// [`CancelFlowHeader::AlreadyTerminal`] so the Server can build
2183    /// the wire [`CancelFlowResult`] without reaching for a raw
2184    /// `Client`. Separate from the existing
2185    /// [`EngineBackend::cancel_flow`] entry point (which takes the
2186    /// enum-typed `(policy, wait)` split and returns the wait-collapsed
2187    /// `CancelFlowResult`) because the Server owns its own
2188    /// wait-dispatch + member-cancel machinery via
2189    /// [`EngineBackend::cancel_execution`] + backlog ack.
2190    ///
2191    /// Default impl returns [`EngineError::Unavailable`] so un-migrated
2192    /// backends surface the miss loudly.
2193    #[cfg(feature = "core")]
2194    async fn cancel_flow_header(
2195        &self,
2196        _args: CancelFlowArgs,
2197    ) -> Result<crate::contracts::CancelFlowHeader, EngineError> {
2198        Err(EngineError::Unavailable {
2199            op: "cancel_flow_header",
2200        })
2201    }
2202
2203    /// RFC-017 Stage E2: best-effort acknowledgement that one member of
2204    /// a `cancel_all` flow has completed its per-member cancel. Drains
2205    /// the member from the flow's `pending_cancels` set and, if empty,
2206    /// removes the flow from the partition-level `cancel_backlog`
2207    /// (Valkey: `ff_ack_cancel_member` FCALL; Postgres: table write —
2208    /// default `Unavailable` until Wave 9).
2209    ///
2210    /// Failures are swallowed by the caller — the cancel-backlog
2211    /// reconciler is the authoritative drain — but a typed error here
2212    /// lets the caller log a backend-scoped context string.
2213    #[cfg(feature = "core")]
2214    async fn ack_cancel_member(
2215        &self,
2216        _flow_id: &FlowId,
2217        _execution_id: &ExecutionId,
2218    ) -> Result<(), EngineError> {
2219        Err(EngineError::Unavailable {
2220            op: "ack_cancel_member",
2221        })
2222    }
2223
2224    /// RFC-017 Stage E2: full-shape execution read used by the
2225    /// `GET /v1/executions/{id}` HTTP route. Returns the legacy
2226    /// [`ExecutionInfo`] wire shape (not the decoupled
2227    /// [`ExecutionSnapshot`]) so the existing HTTP response bytes stay
2228    /// identical across the migration.
2229    ///
2230    /// `Ok(None)` ⇒ no such execution. Default `Unavailable` because
2231    /// the Valkey HGETALL-and-parse is backend-specific.
2232    #[cfg(feature = "core")]
2233    async fn read_execution_info(
2234        &self,
2235        _id: &ExecutionId,
2236    ) -> Result<Option<ExecutionInfo>, EngineError> {
2237        Err(EngineError::Unavailable {
2238            op: "read_execution_info",
2239        })
2240    }
2241
2242    /// RFC-017 Stage E2: narrow `public_state` read used by the
2243    /// `GET /v1/executions/{id}/state` HTTP route. Returns `Ok(None)`
2244    /// when the execution is missing. Default `Unavailable`.
2245    #[cfg(feature = "core")]
2246    async fn read_execution_state(
2247        &self,
2248        _id: &ExecutionId,
2249    ) -> Result<Option<PublicState>, EngineError> {
2250        Err(EngineError::Unavailable {
2251            op: "read_execution_state",
2252        })
2253    }
2254
2255    // ── RFC-019 Stage A/B/C — Stream-cursor subscriptions ─────────
2256    //
2257    // Four owner-adjudicated families (RFC-019 §Open Questions #5):
2258    // `lease_history`, `completion`, `signal_delivery`,
2259    // `instance_tags`. Stage C (this crate) promotes each family to
2260    // a typed event enum; consumers `match` on variants instead of
2261    // parsing a backend-shaped byte blob.
2262    //
2263    // Each method returns a family-specific subscription alias (see
2264    // [`crate::stream_events`]). All defaults return
2265    // `EngineError::Unavailable` per RFC-017 trait-growth conventions.
2266
2267    /// Subscribe to lease lifecycle events (acquired / renewed /
2268    /// expired / reclaimed / revoked) for the partition this backend
2269    /// is configured with.
2270    ///
2271    /// Cross-partition fan-out is consumer-side merge: subscribe
2272    /// per-partition backend instance and interleave on the read
2273    /// side. Yields
2274    /// `Err(EngineError::StreamDisconnected { cursor })` on backend
2275    /// disconnect; resume by calling this method again with the
2276    /// returned cursor.
2277    ///
2278    /// `filter` (#282): when `filter.instance_tag` is `Some((k, v))`,
2279    /// only events whose execution carries tag `k = v` are yielded
2280    /// (matching the [`crate::backend::ScannerFilter`] surface from
2281    /// #122). Pass `&ScannerFilter::default()` for unfiltered
2282    /// behaviour. Filtering happens inside the backend stream; the
2283    /// [`crate::stream_events::LeaseHistorySubscription`] return type
2284    /// is unchanged.
2285    async fn subscribe_lease_history(
2286        &self,
2287        _cursor: crate::stream_subscribe::StreamCursor,
2288        _filter: &crate::backend::ScannerFilter,
2289    ) -> Result<crate::stream_events::LeaseHistorySubscription, EngineError> {
2290        Err(EngineError::Unavailable {
2291            op: "subscribe_lease_history",
2292        })
2293    }
2294
2295    /// Subscribe to completion events (terminal state transitions).
2296    ///
2297    /// - **Postgres**: wraps the `ff_completion_event` outbox +
2298    ///   LISTEN/NOTIFY machinery. Durable via event-id cursor.
2299    /// - **Valkey**: wraps the RESP3 `ff:dag:completions` pubsub
2300    ///   subscriber. Pubsub is at-most-once over the live
2301    ///   subscription window; the cursor is always the empty
2302    ///   sentinel. If you need at-least-once replay with durable
2303    ///   cursor resume, use the Postgres backend (see
2304    ///   `docs/POSTGRES_PARITY_MATRIX.md` row `subscribe_completion`).
2305    ///
2306    /// `filter` (#282): see [`Self::subscribe_lease_history`]. Valkey
2307    /// reuses the `subscribe_completions_filtered` per-event HGET
2308    /// gate; Postgres filters inline against the outbox's denormalised
2309    /// `instance_tag` column.
2310    async fn subscribe_completion(
2311        &self,
2312        _cursor: crate::stream_subscribe::StreamCursor,
2313        _filter: &crate::backend::ScannerFilter,
2314    ) -> Result<crate::stream_events::CompletionSubscription, EngineError> {
2315        Err(EngineError::Unavailable {
2316            op: "subscribe_completion",
2317        })
2318    }
2319
2320    /// Subscribe to signal-delivery events (satisfied / buffered /
2321    /// deduped).
2322    ///
2323    /// `filter` (#282): see [`Self::subscribe_lease_history`].
2324    async fn subscribe_signal_delivery(
2325        &self,
2326        _cursor: crate::stream_subscribe::StreamCursor,
2327        _filter: &crate::backend::ScannerFilter,
2328    ) -> Result<crate::stream_events::SignalDeliverySubscription, EngineError> {
2329        Err(EngineError::Unavailable {
2330            op: "subscribe_signal_delivery",
2331        })
2332    }
2333
2334    /// Subscribe to instance-tag events (tag attached / cleared).
2335    ///
2336    /// Producer wiring is deferred per #311 audit ("no concrete
2337    /// demand"); the trait method exists for API uniformity across
2338    /// the four families. Backends currently return
2339    /// `EngineError::Unavailable`.
2340    async fn subscribe_instance_tags(
2341        &self,
2342        _cursor: crate::stream_subscribe::StreamCursor,
2343    ) -> Result<crate::stream_events::InstanceTagSubscription, EngineError> {
2344        Err(EngineError::Unavailable {
2345            op: "subscribe_instance_tags",
2346        })
2347    }
2348
2349    // ── PR-7b Cluster 1 — Foundation scanner operations ─────────
2350    //
2351    // Per-execution write hooks invoked by the engine scanner loop.
2352    // Scanner bodies discover candidate executions via partition
2353    // indices (Valkey ZSETs / PG index scans) and, for each candidate,
2354    // call through to one of the methods below to perform the atomic
2355    // state-flip. Defaults return `EngineError::Unavailable { op }`;
2356    // Valkey impls wrap the corresponding `ff_*` FCALL; Postgres impls
2357    // run a single-row tx mirroring the Lua semantic.
2358
2359    /// Lease-expiry scanner hook — mark an expired lease as reclaimable
2360    /// so another worker can redeem the execution. Atomic per call.
2361    ///
2362    /// Valkey: `FCALL ff_mark_lease_expired_if_due`.
2363    /// Postgres: single-row tx on `ff_attempt` + `ff_exec_core` (see
2364    /// `ff_backend_postgres::reconcilers::lease_expiry`).
2365    async fn mark_lease_expired_if_due(
2366        &self,
2367        _partition: crate::partition::Partition,
2368        _execution_id: &ExecutionId,
2369    ) -> Result<(), EngineError> {
2370        Err(EngineError::Unavailable {
2371            op: "mark_lease_expired_if_due",
2372        })
2373    }
2374
2375    /// Delayed-promoter scanner hook — promote a delayed execution to
2376    /// `eligible_now` once its `delay_until` has passed.
2377    ///
2378    /// Valkey: `FCALL ff_promote_delayed`.
2379    /// Postgres: Wave 9 schema scope (no current impl).
2380    async fn promote_delayed(
2381        &self,
2382        _partition: crate::partition::Partition,
2383        _lane: &LaneId,
2384        _execution_id: &ExecutionId,
2385        _now_ms: TimestampMs,
2386    ) -> Result<(), EngineError> {
2387        Err(EngineError::Unavailable {
2388            op: "promote_delayed",
2389        })
2390    }
2391
2392    /// Pending-waitpoint-expiry scanner hook — close a pending
2393    /// waitpoint whose deadline has passed (wake the suspended
2394    /// execution with a timeout signal).
2395    ///
2396    /// Valkey: `FCALL ff_close_waitpoint`.
2397    /// Postgres: Wave 9 schema scope (no current impl).
2398    async fn close_waitpoint(
2399        &self,
2400        _partition: crate::partition::Partition,
2401        _execution_id: &ExecutionId,
2402        _waitpoint_id: &str,
2403        _now_ms: TimestampMs,
2404    ) -> Result<(), EngineError> {
2405        Err(EngineError::Unavailable {
2406            op: "close_waitpoint",
2407        })
2408    }
2409
2410    /// Shared hook for the attempt-timeout and execution-deadline
2411    /// scanners — terminate or retry an execution whose wall-clock
2412    /// budget has elapsed. `phase` discriminates which of the two
2413    /// scanner paths is calling so the backend can preserve diagnostic
2414    /// breadcrumbs without forking the surface.
2415    ///
2416    /// Valkey: `FCALL ff_expire_execution` (with `phase` passed through
2417    /// as an ARGV discriminator).
2418    /// Postgres: single-row tx mirror of the Lua semantic for
2419    /// `AttemptTimeout`; `ExecutionDeadline` is Wave 9 schema scope.
2420    async fn expire_execution(
2421        &self,
2422        _partition: crate::partition::Partition,
2423        _execution_id: &ExecutionId,
2424        _phase: ExpirePhase,
2425        _now_ms: TimestampMs,
2426    ) -> Result<(), EngineError> {
2427        Err(EngineError::Unavailable {
2428            op: "expire_execution",
2429        })
2430    }
2431
2432    /// Suspension-timeout scanner hook — expire a suspended execution
2433    /// whose suspension deadline has passed (wake with timeout).
2434    ///
2435    /// Valkey: `FCALL ff_expire_suspension`.
2436    /// Postgres: single-row tx on `ff_suspend` + `ff_exec_core`.
2437    async fn expire_suspension(
2438        &self,
2439        _partition: crate::partition::Partition,
2440        _execution_id: &ExecutionId,
2441        _now_ms: TimestampMs,
2442    ) -> Result<(), EngineError> {
2443        Err(EngineError::Unavailable {
2444            op: "expire_suspension",
2445        })
2446    }
2447
2448    // ── PR-7b Cluster 2 — Reconciler scanner operations ─────────
2449    //
2450    // Per-execution write hooks invoked by the `unblock` scanner. The
2451    // other two cluster-2 scanners (`budget_reset`, `dependency_reconciler`)
2452    // route through `reset_budget` + `resolve_dependency`, which are
2453    // already part of the RFC-017 service-layer surface and live above.
2454
2455    /// Unblock-scanner hook — move an execution from a blocked ZSET back
2456    /// to `eligible_now` once its blocking condition has cleared (budget
2457    /// under limit, quota window drained, or a capable worker has come
2458    /// online). `expected_blocking_reason` discriminates which of the
2459    /// `blocked:{budget,quota,route}` sets the execution is leaving and
2460    /// also fences against a stale unblock (Lua rejects if the core's
2461    /// `blocking_reason` no longer matches).
2462    ///
2463    /// # Backend status
2464    ///
2465    /// - **Valkey:** wraps `ff_unblock_execution` (RFC-010 §6). Atomic
2466    ///   single-slot FCALL on `{p:N}`.
2467    /// - **Postgres:** `Unavailable` (structural). PG does not persist a
2468    ///   per-reason `blocked:{budget,quota,route}` index — scheduler
2469    ///   eligibility is re-evaluated live via SQL predicates on
2470    ///   `ff_exec_core` + budget / quota tables (see
2471    ///   `ff_backend_postgres::scheduler`). Nothing to reconcile, so the
2472    ///   engine's PG scanner loop does not run this path; callers on
2473    ///   a PG deployment receive the typed `Unavailable` per PR-7b/final
2474    ///   contract.
2475    /// - **SQLite:** `Unavailable` for the same reason as Postgres.
2476    async fn unblock_execution(
2477        &self,
2478        _partition: crate::partition::Partition,
2479        _lane_id: &LaneId,
2480        _execution_id: &ExecutionId,
2481        _expected_blocking_reason: &str,
2482        _now_ms: TimestampMs,
2483    ) -> Result<(), EngineError> {
2484        Err(EngineError::Unavailable {
2485            op: "unblock_execution",
2486        })
2487    }
2488
2489    /// RFC-016 Stage C — sibling-cancel group drain.
2490    ///
2491    /// After `edge_cancel_dispatcher` has issued
2492    /// [`EngineBackend::cancel_execution`] against every listed
2493    /// sibling of a `(flow_id, downstream_eid)` group, this trait
2494    /// method atomically removes the tuple from the partition-local
2495    /// pending-cancel-groups index and clears the per-group flag +
2496    /// members state.
2497    ///
2498    /// Valkey: `FCALL ff_drain_sibling_cancel_group` (SREM +
2499    /// HDEL in one Lua unit).
2500    /// Postgres: `Unavailable` — PG's Wave-6b
2501    /// `reconcilers::edge_cancel_dispatcher::dispatcher_tick`
2502    /// owns the equivalent row drain inside its own per-group
2503    /// transaction; the Valkey-shaped per-tuple call is not used on
2504    /// the PG scanner path.
2505    /// SQLite: `Unavailable` (mirrors PG; RFC-023 scope).
2506    #[cfg(feature = "core")]
2507    async fn drain_sibling_cancel_group(
2508        &self,
2509        _flow_partition: crate::partition::Partition,
2510        _flow_id: &FlowId,
2511        _downstream_eid: &ExecutionId,
2512    ) -> Result<(), EngineError> {
2513        Err(EngineError::Unavailable {
2514            op: "drain_sibling_cancel_group",
2515        })
2516    }
2517
2518    /// RFC-016 Stage D — sibling-cancel group reconcile (Invariant
2519    /// Q6 safety net).
2520    ///
2521    /// Re-examines a `(flow_id, downstream_eid)` tuple in the
2522    /// partition-local pending-cancel-groups index and returns one
2523    /// of three atomic dispositions — see
2524    /// [`SiblingCancelReconcileAction`] — so the crash-recovery
2525    /// reconciler can drain stale or completed tuples without
2526    /// fighting the Stage-C dispatcher.
2527    ///
2528    /// Valkey: `FCALL ff_reconcile_sibling_cancel_group`.
2529    /// Postgres: `Unavailable` — PG's
2530    /// `reconcilers::edge_cancel_reconciler::reconciler_tick`
2531    /// owns the row-level reconcile inside its own batched tx.
2532    /// SQLite: `Unavailable` (mirrors PG).
2533    #[cfg(feature = "core")]
2534    async fn reconcile_sibling_cancel_group(
2535        &self,
2536        _flow_partition: crate::partition::Partition,
2537        _flow_id: &FlowId,
2538        _downstream_eid: &ExecutionId,
2539    ) -> Result<SiblingCancelReconcileAction, EngineError> {
2540        Err(EngineError::Unavailable {
2541            op: "reconcile_sibling_cancel_group",
2542        })
2543    }
2544
2545    // ── PR-7b Cluster 2b-A — Tally-recompute reconciler scanners ─────
2546    //
2547    // Coarse trait methods: each runs a full scan-and-fix pass over
2548    // one partition. Unlike cluster-2's per-candidate write hooks,
2549    // these scanners do multi-round-trip discovery (SSCAN / HMGET /
2550    // HGETALL / ZSCORE / ZREMRANGEBYSCORE) interleaved with conditional
2551    // writes. Moving the whole pass into the trait impl achieves the
2552    // PR-7b/final contract ("scanner trait leaks no `ferriskey::Client`")
2553    // without atomising operations that are inherently not atomic.
2554    //
2555    // Backend status (all three methods):
2556    //
2557    // - **Valkey:** real scan-loop in the trait impl; identical command
2558    //   shape to the pre-routing scanner path.
2559    // - **Postgres:** default `Unavailable`. PG mutates budget / quota
2560    //   counters and scheduling state inside SERIALIZABLE transactions,
2561    //   and the scheduler evaluates eligibility via live SQL predicates
2562    //   on `ff_exec_core` + budget / quota tables — no persistent index
2563    //   or counter hash that can drift, hence nothing to reconcile. The
2564    //   engine's PG scanner supervisor does not run these paths.
2565    // - **SQLite:** same as Postgres (RFC-023: SQLite runs its own
2566    //   scanner supervisor; these FF-owned tally-recompute scanners
2567    //   are not registered).
2568
2569    /// Index-reconciler pass — walks `ff:idx:{p:N}:all_executions`
2570    /// and verifies each execution appears in the correct scheduling
2571    /// sorted set (`eligible` / `delayed` / `blocked:*` / `active` /
2572    /// `suspended` / `terminal`) for its current `(lifecycle_phase,
2573    /// eligibility_state, ownership_state)` triple. Phase 1 is
2574    /// log-only; auto-fix is deferred to a later phase (RFC-010 §6.14).
2575    async fn reconcile_execution_index(
2576        &self,
2577        _partition: crate::partition::Partition,
2578        _lanes: &[LaneId],
2579        _filter: &crate::backend::ScannerFilter,
2580    ) -> Result<ReconcileCounts, EngineError> {
2581        Err(EngineError::Unavailable {
2582            op: "reconcile_execution_index",
2583        })
2584    }
2585
2586    /// Budget-reconciler pass — walks `ff:budget:{b:M}:policies_idx`,
2587    /// reads each budget's definition / usage / limits, and reconciles
2588    /// the `breached_at` marker against hard limits. Resetting budgets
2589    /// (non-zero `reset_interval_ms`) are skipped — they are handled
2590    /// by the `budget_reset` scanner (cluster 2). Drops index entries
2591    /// for budgets whose definition hash has been deleted (retention
2592    /// purge / manual). RFC-008 §Budget Reconciliation, RFC-010 §6.5.
2593    async fn reconcile_budget_counters(
2594        &self,
2595        _partition: crate::partition::Partition,
2596        _now_ms: TimestampMs,
2597    ) -> Result<ReconcileCounts, EngineError> {
2598        Err(EngineError::Unavailable {
2599            op: "reconcile_budget_counters",
2600        })
2601    }
2602
2603    /// Quota-reconciler pass — walks `ff:quota:{q:M}:policies_idx`,
2604    /// trims expired entries from rate-limit sliding windows, and
2605    /// recomputes each policy's concurrency counter by walking its
2606    /// `admitted_set` and pruning entries whose admission guard key
2607    /// has TTLed out. RFC-008 §Quota Reconciliation, RFC-010 §6.6.
2608    async fn reconcile_quota_counters(
2609        &self,
2610        _partition: crate::partition::Partition,
2611        _now_ms: TimestampMs,
2612    ) -> Result<ReconcileCounts, EngineError> {
2613        Err(EngineError::Unavailable {
2614            op: "reconcile_quota_counters",
2615        })
2616    }
2617
2618    // ── PR-7b Cluster 2b-B — Projection + retention scanners ──────
2619    //
2620    // Two additional non-FCALL scanners routed through the trait so
2621    // engine-side scanners stop touching `ferriskey::Client` directly.
2622    // Both ride on aggregate reads + derived writes; neither is a thin
2623    // single-FCALL passthrough.
2624
2625    /// Flow-projector scanner hook — sample flow members, derive an
2626    /// aggregate `public_flow_state` + per-state counts, and write them
2627    /// to the flow summary projection. Returns `Ok(true)` when the
2628    /// summary was updated, `Ok(false)` when the flow had no members
2629    /// or the index entry was defensively pruned (core missing).
2630    ///
2631    /// The derived `public_flow_state` written here is distinct from
2632    /// `ff_flow_core.public_flow_state` — the former is a rollup
2633    /// dashboard field, the latter is the authoritative mutation-guard
2634    /// state owned by `create_flow` / `cancel_flow`. See
2635    /// `ff_engine::scanner::flow_projector` module doc for the
2636    /// two-sources contract.
2637    ///
2638    /// # Backend status
2639    ///
2640    /// - **Valkey:** lifts the pre-PR-7b Rust-composed
2641    ///   SCARD + SRANDMEMBER + per-member HGET + HSET summary pattern.
2642    ///   No new Lua function; the aggregation is inherently
2643    ///   multi-round-trip (cross-partition member reads) and atomicity
2644    ///   is neither required nor achievable against 256 partitions.
2645    /// - **Postgres:** SELECT aggregates from `ff_exec_core` grouped by
2646    ///   `public_state` for the flow's members, INSERT ... ON CONFLICT
2647    ///   DO UPDATE into `ff_flow_summary` (migration 0019). One query
2648    ///   per flow; partition-local aggregation.
2649    /// - **SQLite:** `Unavailable` per RFC-023 Phase 3.5 (flow summary
2650    ///   projection is a shared-deployment dashboard feature; local
2651    ///   single-tenant SQLite deployments don't need it).
2652    #[cfg(feature = "core")]
2653    async fn project_flow_summary(
2654        &self,
2655        _partition: crate::partition::Partition,
2656        _flow_id: &FlowId,
2657        _now_ms: TimestampMs,
2658    ) -> Result<bool, EngineError> {
2659        Err(EngineError::Unavailable {
2660            op: "project_flow_summary",
2661        })
2662    }
2663
2664    /// Retention-trimmer scanner hook — delete terminal executions and
2665    /// all their subordinate keys/rows once they are older than the
2666    /// configured retention window. Returns the number of executions
2667    /// actually purged in this call (so the scanner can loop when it
2668    /// hits `batch_size`).
2669    ///
2670    /// Retention trimming is inherently a scan-and-delete loop over
2671    /// time; the trait surface exists to remove engine-side Valkey
2672    /// coupling, **not** to atomise the operation into a single
2673    /// round-trip. Implementations may issue multiple round-trips
2674    /// (e.g. per-execution cascade across sibling tables); the trait
2675    /// contract is "make progress, bounded by batch_size".
2676    ///
2677    /// # Backend status
2678    ///
2679    /// - **Valkey:** lifts the pre-PR-7b ZRANGEBYSCORE + per-exec
2680    ///   cascade-delete loop verbatim. Cluster-safe (all keys for one
2681    ///   execution live on the same `{p:N}` slot).
2682    /// - **Postgres:** `DELETE FROM ff_exec_core` for terminal rows
2683    ///   past the cutoff plus explicit cascade DELETEs on every
2684    ///   execution-scoped sibling table (no FK CASCADE in the schema).
2685    ///   Single transaction per batch.
2686    /// - **SQLite:** `Unavailable` per RFC-023 Phase 3.5. Single-tenant
2687    ///   local deployments typically manage their own DB lifecycle and
2688    ///   do not need a retention scanner.
2689    #[cfg(feature = "core")]
2690    async fn trim_retention(
2691        &self,
2692        _partition: crate::partition::Partition,
2693        _lane_id: &LaneId,
2694        _retention_ms: u64,
2695        _now_ms: TimestampMs,
2696        _batch_size: u32,
2697        _filter: &crate::backend::ScannerFilter,
2698    ) -> Result<u32, EngineError> {
2699        Err(EngineError::Unavailable {
2700            op: "trim_retention",
2701        })
2702    }
2703
2704    // ── PR-7b Wave 0a: exec_core field read primitive ──
2705
2706    /// Point-read N fields from the `exec_core` hash for a given
2707    /// execution. Returns a map of field-name → Option<String>
2708    /// (None for fields absent or stored as NULL). Scanner call
2709    /// sites formerly issuing raw `HGET`/`HMGET` on `ExecKeyContext::core()`
2710    /// route through this trait method (cairn #436 / PR-7b Wave 0a).
2711    ///
2712    /// Field values are coerced to String at the trait boundary for
2713    /// wire compatibility with the Valkey HGET shape. Consumers parse
2714    /// specific fields (`lane_id`, `current_attempt_index`, etc.) from
2715    /// the returned strings as needed.
2716    ///
2717    /// - **Valkey:** `HMGET exec_core_key f1 f2 ...`, zipping names to values.
2718    /// - **Postgres:** `SELECT` against `ff_exec_core` with dynamic column
2719    ///   extraction; fields in `raw_fields` JSONB are extracted via `->>`.
2720    /// - **SQLite:** equivalent with `json_extract(raw_fields, '$.field')`.
2721    ///
2722    /// Default body returns `Unavailable` so non-v0.13 backends remain
2723    /// compile-compatible.
2724    #[cfg(feature = "core")]
2725    async fn read_exec_core_fields(
2726        &self,
2727        _partition: crate::partition::Partition,
2728        _execution_id: &crate::types::ExecutionId,
2729        _fields: &[&str],
2730    ) -> Result<std::collections::HashMap<String, Option<String>>, EngineError> {
2731        Err(EngineError::Unavailable {
2732            op: "read_exec_core_fields",
2733        })
2734    }
2735
2736    // ── PR-7b Wave 0a: backend clock primitive ──
2737
2738    /// Returns the backend's current wall-clock epoch milliseconds.
2739    ///
2740    /// Used by 15 scanners to compute "due" thresholds before issuing
2741    /// per-partition due-scans. Previously a Valkey-only helper in
2742    /// `ff-engine`'s `scanner::lease_expiry` issuing `TIME`; this
2743    /// trait method is the backend-agnostic replacement (cairn #436 /
2744    /// PR-7b Wave 0a).
2745    ///
2746    /// Every in-tree backend overrides. The default falls back to
2747    /// `SystemTime::now()` so out-of-tree `EngineBackend` impls (e.g.
2748    /// cairn mocks, test doubles) stay source-compatible across v0.12
2749    /// → v0.13.
2750    ///
2751    /// - **Valkey:** `TIME` command.
2752    /// - **Postgres:** `SELECT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint`
2753    ///   (not `now()` — `now()` is the transaction start timestamp,
2754    ///   which is stale under any long-running tx; scanners need the
2755    ///   true wall-clock read).
2756    /// - **SQLite:** `SELECT CAST((julianday('now') - 2440587.5) * 86400000 AS INTEGER)`.
2757    #[cfg(feature = "core")]
2758    async fn server_time_ms(&self) -> Result<u64, EngineError> {
2759        use std::time::{SystemTime, UNIX_EPOCH};
2760        let d = SystemTime::now()
2761            .duration_since(UNIX_EPOCH)
2762            .map_err(|e| EngineError::Transport {
2763                backend: "system-clock",
2764                source: format!("server_time_ms default: {e}").into(),
2765            })?;
2766        Ok(d.as_millis() as u64)
2767    }
2768}
2769
/// Outcome of one [`EngineBackend::reconcile_sibling_cancel_group`]
/// call (RFC-016 Stage D).
///
/// Deliberately limited to three variants, so that the label set of the
/// `ff_sibling_cancel_reconcile_total{action}` metric stays closed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SiblingCancelReconcileAction {
    /// The pending tuple was stale (flag cleared or edgegroup absent).
    /// It was SREM'd / DELETE'd without touching the group.
    SremmedStale,
    /// The flag was still set, but every listed sibling was already
    /// terminal, meaning a drain was interrupted. The flag was cleared
    /// and the tuple drained.
    CompletedDrain,
    /// The flag is set and at least one sibling is non-terminal. The
    /// dispatcher owns this tuple, so it was left untouched.
    NoOp,
}

impl SiblingCancelReconcileAction {
    /// Observability label for this action. It is identical to the
    /// action strings emitted by the Lua and PG implementations.
    pub fn as_str(&self) -> &'static str {
        match *self {
            SiblingCancelReconcileAction::SremmedStale => "sremmed_stale",
            SiblingCancelReconcileAction::CompletedDrain => "completed_drain",
            SiblingCancelReconcileAction::NoOp => "no_op",
        }
    }
}
2800
/// Tally produced by one reconciler scan pass over a single partition.
///
/// `processed` counts candidates in which the reconciler found something
/// to correct (drift, breach flip, stale guard). `errors` counts
/// transport / parse failures. Scanners add both into the engine's
/// per-pass metrics exactly as they did before PR-7b/2b-A.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ReconcileCounts {
    /// Number of items the scanner corrected (drift fix, breach flip,
    /// guard eviction). Zero for a healthy partition.
    pub processed: u32,
    /// Number of items that could not be processed because of transport
    /// or parse errors. Non-zero values surface through scanner metrics
    /// and warrant investigation.
    pub errors: u32,
}
2817
/// Identifies which scanner called [`EngineBackend::expire_execution`].
///
/// The attempt-timeout and execution-deadline scanners share one trait
/// method because the state flip they trigger is identical (terminate,
/// or retry per the retry policy). The distinction is purely diagnostic:
/// it records which deadline elapsed. It is carried through to the
/// backend so that the shared Lua / SQL path can log or tag accordingly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExpirePhase {
    /// Raised by the `attempt_timeout` scanner: the per-attempt
    /// wall-clock budget ran out.
    AttemptTimeout,
    /// Raised by the `execution_deadline` scanner: the whole-execution
    /// wall-clock deadline passed.
    ExecutionDeadline,
}

impl ExpirePhase {
    /// Short tag for this phase, suitable for Lua ARGV or Postgres
    /// breadcrumbs.
    pub fn as_str(&self) -> &'static str {
        match *self {
            ExpirePhase::AttemptTimeout => "attempt_timeout",
            ExpirePhase::ExecutionDeadline => "execution_deadline",
        }
    }
}
2844
2845/// Object-safety assertion: `dyn EngineBackend` compiles iff every
2846/// method is dyn-compatible. Kept as a compile-time guard so a future
2847/// trait change that accidentally breaks dyn-safety fails the build
2848/// at this site rather than at every downstream `Arc<dyn
2849/// EngineBackend>` use.
2850#[allow(dead_code)]
2851fn _assert_dyn_compatible(_: &dyn EngineBackend) {}
2852
/// How often [`wait_for_flow_cancellation`] re-polls `describe_flow`.
///
/// Short enough that a local single-node cancel cascade is observed as
/// `cancelled` within one or two polls, and long enough that a
/// `WaitIndefinite` caller does not hammer `describe_flow` on a live
/// cluster.
const CANCEL_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(100);

/// Safety cap applied to [`CancelFlowWait::WaitIndefinite`].
///
/// If the reconciler cascade has not converged within five minutes,
/// something is wedged, and surfacing `Timeout` is strictly more useful
/// than blocking forever. RFC-012 §3.1.1 expects real-world cascades to
/// finish within `reconciler_interval + grace`, which is orders of
/// magnitude below this cap.
const CANCEL_WAIT_INDEFINITE_CEILING: Duration = Duration::from_secs(5 * 60);
2866
2867/// Poll `backend.describe_flow(flow_id)` until `public_flow_state` is
2868/// `"cancelled"` or `deadline` elapses.
2869///
2870/// Shared by every backend's `cancel_flow` trait impl that honours
2871/// [`CancelFlowWait::WaitTimeout`] / [`CancelFlowWait::WaitIndefinite`].
2872/// The underlying `cancel_flow` FCALL / SQL transaction flips the
2873/// flow-level state synchronously; member cancellations dispatch
2874/// asynchronously via the reconciler, which also flips
2875/// `public_flow_state` to `cancelled` once the cascade completes. This
2876/// helper waits for that terminal flip.
2877///
2878/// Returns:
2879/// * `Ok(())` once `public_flow_state = "cancelled"` is observed.
2880/// * `Err(EngineError::Timeout { op: "cancel_flow", elapsed })` when
2881///   `deadline` elapses first. `elapsed` is the wait budget (the
2882///   requested timeout), not wall-clock precision.
2883/// * `Err(e)` if `describe_flow` itself errors (propagated).
2884pub async fn wait_for_flow_cancellation<B: EngineBackend + ?Sized>(
2885    backend: &B,
2886    flow_id: &crate::types::FlowId,
2887    deadline: Duration,
2888) -> Result<(), EngineError> {
2889    let start = std::time::Instant::now();
2890    loop {
2891        match backend.describe_flow(flow_id).await? {
2892            Some(snap) if snap.public_flow_state == "cancelled" => return Ok(()),
2893            // `None` (flow removed) is also terminal from the caller's
2894            // perspective — nothing left to wait on.
2895            None => return Ok(()),
2896            Some(_) => {}
2897        }
2898        if start.elapsed() >= deadline {
2899            return Err(EngineError::Timeout {
2900                op: "cancel_flow",
2901                elapsed: deadline,
2902            });
2903        }
2904        tokio::time::sleep(CANCEL_WAIT_POLL_INTERVAL).await;
2905    }
2906}
2907
2908/// Convert a [`CancelFlowWait`] into the deadline passed to
2909/// [`wait_for_flow_cancellation`]. `NoWait` returns `None` — the caller
2910/// must skip the wait entirely.
2911pub fn cancel_flow_wait_deadline(wait: CancelFlowWait) -> Option<Duration> {
2912    // `CancelFlowWait` is `#[non_exhaustive]`; this match lives in the
2913    // defining crate so the exhaustiveness check keeps the compiler
2914    // honest. Future variants must be wired here explicitly.
2915    match wait {
2916        CancelFlowWait::NoWait => None,
2917        CancelFlowWait::WaitTimeout(d) => Some(d),
2918        CancelFlowWait::WaitIndefinite => Some(CANCEL_WAIT_INDEFINITE_CEILING),
2919    }
2920}
2921
2922/// Validate a caller-namespaced tag key against the regex
2923/// `^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$`.
2924///
2925/// The Rust trait-side check is **stricter than the Valkey Lua
2926/// contracts** (`ff_set_execution_tags` / `ff_set_flow_tags`), which
2927/// only check `^[a-z][a-z0-9_]*%.[^.]` — namespace prefix + first
2928/// suffix char, with the rest of the suffix unvalidated. Every
2929/// backend-side impl (`ff-backend-{valkey,postgres,sqlite}`) calls
2930/// this helper **before** the wire hop so the effective parity
2931/// contract is this full-key regex; Valkey's Lua is an additional,
2932/// more permissive server-side guard, not the parity-of-record.
2933///
2934/// A key passes iff:
2935///
2936/// * it begins with an ASCII lowercase letter;
2937/// * all characters up to the first `.` are lowercase alnum or `_`;
2938/// * the first `.` is followed by at least one non-`.` character
2939///   (so `cairn.` and `cairn..x` fail — the `<field>` must be non-empty).
2940///
2941/// Shared entry point for [`EngineBackend::set_execution_tag`] /
2942/// [`EngineBackend::set_flow_tag`] / [`EngineBackend::get_execution_tag`] /
2943/// [`EngineBackend::get_flow_tag`] so every backend rejects the same
2944/// keyspace. On rejection, returns
2945/// [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
2946/// with the offending key in `detail`.
2947///
2948/// `#[allow(clippy::result_large_err)]` — `EngineError` is the uniform
2949/// error across the whole `EngineBackend` trait surface (see trait method
2950/// signatures above); boxing it here alone would introduce a
2951/// gratuitous signature deviation. Clippy 1.95 flags free functions but
2952/// not trait methods; this function mirrors the trait-method convention.
2953#[allow(clippy::result_large_err)]
2954pub fn validate_tag_key(key: &str) -> Result<(), EngineError> {
2955    use crate::engine_error::ValidationKind;
2956
2957    let bad = || EngineError::Validation {
2958        kind: ValidationKind::InvalidInput,
2959        detail: format!(
2960            "invalid tag key: {key:?} (must match ^[a-z][a-z0-9_]*\\.[a-z0-9_][a-z0-9_.]*$)"
2961        ),
2962    };
2963
2964    let mut chars = key.chars();
2965    let first = chars.next().ok_or_else(bad)?;
2966    if !first.is_ascii_lowercase() {
2967        return Err(bad());
2968    }
2969    // Phase 1: consume the namespace prefix up to (but not including) the first `.`.
2970    // Chars must be `[a-z0-9_]`.
2971    let mut saw_dot = false;
2972    for c in chars.by_ref() {
2973        if c == '.' {
2974            saw_dot = true;
2975            break;
2976        }
2977        if !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') {
2978            return Err(bad());
2979        }
2980    }
2981    if !saw_dot {
2982        return Err(bad());
2983    }
2984    // Phase 2: the first char after the first `.` must exist and be a
2985    // non-dot member char `[a-z0-9_]` (matches Valkey's Lua regex
2986    // `^[a-z][a-z0-9_]*%.[^.]` — a second consecutive dot is rejected).
2987    let second = chars.next().ok_or_else(bad)?;
2988    if !(second.is_ascii_lowercase() || second.is_ascii_digit() || second == '_') {
2989        return Err(bad());
2990    }
2991    // Phase 3 (Finding 2 tightening): every remaining char MUST be
2992    // `[a-z0-9_.]`. Before this, the suffix was unvalidated beyond
2993    // its first char — so `cairn.foo bar`, `cairn.Foo`,
2994    // `cairn.foo"bar`, `cairn.foo-bar` all passed trait-side validation
2995    // even though they would break SQLite JSON-path quoting, the Lua
2996    // HSET field-name conventions, or consumer grep patterns. Valkey's
2997    // Lua prefix regex is identically permissive on the suffix; this
2998    // tightens both layers from the Rust side. Dots in the suffix
2999    // remain legal (`app.sub.field` is valid) to preserve the existing
3000    // accepted shape.
3001    for c in chars {
3002        if !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_' || c == '.') {
3003            return Err(bad());
3004        }
3005    }
3006    Ok(())
3007}
3008
3009#[cfg(test)]
3010mod tests {
3011    use super::*;
3012
3013    /// A zero-state backend stub used to exercise the default
3014    /// `capabilities()` impl without pulling in a real
3015    /// transport. Only the default method is under test here; every
3016    /// other method is unreachable on this type.
3017    struct DefaultBackend;
3018
    // Minimal stub impl of `EngineBackend` for `DefaultBackend`. Nothing
    // here performs I/O. Most methods are `unreachable!()`, so any call to
    // them from a test panics. The three execution readers
    // (`read_execution_context`, `read_current_attempt_index`,
    // `read_total_attempt_count`) instead return fixed placeholder values.
    // None of the trait defaults exercised by the tests in this module are
    // overridden here, so those tests observe the trait's own default
    // bodies.
    #[async_trait]
    impl EngineBackend for DefaultBackend {
        async fn claim(
            &self,
            _lane: &LaneId,
            _capabilities: &CapabilitySet,
            _policy: ClaimPolicy,
        ) -> Result<Option<Handle>, EngineError> {
            unreachable!()
        }
        async fn renew(&self, _handle: &Handle) -> Result<LeaseRenewal, EngineError> {
            unreachable!()
        }
        async fn progress(
            &self,
            _handle: &Handle,
            _percent: Option<u8>,
            _message: Option<String>,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn append_frame(
            &self,
            _handle: &Handle,
            _frame: Frame,
        ) -> Result<AppendFrameOutcome, EngineError> {
            unreachable!()
        }
        async fn complete(
            &self,
            _handle: &Handle,
            _payload: Option<Vec<u8>>,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn fail(
            &self,
            _handle: &Handle,
            _reason: FailureReason,
            _classification: FailureClass,
        ) -> Result<FailOutcome, EngineError> {
            unreachable!()
        }
        async fn cancel(&self, _handle: &Handle, _reason: &str) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn suspend(
            &self,
            _handle: &Handle,
            _args: SuspendArgs,
        ) -> Result<SuspendOutcome, EngineError> {
            unreachable!()
        }
        async fn create_waitpoint(
            &self,
            _handle: &Handle,
            _waitpoint_key: &str,
            _expires_in: Duration,
        ) -> Result<PendingWaitpoint, EngineError> {
            unreachable!()
        }
        async fn observe_signals(
            &self,
            _handle: &Handle,
        ) -> Result<Vec<ResumeSignal>, EngineError> {
            unreachable!()
        }
        async fn claim_from_resume_grant(
            &self,
            _token: ResumeToken,
        ) -> Result<Option<Handle>, EngineError> {
            unreachable!()
        }
        async fn delay(
            &self,
            _handle: &Handle,
            _delay_until: TimestampMs,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn wait_children(&self, _handle: &Handle) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn describe_execution(
            &self,
            _id: &ExecutionId,
        ) -> Result<Option<ExecutionSnapshot>, EngineError> {
            unreachable!()
        }
        // Placeholder: an empty context (empty Vec, empty String, empty map)
        // for every execution id.
        async fn read_execution_context(
            &self,
            _execution_id: &ExecutionId,
        ) -> Result<ExecutionContext, EngineError> {
            Ok(ExecutionContext::new(
                Vec::new(),
                String::new(),
                std::collections::HashMap::new(),
            ))
        }
        // Placeholder: attempt index 0 for every execution id.
        async fn read_current_attempt_index(
            &self,
            _execution_id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError> {
            Ok(AttemptIndex::new(0))
        }
        // Placeholder: attempt count 0 for every execution id.
        async fn read_total_attempt_count(
            &self,
            _execution_id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError> {
            Ok(AttemptIndex::new(0))
        }
        async fn describe_flow(
            &self,
            _id: &FlowId,
        ) -> Result<Option<FlowSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_edges(
            &self,
            _flow_id: &FlowId,
            _direction: EdgeDirection,
        ) -> Result<Vec<EdgeSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn describe_edge(
            &self,
            _flow_id: &FlowId,
            _edge_id: &EdgeId,
        ) -> Result<Option<EdgeSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn resolve_execution_flow_id(
            &self,
            _eid: &ExecutionId,
        ) -> Result<Option<FlowId>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_flows(
            &self,
            _partition: PartitionKey,
            _cursor: Option<FlowId>,
            _limit: usize,
        ) -> Result<ListFlowsPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_lanes(
            &self,
            _cursor: Option<LaneId>,
            _limit: usize,
        ) -> Result<ListLanesPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_suspended(
            &self,
            _partition: PartitionKey,
            _cursor: Option<ExecutionId>,
            _limit: usize,
        ) -> Result<ListSuspendedPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_executions(
            &self,
            _partition: PartitionKey,
            _cursor: Option<ExecutionId>,
            _limit: usize,
        ) -> Result<ListExecutionsPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn deliver_signal(
            &self,
            _args: DeliverSignalArgs,
        ) -> Result<DeliverSignalResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn claim_resumed_execution(
            &self,
            _args: ClaimResumedExecutionArgs,
        ) -> Result<ClaimResumedExecutionResult, EngineError> {
            unreachable!()
        }
        async fn cancel_flow(
            &self,
            _id: &FlowId,
            _policy: CancelFlowPolicy,
            _wait: CancelFlowWait,
        ) -> Result<CancelFlowResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn set_edge_group_policy(
            &self,
            _flow_id: &FlowId,
            _downstream_execution_id: &ExecutionId,
            _policy: crate::contracts::EdgeDependencyPolicy,
        ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
            unreachable!()
        }
        async fn report_usage(
            &self,
            _handle: &Handle,
            _budget: &BudgetId,
            _dimensions: crate::backend::UsageDimensions,
        ) -> Result<ReportUsageResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn read_stream(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
            _from: StreamCursor,
            _to: StreamCursor,
            _count_limit: u64,
        ) -> Result<StreamFrames, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn tail_stream(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
            _after: StreamCursor,
            _block_ms: u64,
            _count_limit: u64,
            _visibility: TailVisibility,
        ) -> Result<StreamFrames, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn read_summary(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
        ) -> Result<Option<SummaryDocument>, EngineError> {
            unreachable!()
        }
    }
3265
3266    /// The default `capabilities()` impl returns a value tagged
3267    /// `family = "unknown"` with every `supports.*` bool false, so
3268    /// pre-RFC-018 out-of-tree backends keep compiling and consumers
3269    /// can distinguish "backend predates RFC-018" from "backend
3270    /// reports concrete bools." Every concrete in-tree backend
3271    /// overrides.
3272    #[test]
3273    fn default_capabilities_is_unknown_family_all_false() {
3274        let b = DefaultBackend;
3275        let caps = b.capabilities();
3276        assert_eq!(caps.identity.family, "unknown");
3277        assert_eq!(
3278            caps.identity.version,
3279            crate::capability::Version::new(0, 0, 0)
3280        );
3281        assert_eq!(caps.identity.rfc017_stage, "unknown");
3282        // Every field false on the default (matches `Supports::none()`).
3283        assert_eq!(caps.supports, crate::capability::Supports::none());
3284    }
3285
3286    // ── resolve_dependency (PR-7b Step 0) ──
3287
3288    /// Default impl returns `Unavailable { op: "resolve_dependency" }`
3289    /// so pre-migration backends surface a typed Unsupported-grade
3290    /// error rather than panic. Both cluster 2's dependency_reconciler
3291    /// and cluster 4's completion_listener route through this method;
3292    /// the default is the safety net for non-Valkey deployments.
3293    #[cfg(feature = "core")]
3294    #[tokio::test]
3295    async fn default_resolve_dependency_is_unavailable() {
3296        use crate::contracts::ResolveDependencyArgs;
3297        use crate::partition::{Partition, PartitionFamily};
3298        use crate::types::{AttemptIndex, EdgeId, FlowId, LaneId};
3299
3300        let b = DefaultBackend;
3301        let partition = Partition {
3302            family: PartitionFamily::Flow,
3303            index: 0,
3304        };
3305        let args = ResolveDependencyArgs::new(
3306            partition,
3307            FlowId::parse("11111111-1111-1111-1111-111111111111").unwrap(),
3308            ExecutionId::parse("{fp:0}:22222222-2222-2222-2222-222222222222").unwrap(),
3309            ExecutionId::parse("{fp:0}:33333333-3333-3333-3333-333333333333").unwrap(),
3310            EdgeId::parse("44444444-4444-4444-4444-444444444444").unwrap(),
3311            LaneId::new("default"),
3312            AttemptIndex::new(0),
3313            "success".to_owned(),
3314            TimestampMs::now(),
3315        );
3316        match b.resolve_dependency(args).await {
3317            Err(EngineError::Unavailable { op }) => {
3318                assert_eq!(op, "resolve_dependency");
3319            }
3320            other => panic!("expected Unavailable, got {other:?}"),
3321        }
3322    }
3323
3324    // ── cascade_completion (PR-7b Cluster 4) ──
3325
3326    /// Default impl returns `Unavailable { op: "cascade_completion" }`
3327    /// so pre-migration / non-core builds surface a typed error rather
3328    /// than panic. The push-based completion listener routes through
3329    /// this method; the default is the safety net for deployments
3330    /// whose backend doesn't cascade (e.g. SQLite in the current
3331    /// Wave 9 scope).
3332    #[cfg(feature = "core")]
3333    #[tokio::test]
3334    async fn default_cascade_completion_is_unavailable() {
3335        use crate::backend::CompletionPayload;
3336
3337        let b = DefaultBackend;
3338        let eid = ExecutionId::parse(
3339            "{fp:0}:66666666-6666-6666-6666-666666666666",
3340        )
3341        .unwrap();
3342        let payload = CompletionPayload::new(eid, "success", None, TimestampMs::now());
3343        match b.cascade_completion(&payload).await {
3344            Err(EngineError::Unavailable { op }) => {
3345                assert_eq!(op, "cascade_completion");
3346            }
3347            other => panic!("expected Unavailable, got {other:?}"),
3348        }
3349    }
3350
3351    // ── unblock_execution (PR-7b Cluster 2) ──
3352
3353    /// Default impl returns `Unavailable { op: "unblock_execution" }`
3354    /// so non-Valkey deployments get a typed `Unavailable` rather than
3355    /// a panic. The engine scanner loop on PG/SQLite skips this path
3356    /// entirely (scheduler eligibility is re-evaluated live via SQL
3357    /// predicates), but a stray direct call must still fail gracefully.
3358    #[cfg(feature = "core")]
3359    #[tokio::test]
3360    async fn default_unblock_execution_is_unavailable() {
3361        use crate::partition::{Partition, PartitionFamily};
3362
3363        let b = DefaultBackend;
3364        let partition = Partition {
3365            family: PartitionFamily::Execution,
3366            index: 0,
3367        };
3368        let eid = ExecutionId::parse(
3369            "{fp:0}:55555555-5555-5555-5555-555555555555",
3370        )
3371        .unwrap();
3372        let lane = LaneId::new("default");
3373        match b
3374            .unblock_execution(
3375                partition,
3376                &lane,
3377                &eid,
3378                "waiting_for_budget",
3379                TimestampMs::from_millis(0),
3380            )
3381            .await
3382        {
3383            Err(EngineError::Unavailable { op }) => {
3384                assert_eq!(op, "unblock_execution");
3385            }
3386            other => panic!("expected Unavailable, got {other:?}"),
3387        }
3388    }
3389
3390    // ── project_flow_summary (PR-7b Cluster 2b-B) ──
3391
3392    /// Default impl returns `Unavailable { op: "project_flow_summary" }`
3393    /// so non-Valkey deployments get a typed `Unavailable` rather than
3394    /// a panic. SQLite rides the default per RFC-023 Phase 3.5.
3395    #[cfg(feature = "core")]
3396    #[tokio::test]
3397    async fn default_project_flow_summary_is_unavailable() {
3398        use crate::partition::{Partition, PartitionFamily};
3399        use crate::types::FlowId;
3400
3401        let b = DefaultBackend;
3402        let partition = Partition {
3403            family: PartitionFamily::Flow,
3404            index: 0,
3405        };
3406        let fid = FlowId::parse("11111111-1111-1111-1111-111111111111").unwrap();
3407        match b
3408            .project_flow_summary(partition, &fid, TimestampMs::from_millis(0))
3409            .await
3410        {
3411            Err(EngineError::Unavailable { op }) => {
3412                assert_eq!(op, "project_flow_summary");
3413            }
3414            other => panic!("expected Unavailable, got {other:?}"),
3415        }
3416    }
3417
3418    // ── trim_retention (PR-7b Cluster 2b-B) ──
3419
3420    /// Default impl returns `Unavailable { op: "trim_retention" }` so
3421    /// non-Valkey deployments get a typed `Unavailable` rather than a
3422    /// panic. SQLite rides the default per RFC-023 Phase 3.5.
3423    #[cfg(feature = "core")]
3424    #[tokio::test]
3425    async fn default_trim_retention_is_unavailable() {
3426        use crate::partition::{Partition, PartitionFamily};
3427
3428        let b = DefaultBackend;
3429        let partition = Partition {
3430            family: PartitionFamily::Execution,
3431            index: 0,
3432        };
3433        let lane = LaneId::new("default");
3434        match b
3435            .trim_retention(
3436                partition,
3437                &lane,
3438                60_000,
3439                TimestampMs::from_millis(0),
3440                20,
3441                &crate::backend::ScannerFilter::NOOP,
3442            )
3443            .await
3444        {
3445            Err(EngineError::Unavailable { op }) => {
3446                assert_eq!(op, "trim_retention");
3447            }
3448            other => panic!("expected Unavailable, got {other:?}"),
3449        }
3450    }
3451
3452    // ── read_exec_core_fields (PR-7b Wave 0a) ──
3453
3454    /// Default impl returns `Unavailable { op: "read_exec_core_fields" }`
3455    /// so out-of-tree backends that pre-date v0.13 keep compiling while
3456    /// surfacing a typed error on call. Empty-field input short-circuits
3457    /// to `Ok(empty map)` in all in-tree impls; the default is triggered
3458    /// only when the trait method itself is un-overridden.
3459    #[cfg(feature = "core")]
3460    #[tokio::test]
3461    async fn default_read_exec_core_fields_is_unavailable() {
3462        use crate::partition::{Partition, PartitionFamily};
3463
3464        let b = DefaultBackend;
3465        let partition = Partition {
3466            family: PartitionFamily::Execution,
3467            index: 0,
3468        };
3469        let eid = ExecutionId::parse(
3470            "{fp:0}:66666666-6666-6666-6666-666666666666",
3471        )
3472        .unwrap();
3473        match b
3474            .read_exec_core_fields(partition, &eid, &["lane_id"])
3475            .await
3476        {
3477            Err(EngineError::Unavailable { op }) => {
3478                assert_eq!(op, "read_exec_core_fields");
3479            }
3480            other => panic!("expected Unavailable, got {other:?}"),
3481        }
3482    }
3483
3484    // ── validate_tag_key (issue #433) ──
3485
3486    #[test]
3487    fn validate_tag_key_accepts_valid() {
3488        for k in [
3489            "cairn.session_id",
3490            "cairn.project",
3491            "a.b",
3492            "a1_2.x",
3493            "app.sub.field",
3494            "x.y_z",
3495        ] {
3496            validate_tag_key(k).unwrap_or_else(|e| panic!("{k:?} should pass: {e:?}"));
3497        }
3498    }
3499
3500    #[test]
3501    fn validate_tag_key_rejects_invalid() {
3502        for k in [
3503            "",                // empty
3504            "Cairn.x",         // uppercase first
3505            "1cairn.x",        // leading digit
3506            "cairn",           // no dot
3507            "cairn.",          // empty suffix
3508            "cairn..x",        // dot immediately after first dot
3509            ".cairn",          // leading dot
3510            "cair n.x",        // space before dot
3511            "ca-irn.x",        // hyphen in prefix
3512            // Finding 2 tightening — suffix now fully validated.
3513            "cairn.Foo",       // uppercase in suffix
3514            "cairn.foo bar",   // space in suffix
3515            "cairn.foo\"bar",  // double-quote in suffix (would break SQLite JSON-path quoting)
3516            "cairn.foo-bar",   // hyphen in suffix
3517            "cairn.foo\\bar",  // backslash in suffix
3518        ] {
3519            let err = validate_tag_key(k)
3520                .err()
3521                .unwrap_or_else(|| panic!("{k:?} should fail"));
3522            match err {
3523                EngineError::Validation {
3524                    kind: crate::engine_error::ValidationKind::InvalidInput,
3525                    ..
3526                } => {}
3527                other => panic!("{k:?}: unexpected err {other:?}"),
3528            }
3529        }
3530    }
3531
3532    // ── cairn #389: service-layer FCALL trait-method defaults ──
3533    //
3534    // Each method MUST return `EngineError::Unavailable { op: "<name>" }`
3535    // on backends that haven't overridden it, so out-of-tree backends
3536    // keep compiling and consumers get a terminal-classified error
3537    // rather than a panic. Mirrors the precedent used by
3538    // `issue_reclaim_grant` / `reclaim_execution` / `suspend_by_triple`.
3539    //
3540    // Feature-gated on `core` because the methods under test only
3541    // exist on the trait under that feature — matches the precedent
3542    // of `default_resolve_dependency_is_unavailable` above.
3543
3544    #[cfg(feature = "core")]
3545    #[tokio::test]
3546    async fn default_complete_execution_is_unavailable() {
3547        use crate::contracts::CompleteExecutionArgs;
3548        use crate::types::{ExecutionId, FlowId};
3549        let b = DefaultBackend;
3550        let config = crate::partition::PartitionConfig::default();
3551        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3552        let args = CompleteExecutionArgs {
3553            execution_id: eid,
3554            fence: None,
3555            attempt_index: AttemptIndex::new(0),
3556            result_payload: None,
3557            result_encoding: None,
3558            source: crate::types::CancelSource::default(),
3559            now: TimestampMs::from_millis(0),
3560        };
3561        match b.complete_execution(args).await.unwrap_err() {
3562            EngineError::Unavailable { op } => assert_eq!(op, "complete_execution"),
3563            other => panic!("expected Unavailable, got {other:?}"),
3564        }
3565    }
3566
3567    #[cfg(feature = "core")]
3568    #[tokio::test]
3569    async fn default_fail_execution_is_unavailable() {
3570        use crate::contracts::FailExecutionArgs;
3571        use crate::types::{ExecutionId, FlowId};
3572        let b = DefaultBackend;
3573        let config = crate::partition::PartitionConfig::default();
3574        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3575        let args = FailExecutionArgs {
3576            execution_id: eid,
3577            fence: None,
3578            attempt_index: AttemptIndex::new(0),
3579            failure_reason: String::new(),
3580            failure_category: String::new(),
3581            retry_policy_json: String::new(),
3582            next_attempt_policy_json: String::new(),
3583            source: crate::types::CancelSource::default(),
3584        };
3585        match b.fail_execution(args).await.unwrap_err() {
3586            EngineError::Unavailable { op } => assert_eq!(op, "fail_execution"),
3587            other => panic!("expected Unavailable, got {other:?}"),
3588        }
3589    }
3590
3591    #[cfg(feature = "core")]
3592    #[tokio::test]
3593    async fn default_renew_lease_is_unavailable() {
3594        use crate::contracts::RenewLeaseArgs;
3595        use crate::types::{ExecutionId, FlowId};
3596        let b = DefaultBackend;
3597        let config = crate::partition::PartitionConfig::default();
3598        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3599        let args = RenewLeaseArgs {
3600            execution_id: eid,
3601            attempt_index: AttemptIndex::new(0),
3602            fence: None,
3603            lease_ttl_ms: 1_000,
3604            lease_history_grace_ms: 60_000,
3605        };
3606        match b.renew_lease(args).await.unwrap_err() {
3607            EngineError::Unavailable { op } => assert_eq!(op, "renew_lease"),
3608            other => panic!("expected Unavailable, got {other:?}"),
3609        }
3610    }
3611
3612    #[cfg(feature = "core")]
3613    #[tokio::test]
3614    async fn default_resume_execution_is_unavailable() {
3615        use crate::contracts::ResumeExecutionArgs;
3616        use crate::types::{ExecutionId, FlowId};
3617        let b = DefaultBackend;
3618        let config = crate::partition::PartitionConfig::default();
3619        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3620        let args = ResumeExecutionArgs {
3621            execution_id: eid,
3622            trigger_type: "signal".to_owned(),
3623            resume_delay_ms: 0,
3624        };
3625        match b.resume_execution(args).await.unwrap_err() {
3626            EngineError::Unavailable { op } => assert_eq!(op, "resume_execution"),
3627            other => panic!("expected Unavailable, got {other:?}"),
3628        }
3629    }
3630
3631    #[cfg(feature = "core")]
3632    #[tokio::test]
3633    async fn default_check_admission_is_unavailable() {
3634        use crate::contracts::CheckAdmissionArgs;
3635        use crate::types::{ExecutionId, FlowId, QuotaPolicyId};
3636        let b = DefaultBackend;
3637        let config = crate::partition::PartitionConfig::default();
3638        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3639        let args = CheckAdmissionArgs {
3640            execution_id: eid,
3641            now: TimestampMs::from_millis(0),
3642            window_seconds: 60,
3643            rate_limit: 10,
3644            concurrency_cap: 1,
3645            jitter_ms: None,
3646        };
3647        let qid = QuotaPolicyId::new();
3648        match b
3649            .check_admission(&qid, "default", args)
3650            .await
3651            .unwrap_err()
3652        {
3653            EngineError::Unavailable { op } => assert_eq!(op, "check_admission"),
3654            other => panic!("expected Unavailable, got {other:?}"),
3655        }
3656    }
3657
3658    #[cfg(feature = "core")]
3659    #[tokio::test]
3660    async fn default_evaluate_flow_eligibility_is_unavailable() {
3661        use crate::contracts::EvaluateFlowEligibilityArgs;
3662        use crate::types::{ExecutionId, FlowId};
3663        let b = DefaultBackend;
3664        let config = crate::partition::PartitionConfig::default();
3665        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3666        let args = EvaluateFlowEligibilityArgs { execution_id: eid };
3667        match b.evaluate_flow_eligibility(args).await.unwrap_err() {
3668            EngineError::Unavailable { op } => assert_eq!(op, "evaluate_flow_eligibility"),
3669            other => panic!("expected Unavailable, got {other:?}"),
3670        }
3671    }
3672
3673    // ── Cluster 2b-A tally-recompute reconcilers ──
3674
3675    #[cfg(feature = "core")]
3676    #[tokio::test]
3677    async fn default_reconcile_execution_index_is_unavailable() {
3678        use crate::backend::ScannerFilter;
3679        use crate::partition::{Partition, PartitionFamily};
3680        let b = DefaultBackend;
3681        let partition = Partition { family: PartitionFamily::Execution, index: 0 };
3682        let lanes = [LaneId::new("default")];
3683        let filter = ScannerFilter::default();
3684        match b
3685            .reconcile_execution_index(partition, &lanes, &filter)
3686            .await
3687            .unwrap_err()
3688        {
3689            EngineError::Unavailable { op } => assert_eq!(op, "reconcile_execution_index"),
3690            other => panic!("expected Unavailable, got {other:?}"),
3691        }
3692    }
3693
3694    #[cfg(feature = "core")]
3695    #[tokio::test]
3696    async fn default_reconcile_budget_counters_is_unavailable() {
3697        use crate::partition::{Partition, PartitionFamily};
3698        let b = DefaultBackend;
3699        let partition = Partition { family: PartitionFamily::Budget, index: 0 };
3700        match b
3701            .reconcile_budget_counters(partition, TimestampMs::from_millis(0))
3702            .await
3703            .unwrap_err()
3704        {
3705            EngineError::Unavailable { op } => assert_eq!(op, "reconcile_budget_counters"),
3706            other => panic!("expected Unavailable, got {other:?}"),
3707        }
3708    }
3709
3710    #[cfg(feature = "core")]
3711    #[tokio::test]
3712    async fn default_reconcile_quota_counters_is_unavailable() {
3713        use crate::partition::{Partition, PartitionFamily};
3714        let b = DefaultBackend;
3715        let partition = Partition { family: PartitionFamily::Quota, index: 0 };
3716        match b
3717            .reconcile_quota_counters(partition, TimestampMs::from_millis(0))
3718            .await
3719            .unwrap_err()
3720        {
3721            EngineError::Unavailable { op } => assert_eq!(op, "reconcile_quota_counters"),
3722            other => panic!("expected Unavailable, got {other:?}"),
3723        }
3724    }
3725
3726    // ── #454 trait-additions default-impl tests (4) ────────────
3727
3728    #[cfg(feature = "core")]
3729    #[tokio::test]
3730    async fn default_record_spend_is_unavailable() {
3731        use crate::contracts::RecordSpendArgs;
3732        use crate::types::{BudgetId, ExecutionId, FlowId};
3733        let b = DefaultBackend;
3734        let config = crate::partition::PartitionConfig::default();
3735        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3736        let args = RecordSpendArgs::new(
3737            BudgetId::new(),
3738            eid,
3739            std::collections::BTreeMap::new(),
3740            "k",
3741        );
3742        match b.record_spend(args).await.unwrap_err() {
3743            EngineError::Unavailable { op } => assert_eq!(op, "record_spend"),
3744            other => panic!("expected Unavailable, got {other:?}"),
3745        }
3746    }
3747
3748    #[cfg(feature = "core")]
3749    #[tokio::test]
3750    async fn default_release_budget_is_unavailable() {
3751        use crate::contracts::ReleaseBudgetArgs;
3752        use crate::types::{BudgetId, ExecutionId, FlowId};
3753        let b = DefaultBackend;
3754        let config = crate::partition::PartitionConfig::default();
3755        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3756        let args = ReleaseBudgetArgs::new(BudgetId::new(), eid);
3757        match b.release_budget(args).await.unwrap_err() {
3758            EngineError::Unavailable { op } => assert_eq!(op, "release_budget"),
3759            other => panic!("expected Unavailable, got {other:?}"),
3760        }
3761    }
3762
3763    #[cfg(feature = "core")]
3764    #[tokio::test]
3765    async fn default_deliver_approval_signal_is_unavailable() {
3766        use crate::contracts::DeliverApprovalSignalArgs;
3767        use crate::types::{ExecutionId, FlowId, LaneId, WaitpointId};
3768        let b = DefaultBackend;
3769        let config = crate::partition::PartitionConfig::default();
3770        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3771        let args = DeliverApprovalSignalArgs::new(
3772            eid,
3773            LaneId::new("default"),
3774            WaitpointId::new(),
3775            "approved",
3776            "sfx",
3777            1_000,
3778            Some(100),
3779            Some(10),
3780        );
3781        match b.deliver_approval_signal(args).await.unwrap_err() {
3782            EngineError::Unavailable { op } => assert_eq!(op, "deliver_approval_signal"),
3783            other => panic!("expected Unavailable, got {other:?}"),
3784        }
3785    }
3786
3787    #[cfg(feature = "core")]
3788    #[tokio::test]
3789    async fn default_issue_grant_and_claim_is_unavailable() {
3790        use crate::contracts::IssueGrantAndClaimArgs;
3791        use crate::types::{ExecutionId, FlowId, LaneId};
3792        let b = DefaultBackend;
3793        let config = crate::partition::PartitionConfig::default();
3794        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3795        let args = IssueGrantAndClaimArgs::new(eid, LaneId::new("default"), 30_000);
3796        match b.issue_grant_and_claim(args).await.unwrap_err() {
3797            EngineError::Unavailable { op } => assert_eq!(op, "issue_grant_and_claim"),
3798            other => panic!("expected Unavailable, got {other:?}"),
3799        }
3800    }
3801}