Skip to main content

ff_core/
engine_backend.rs

1//! The `EngineBackend` trait — abstracting FlowFabric's write surface.
2//!
3//! **RFC-012 Stage 1a:** this is the trait landing. The
4//! Valkey-backed impl lives in `ff-backend-valkey`; future backends
5//! (Postgres) add a sibling crate with their own impl. ff-sdk's
6//! `FlowFabricWorker` gains `connect_with(backend)` /
7//! `backend(&self)` accessors so consumers that want to bring their
8//! own backend (tests, future non-Valkey deployments) can hand one
9//! in. The hot-path migration of `ClaimedTask` / `FlowFabricWorker`
10//! to forward through the trait lands across Stages 1b-1d.
11//!
12//! # Object safety
13//!
14//! `EngineBackend` is object-safe: all methods are `async fn` behind
15//! `#[async_trait]` and take `&self`. Consumers can hold
16//! `Arc<dyn EngineBackend>` for heterogenous-backend deployments.
17//! The trait is `Send + Sync + 'static` per RFC-012 §4.1; every impl
18//! must honour that bound.
19//!
20//! # Error surface
21//!
22//! Every method returns [`Result<_, EngineError>`]. `EngineError`'s
23//! `Transport` variant carries a boxed `dyn Error + Send + Sync`;
24//! Valkey-backed transport faults box a
25//! `ff_script::error::ScriptError` (downcast via
26//! `ff_script::engine_error_ext::transport_script_ref`). Other
27//! backends box their native error type and set the `backend` tag
28//! accordingly.
29//!
30//! # Atomicity contract
31//!
32//! Per-op state transitions MUST be atomic (RFC-012 §3.4). On Valkey
33//! this is the single-FCALL-per-op property; on Postgres it is the
34//! per-transaction property. A backend that cannot honour atomicity
35//! for a given op either MUST NOT implement `EngineBackend` or MUST
36//! return `EngineError::Unavailable { op }` for the affected method.
37//!
38//! # Replay semantics
39//!
40//! `complete`, `fail`, `cancel`, `suspend`, `delay`, `wait_children`
41//! are idempotent under replay — calling twice with the same handle
42//! and args returns the same outcome (success on first call, typed
43//! `State` / `Contention` on subsequent calls where the fence triple
44//! no longer matches a live lease).
45
46use std::time::Duration;
47
48use async_trait::async_trait;
49
50use crate::backend::{
51    AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy,
52    FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal, PendingWaitpoint,
53    PrepareOutcome, ResumeSignal, ResumeToken,
54};
55// `SummaryDocument` and `TailVisibility` are referenced only inside
56// `#[cfg(feature = "streaming")]` trait methods below, so the imports
57// must be gated to avoid an unused-imports warning on the non-streaming
58// build.
59#[cfg(feature = "streaming")]
60use crate::backend::{SummaryDocument, TailVisibility};
61use crate::contracts::{
62    CancelFlowResult, ExecutionContext, ExecutionSnapshot, FlowSnapshot, IssueReclaimGrantArgs,
63    IssueReclaimGrantOutcome, ReclaimExecutionArgs, ReclaimExecutionOutcome, ReportUsageResult,
64    RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
65    SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
66};
67#[cfg(feature = "core")]
68use crate::contracts::{
69    AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
70    ApplyDependencyToChildResult, BudgetStatus, CancelExecutionArgs, CancelExecutionResult,
71    CancelFlowArgs, ChangePriorityArgs, ChangePriorityResult, ClaimExecutionArgs,
72    ClaimExecutionResult, ClaimForWorkerArgs, ClaimForWorkerOutcome, ClaimResumedExecutionArgs,
73    ClaimResumedExecutionResult,
74    BlockRouteArgs, BlockRouteOutcome, CheckAdmissionArgs, CheckAdmissionResult,
75    ClaimGrantOutcome,
76    CompleteExecutionArgs, CompleteExecutionResult, CreateBudgetArgs, CreateBudgetResult,
77    CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
78    CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
79    DeliverApprovalSignalArgs, DeliverSignalArgs, DeliverSignalResult, EdgeDirection, EdgeSnapshot,
80    EvaluateFlowEligibilityArgs, EvaluateFlowEligibilityResult, ExecutionInfo,
81    FailExecutionArgs, FailExecutionResult,
82    IssueClaimGrantArgs, IssueClaimGrantOutcome, IssueGrantAndClaimArgs,
83    RecordSpendArgs, ReleaseBudgetArgs, ScanEligibleArgs,
84    ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
85    ListPendingWaitpointsResult, ListSuspendedPage, RenewLeaseArgs, RenewLeaseResult,
86    ReplayExecutionArgs, ReplayExecutionResult,
87    ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult, ResumeExecutionArgs,
88    ResumeExecutionResult, RevokeLeaseArgs, RevokeLeaseResult,
89    StageDependencyEdgeArgs, StageDependencyEdgeResult,
90};
91// RFC-025 worker registry.
92#[cfg(feature = "core")]
93use crate::contracts::{
94    HeartbeatWorkerArgs, HeartbeatWorkerOutcome, ListWorkersArgs, ListWorkersResult,
95    MarkWorkerDeadArgs, MarkWorkerDeadOutcome, RegisterWorkerArgs, RegisterWorkerOutcome,
96};
97#[cfg(feature = "suspension")]
98use crate::contracts::{ListExpiredLeasesArgs, ListExpiredLeasesResult};
99#[cfg(feature = "core")]
100use crate::state::PublicState;
101#[cfg(feature = "core")]
102use crate::partition::PartitionKey;
103#[cfg(feature = "streaming")]
104use crate::contracts::{StreamCursor, StreamFrames};
105use crate::engine_error::EngineError;
106#[cfg(feature = "core")]
107use crate::types::EdgeId;
108#[cfg(feature = "core")]
109use crate::types::WaitpointId;
110use crate::types::{AttemptIndex, BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
111
112/// The engine write surface — a single trait a backend implementation
113/// honours to serve a `FlowFabricWorker`.
114///
115/// See RFC-012 §3.1 for the inventory rationale and §3.3 for the
116/// type-level shape. 16 methods (Round-7 added `create_waitpoint`;
117/// `append_frame` return widened; `report_usage` return replaced —
118/// RFC-012 §R7). Issue #150 added the two trigger-surface methods
119/// (`deliver_signal` / `claim_resumed_execution`).
120///
121/// # Note on `complete` payload shape
122///
123/// The RFC §3.3 sketch uses `Option<Bytes>`; the Stage 1a trait uses
124/// `Option<Vec<u8>>` to match the existing
125/// `ff_sdk::ClaimedTask::complete` signature and avoid adding a
126/// `bytes` public-type dep for zero consumer benefit. Round-4 §7.17
127/// resolved the payload container debate to `Box<[u8]>` in the
128/// public type (see `HandleOpaque`); `Option<Vec<u8>>` is the
129/// zero-churn choice consistent with today's code. Consumers that
130/// need `&[u8]` can borrow via `.as_deref()` on the Option.
131#[async_trait]
132pub trait EngineBackend: Send + Sync + 'static {
133    // ── Claim + lifecycle ──
134
135    /// Fresh-work claim. Returns `Ok(None)` when no work is currently
136    /// available; `Err` only on transport or input-validation faults.
137    async fn claim(
138        &self,
139        lane: &LaneId,
140        capabilities: &CapabilitySet,
141        policy: ClaimPolicy,
142    ) -> Result<Option<Handle>, EngineError>;
143
144    /// Renew a held lease. Returns the updated expiry + epoch on
145    /// success; typed `State::StaleLease` / `State::LeaseExpired`
146    /// when the lease has been stolen or timed out.
147    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError>;
148
149    /// Numeric-progress heartbeat.
150    ///
151    /// Writes scalar `progress_percent` / `progress_message` fields on
152    /// `exec_core`; each call overwrites the previous value. This does
153    /// NOT append to the output stream — stream-frame producers must use
154    /// [`append_frame`](Self::append_frame) instead.
155    async fn progress(
156        &self,
157        handle: &Handle,
158        percent: Option<u8>,
159        message: Option<String>,
160    ) -> Result<(), EngineError>;
161
162    /// Append one stream frame. Distinct from [`progress`](Self::progress)
163    /// per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry
164    /// id and post-append frame count (RFC-012 §R7.2.1).
165    ///
166    /// Stream-frame producers (arbitrary `frame_type` + payload, consumed
167    /// via the read/tail surfaces) MUST use this method rather than
168    /// [`progress`](Self::progress); the latter updates scalar fields on
169    /// `exec_core` and is invisible to stream consumers.
170    async fn append_frame(
171        &self,
172        handle: &Handle,
173        frame: Frame,
174    ) -> Result<AppendFrameOutcome, EngineError>;
175
176    /// Terminal success. Borrows `handle` (round-4 M-D2) so callers
177    /// can retry under `EngineError::Transport` without losing the
178    /// cookie. Payload is `Option<Vec<u8>>` per the note above.
179    async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError>;
180
181    /// Terminal failure with classification. Returns [`FailOutcome`]
182    /// so the caller learns whether a retry was scheduled.
183    async fn fail(
184        &self,
185        handle: &Handle,
186        reason: FailureReason,
187        classification: FailureClass,
188    ) -> Result<FailOutcome, EngineError>;
189
190    /// Cooperative cancel by the worker holding the lease.
191    async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError>;
192
193    /// Suspend the execution awaiting a typed resume condition
194    /// (RFC-013 Stage 1d).
195    ///
196    /// Borrows `handle` (round-4 M-D2). Terminal-looking behaviour is
197    /// expressed through [`SuspendOutcome`]:
198    ///
199    /// * [`SuspendOutcome::Suspended`] — the pre-suspend handle is
200    ///   logically invalidated; the fresh `HandleKind::Suspended`
201    ///   handle inside the variant supersedes it. Runtime enforcement
202    ///   via the fence triple: subsequent ops against the stale handle
203    ///   surface as `Contention(LeaseConflict)`.
204    /// * [`SuspendOutcome::AlreadySatisfied`] — buffered signals on a
205    ///   pending waitpoint already matched the resume condition at
206    ///   suspension time. The lease is NOT released; the caller's
207    ///   pre-suspend handle remains valid.
208    ///
209    /// See RFC-013 §2 for the type shapes, §3 for the replay /
210    /// idempotency contract, §4 for the error taxonomy.
211    async fn suspend(
212        &self,
213        handle: &Handle,
214        args: SuspendArgs,
215    ) -> Result<SuspendOutcome, EngineError>;
216
217    /// Suspend by execution id + lease fence triple, for service-layer
218    /// callers that hold a run record / lease-claim descriptor but no
219    /// worker [`Handle`] (cairn issue #322).
220    ///
221    /// Semantics mirror [`Self::suspend`] exactly — the same
222    /// [`SuspendArgs`] validation, the same [`SuspendOutcome`]
223    /// lifecycle, the same RFC-013 §3 dedup / replay contract. The
224    /// only difference is the fencing source: instead of the
225    /// `(lease_id, lease_epoch, attempt_id)` fields embedded in a
226    /// `Handle`, the backend fences against the triple passed directly.
227    /// Attempt-index, lane, and worker-instance metadata that
228    /// [`Self::suspend`] reads from the handle payload are recovered
229    /// from the backend's authoritative execution record (Valkey:
230    /// `exec_core` HGETs; Postgres: `ff_attempt` row lookup).
231    ///
232    /// The default impl returns [`EngineError::Unavailable`] so
233    /// existing backend impls remain non-breaking. Production backends
234    /// (Valkey, Postgres) override.
235    async fn suspend_by_triple(
236        &self,
237        exec_id: ExecutionId,
238        triple: LeaseFence,
239        args: SuspendArgs,
240    ) -> Result<SuspendOutcome, EngineError> {
241        let _ = (exec_id, triple, args);
242        Err(EngineError::Unavailable {
243            op: "suspend_by_triple",
244        })
245    }
246
247    /// Issue a pending waitpoint for future signal delivery.
248    ///
249    /// Waitpoints have two states in the Valkey wire contract:
250    /// **pending** (token issued, not yet backing a suspension) and
251    /// **active** (bound to a suspension). This method creates a
252    /// waitpoint in the **pending** state. A later `suspend` call
253    /// transitions a pending waitpoint to active (see Lua
254    /// `use_pending_waitpoint` ARGV flag at
255    /// `flowfabric.lua:3603,3641,3690`) — or, if buffered signals
256    /// already satisfy its condition, the suspend call returns
257    /// `SuspendOutcome::AlreadySatisfied` and the waitpoint activates
258    /// without ever releasing the lease.
259    ///
260    /// Pending-waitpoint expiry is a first-class terminal error on
261    /// the wire (`PendingWaitpointExpired` at
262    /// `ff-script/src/error.rs:170,403-408`). The attempt retains its
263    /// lease while the waitpoint is pending; signals delivered to
264    /// this waitpoint are buffered server-side (RFC-012 §R7.2.2).
265    async fn create_waitpoint(
266        &self,
267        handle: &Handle,
268        waitpoint_key: &str,
269        expires_in: Duration,
270    ) -> Result<PendingWaitpoint, EngineError>;
271
272    /// Read the HMAC token stored on a waitpoint record, keyed by
273    /// `(partition, waitpoint_id)`.
274    ///
275    /// Returns `Ok(Some(token))` when the waitpoint exists and carries
276    /// a token, `Ok(None)` when the waitpoint does not exist or no
277    /// token field is present. A missing waitpoint is not an error —
278    /// signals can legitimately arrive after a waitpoint has been
279    /// consumed or expired, and the signal-bridge authenticates on the
280    /// presence of a matching token, not on the waitpoint's liveness.
281    ///
282    /// # Use case
283    ///
284    /// Control-plane signal delivery (cairn signal-bridge): at
285    /// signal-resume time the bridge reads the token off the
286    /// waitpoint hash / row to authenticate the resume request it
287    /// subsequently issues. Previously implemented as direct
288    /// `ferriskey::Client::hget(waitpoint_key, "waitpoint_token")` —
289    /// Valkey-only. This method routes the read through the trait so
290    /// the pattern works on Postgres + SQLite as well.
291    ///
292    /// # Per-backend shape
293    ///
294    /// * **Valkey** — `HGET ff:wp:<tag>:<waitpoint_id> waitpoint_token`
295    ///   on the waitpoint's partition. Empty string / missing field
296    ///   maps to `None`.
297    /// * **Postgres** — `SELECT token FROM ff_waitpoint_pending
298    ///   WHERE partition_key = $1 AND waitpoint_id = $2 LIMIT 1`.
299    ///   Row-absent → `None`; empty `token` → `None`.
300    /// * **SQLite** — same shape as Postgres.
301    ///
302    /// The `partition` argument is the opaque [`PartitionKey`]
303    /// produced by FlowFabric (typically extracted from the
304    /// `Handle` / `ResumeToken` the waitpoint was minted against).
305    ///
306    /// # Default impl
307    ///
308    /// Returns [`EngineError::Unavailable`] with
309    /// `op = "read_waitpoint_token"` so out-of-tree backends and
310    /// in-tree backends not yet overriding this method continue to
311    /// compile. Mirrors the precedent used by
312    /// [`Self::issue_reclaim_grant`] / [`Self::reclaim_execution`].
313    #[cfg(feature = "core")]
314    async fn read_waitpoint_token(
315        &self,
316        _partition: PartitionKey,
317        _waitpoint_id: &WaitpointId,
318    ) -> Result<Option<String>, EngineError> {
319        Err(EngineError::Unavailable {
320            op: "read_waitpoint_token",
321        })
322    }
323
324    /// Non-mutating observation of signals that satisfied the handle's
325    /// resume condition.
326    async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError>;
327
328    /// Consume a resume grant (via [`ResumeToken`]) to mint a
329    /// resumed-kind handle. Routes to `ff_claim_resumed_execution` on
330    /// Valkey / the epoch-bump reconciler on PG/SQLite. Returns
331    /// `Ok(None)` when the grant's target execution is no longer
332    /// resumable (already reclaimed, terminal, etc.).
333    ///
334    /// **Renamed from `claim_from_reclaim` (RFC-024 PR-B+C).** The
335    /// pre-rename name advertised "reclaim" but the semantic has
336    /// always been resume-after-suspend. The new lease-reclaim path
337    /// lives on [`Self::reclaim_execution`].
338    async fn claim_from_resume_grant(
339        &self,
340        token: ResumeToken,
341    ) -> Result<Option<Handle>, EngineError>;
342
343    /// Issue a lease-reclaim grant (RFC-024 §3.2). Admits executions
344    /// in `lease_expired_reclaimable` or `lease_revoked` state to the
345    /// reclaim path; the returned [`IssueReclaimGrantOutcome::Granted`]
346    /// carries a [`crate::contracts::ReclaimGrant`] which is then fed
347    /// to [`Self::reclaim_execution`] to mint a fresh attempt.
348    ///
349    /// Default impl returns [`EngineError::Unavailable`] — PR-D (PG),
350    /// PR-E (SQLite), and PR-F (Valkey) override with real bodies.
351    async fn issue_reclaim_grant(
352        &self,
353        _args: IssueReclaimGrantArgs,
354    ) -> Result<IssueReclaimGrantOutcome, EngineError> {
355        Err(EngineError::Unavailable {
356            op: "issue_reclaim_grant",
357        })
358    }
359
360    /// Consume a [`crate::contracts::ReclaimGrant`] to mint a fresh
361    /// attempt for a previously lease-expired / lease-revoked
362    /// execution (RFC-024 §3.2). Creates a new attempt row, bumps the
363    /// execution's `lease_reclaim_count`, and mints a
364    /// [`crate::backend::HandleKind::Reclaimed`] handle.
365    ///
366    /// Default impl returns [`EngineError::Unavailable`] — PR-D (PG),
367    /// PR-E (SQLite), and PR-F (Valkey) override with real bodies.
368    async fn reclaim_execution(
369        &self,
370        _args: ReclaimExecutionArgs,
371    ) -> Result<ReclaimExecutionOutcome, EngineError> {
372        Err(EngineError::Unavailable {
373            op: "reclaim_execution",
374        })
375    }
376
377    // Round-5 amendment: lease-releasing peers of `suspend`.
378
379    /// Park the execution until `delay_until`, releasing the lease.
380    async fn delay(&self, handle: &Handle, delay_until: TimestampMs) -> Result<(), EngineError>;
381
382    /// Mark the execution as waiting for its child flow to complete,
383    /// releasing the lease.
384    async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError>;
385
386    // ── Read / admin ──
387
388    /// Snapshot an execution by id. `Ok(None)` ⇒ no such execution.
389    async fn describe_execution(
390        &self,
391        id: &ExecutionId,
392    ) -> Result<Option<ExecutionSnapshot>, EngineError>;
393
394    /// Point-read of the execution-scoped `(input_payload,
395    /// execution_kind, tags)` bundle used by the SDK worker when
396    /// assembling a `ClaimedTask` (see `ff_sdk::ClaimedTask`) after a
397    /// successful claim.
398    ///
399    /// No default impl — every `EngineBackend` must answer this
400    /// explicitly. Distinct from [`Self::describe_execution`]
401    /// (read-model projection) because the SDK needs the raw payload
402    /// bytes + kind + tags immediately post-claim, and the snapshot
403    /// projection deliberately omits the payload bytes.
404    ///
405    /// Per-backend shape:
406    ///
407    /// * **Valkey** — pipelined `GET :payload` + `HGETALL :core`
408    ///   + `HGETALL :tags` on the execution's partition (same pattern
409    ///   as [`Self::describe_execution`]).
410    /// * **Postgres** — single `SELECT payload, raw_fields` on
411    ///   `ff_exec_core` keyed by `(partition_key, execution_id)`;
412    ///   `execution_kind` + `tags` live in `raw_fields` JSONB.
413    /// * **SQLite** — identical shape to Postgres.
414    ///
415    /// Returns [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
416    /// when the execution does not exist — the SDK worker only calls
417    /// this after a successful claim, so a missing row is a loud
418    /// storage-tier invariant violation rather than a routine `Ok(None)`.
419    async fn read_execution_context(
420        &self,
421        execution_id: &ExecutionId,
422    ) -> Result<ExecutionContext, EngineError>;
423
424    /// Point-read of the execution's current attempt-index **pointer**
425    /// — the index of the currently-leased attempt row.
426    ///
427    /// Distinct from [`Self::read_total_attempt_count`]: this method
428    /// names the attempt that *already exists* (pointer), whereas
429    /// `read_total_attempt_count` is the monotonic claim counter used
430    /// to compute the next fresh attempt index. See the sibling's
431    /// rustdoc for the retry-path scenario that motivates the split.
432    ///
433    /// Used on the SDK worker's `claim_from_resume_grant` path —
434    /// specifically the private `claim_resumed_execution` helper —
435    /// immediately before dispatching [`Self::claim_resumed_execution`].
436    /// The returned index is fed into
437    /// [`ClaimResumedExecutionArgs::current_attempt_index`](crate::contracts::ClaimResumedExecutionArgs)
438    /// so the backend's script / transaction targets the correct
439    /// existing attempt row (KEYS[6] on Valkey; `ff_attempt` PK tuple
440    /// on PG/SQLite).
441    ///
442    /// Per-backend shape:
443    ///
444    /// * **Valkey** — `HGET {exec}:core current_attempt_index` on the
445    ///   execution's partition. Single command. Both the
446    ///   **missing-field** case (`exec_core` present but
447    ///   `current_attempt_index` absent or empty-string, i.e. pre-claim
448    ///   state) **and** the **missing-row** case (no `exec_core` hash
449    ///   at all) read back as `AttemptIndex(0)`. This preserves the
450    ///   pre-PR-3 inline-`HGET` semantic and is safe because Valkey's
451    ///   happy path requires `exec_core` to exist before this method
452    ///   is reached — the SDK only calls `read_current_attempt_index`
453    ///   post-grant, and grant issuance is gated on `exec_core`
454    ///   presence. A genuinely absent row would surface as the proper
455    ///   business-logic error (`NotAResumedExecution` /
456    ///   `ExecutionNotLeaseable`) on the downstream FCALL.
457    /// * **Postgres** — `SELECT attempt_index FROM ff_exec_core
458    ///   WHERE partition_key = $1 AND execution_id = $2`. The column
459    ///   is `NOT NULL DEFAULT 0` so a pre-claim row reads back as `0`
460    ///   (matching Valkey's missing-field case). **Missing row**
461    ///   surfaces as [`EngineError::Validation { kind:
462    ///   ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
463    ///   — diverges from Valkey's missing-row `→ 0` mapping.
464    /// * **SQLite** — `SELECT attempt_index FROM ff_exec_core
465    ///   WHERE partition_key = ? AND execution_id = ?`; identical
466    ///   semantics to Postgres (missing-row → `InvalidInput`).
467    ///
468    /// **Cross-backend asymmetry on missing row is intentional.** The
469    /// SDK happy path never observes it (grant issuance on Valkey
470    /// requires `exec_core`, and PG/SQLite currently return
471    /// `Unavailable` from `claim_from_grant` per
472    /// `project_claim_from_grant_pg_sqlite_gap.md`). Consumers writing
473    /// backend-agnostic tooling against this method directly must
474    /// treat the missing-row case as backend-dependent — match on
475    /// `InvalidInput` for PG/SQLite, and treat an unexpected `0` as
476    /// the Valkey equivalent signal.
477    ///
478    /// The default impl returns [`EngineError::Unavailable`] so the
479    /// trait addition is non-breaking for out-of-tree backends (same
480    /// precedent as [`Self::read_execution_context`] landing in v0.12
481    /// PR-1).
482    async fn read_current_attempt_index(
483        &self,
484        _execution_id: &ExecutionId,
485    ) -> Result<AttemptIndex, EngineError> {
486        Err(EngineError::Unavailable {
487            op: "read_current_attempt_index",
488        })
489    }
490
491    /// Point-read of the execution's **total attempt counter** — the
492    /// monotonic count of claims that have ever fired against this
493    /// execution (including the in-flight one once claimed).
494    ///
495    /// Used on the SDK worker's `claim_from_grant` / `claim_execution`
496    /// path — the next attempt-index for a fresh claim is this
497    /// counter's current value (so `1` on the second retry after the
498    /// first attempt failed terminally). This is semantically distinct
499    /// from [`Self::read_current_attempt_index`], which is a *pointer*
500    /// at the currently-leased attempt row and is only meaningful on
501    /// the `claim_from_resume_grant` path (where a live attempt already
502    /// exists and we want to re-seat its lease rather than mint a new
503    /// attempt row).
504    ///
505    /// Reading the pointer on the `claim_from_grant` path was a live
506    /// bug: on the retry-of-a-retry scenario the pointer still named
507    /// the *previous* terminal-failed attempt, so the newly-minted
508    /// attempt collided with it (Valkey KEYS[6]) or mis-targeted the
509    /// PG/SQLite `ff_attempt` PK tuple. This method fixes that by
510    /// reading the counter that Lua 5920 / PG `ff_claim_execution` /
511    /// SQLite `claim_impl` all already consult when computing the
512    /// next attempt index.
513    ///
514    /// Per-backend shape:
515    ///
516    /// * **Valkey** — `HGET {exec}:core total_attempt_count` on the
517    ///   execution's partition. Single command; pre-claim read (field
518    ///   absent or empty) maps to `0`.
519    /// * **Postgres** — `SELECT raw_fields->>'total_attempt_count'
520    ///   FROM ff_exec_core WHERE (partition_key, execution_id) = ...`.
521    ///   The field lives in the JSONB `raw_fields` bag rather than a
522    ///   dedicated column (mirrors how `create_execution_impl` seeds
523    ///   it on row creation). Missing row → `InvalidInput`; missing
524    ///   field → `0`.
525    /// * **SQLite** — `SELECT CAST(json_extract(raw_fields,
526    ///   '$.total_attempt_count') AS INTEGER) FROM ff_exec_core
527    ///   WHERE ...`. Same JSON-in-`raw_fields` shape as PG; uses the
528    ///   same `json_extract` idiom already employed in
529    ///   `ff-backend-sqlite/src/queries/operator.rs` for replay_count.
530    ///
531    /// The default impl returns [`EngineError::Unavailable`] so the
532    /// trait addition is non-breaking for out-of-tree backends (same
533    /// precedent as [`Self::read_current_attempt_index`] landing in
534    /// v0.12 PR-3).
535    async fn read_total_attempt_count(
536        &self,
537        _execution_id: &ExecutionId,
538    ) -> Result<AttemptIndex, EngineError> {
539        Err(EngineError::Unavailable {
540            op: "read_total_attempt_count",
541        })
542    }
543
544    /// Snapshot a flow by id. `Ok(None)` ⇒ no such flow.
545    async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError>;
546
547    // ── Namespaced tag point-writes / reads (issue #433) ──
548
549    /// Set a single namespaced tag on an execution. Tag `key` MUST match
550    /// the reserved caller-namespace pattern `^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$` —
551    /// i.e. `<caller>.<field>` — or the call returns
552    /// [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
553    /// with the offending key in `detail`. `value` is arbitrary UTF-8.
554    ///
555    /// The namespace prefix is carried inline in `key` (e.g.
556    /// `"cairn.session_id"`) — there is no separate `namespace` arg.
557    /// This matches the existing `ff_set_execution_tags` wire shape and
558    /// the flow-tag projection in [`ExecutionSnapshot::tags`].
559    ///
560    /// Validation is performed by each overriding backend impl via
561    /// [`validate_tag_key`] **before** the wire hop so PG / SQLite /
562    /// Valkey reject the same set of keys. The default trait impl
563    /// returns [`EngineError::Unavailable`] without running validation
564    /// — there is no meaningful storage to validate against on an
565    /// unsupported backend, and surfacing `Unavailable` before
566    /// `Validation` matches the precedence used elsewhere on the trait.
567    /// Backends MAY additionally validate on the storage tier (Valkey's
568    /// Lua path does, with a more permissive prefix-only check).
569    ///
570    /// Per-backend shape:
571    ///
572    /// * **Valkey** — `ff_set_execution_tags` FCALL with a single
573    ///   `{key → value}` pair. Routes through the existing Lua
574    ///   contract (no new wire format).
575    /// * **Postgres** — `UPDATE ff_exec_core SET raw_fields = jsonb_set(
576    ///   coalesce(raw_fields, '{}'::jsonb), '{tags,<key>}', to_jsonb($value))
577    ///   WHERE (partition_key, execution_id) = ...`. Same storage shape
578    ///   read by [`Self::describe_execution`] / [`Self::read_execution_context`].
579    /// * **SQLite** — `UPDATE ff_exec_core SET raw_fields = json_set(
580    ///   coalesce(raw_fields, '{}'), '$.tags."<key>"', $value) WHERE ...`.
581    ///   The key is quoted in the JSON path so dots inside the
582    ///   namespaced key (e.g. `cairn.session_id`) are treated as a
583    ///   single literal member name rather than JSON-path separators —
584    ///   yielding the same flat `raw_fields.tags` shape as PG.
585    ///
586    /// Missing execution surfaces as
587    /// [`EngineError::NotFound { entity: "execution" }`](crate::engine_error::EngineError::NotFound)
588    /// — matches the Valkey FCALL's `execution_not_found` mapping and
589    /// the existing `ScriptError::ExecutionNotFound` → `EngineError`
590    /// conversion (`ff_script::engine_error_ext`).
591    ///
592    /// The default impl returns [`EngineError::Unavailable`] so the
593    /// trait addition is non-breaking for out-of-tree backends.
594    async fn set_execution_tag(
595        &self,
596        _execution_id: &ExecutionId,
597        _key: &str,
598        _value: &str,
599    ) -> Result<(), EngineError> {
600        Err(EngineError::Unavailable {
601            op: "set_execution_tag",
602        })
603    }
604
605    /// Set a single namespaced tag on a flow. Same namespace rule as
606    /// [`Self::set_execution_tag`]: `key` MUST match
607    /// `^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$`.
608    ///
609    /// Per-backend shape:
610    ///
611    /// * **Valkey** — `ff_set_flow_tags` FCALL with a single pair.
612    ///   Tags land on the dedicated `ff:flow:{fp:N}:<flow_id>:tags`
613    ///   hash, not on the `flow_core` hash (diverges from the
614    ///   execution shape — execution tags live on `ff:exec:...:tags`
615    ///   by the same split). **Lazy migration on first write**: the
616    ///   Lua (`ff_script::flowfabric.lua`, `ff_set_flow_tags`) scans
617    ///   `flow_core` once per flow for pre-58.4 inline namespaced
618    ///   fields (anything matching `^[a-z][a-z0-9_]*\.`), HSETs them
619    ///   onto `:tags`, HDELs them from `flow_core`, and stamps
620    ///   `tags_migrated=1` on `flow_core` so subsequent calls
621    ///   short-circuit to O(1). This heals flows created before
622    ///   RFC-058.4 landed; well-formed flows pay the migration cost
623    ///   only on their very first tag write. Callers MUST read tags
624    ///   via [`Self::get_flow_tag`] (`HGET :tags <key>`) — direct
625    ///   `HGETALL` against `flow_core` will not see post-migration
626    ///   values.
627    ///
628    ///   **Cross-backend parity caveat on `describe_flow`**: the
629    ///   pre-existing `ValkeyBackend::describe_flow` /
630    ///   `FlowSnapshot::tags` read path snapshots `flow_core` fields
631    ///   only and does NOT today merge the `:tags` sub-hash, whereas
632    ///   Postgres `describe_flow` DOES surface flow tags via
633    ///   `ff_backend_postgres::flow::extract_tags` (which reads them
634    ///   off `raw_fields` — the same store `set_flow_tag` writes on
635    ///   PG). Trait consumers MUST NOT assume a tag written here
636    ///   will be visible via `describe_flow` on every backend: on
637    ///   Valkey, callers that need the full tag set should
638    ///   complement the snapshot with per-key [`Self::get_flow_tag`]
639    ///   reads. Extending Valkey `describe_flow` to merge `:tags`
640    ///   is additive and out of scope for this trait addition.
641    /// * **Postgres** — `UPDATE ff_flow_core SET raw_fields =
642    ///   jsonb_set(..., '{<key>}', ...)` — flow tags are stored as
643    ///   top-level `raw_fields` keys (matches
644    ///   `ff_backend_postgres::flow::extract_tags`). No `tags` nesting
645    ///   on flows, which diverges from the execution shape.
646    /// * **SQLite** — mirrors PG: `UPDATE ff_flow_core SET raw_fields =
647    ///   json_set(..., '$."<key>"', $value) WHERE ...`. The key is
648    ///   quoted so the dotted namespaced key lands as a single flat
649    ///   top-level member of `raw_fields`.
650    ///
651    /// Missing flow surfaces as
652    /// [`EngineError::NotFound { entity: "flow" }`](crate::engine_error::EngineError::NotFound)
653    /// (matches the Valkey FCALL's `flow_not_found` mapping).
654    ///
655    /// The default impl returns [`EngineError::Unavailable`].
656    async fn set_flow_tag(
657        &self,
658        _flow_id: &FlowId,
659        _key: &str,
660        _value: &str,
661    ) -> Result<(), EngineError> {
662        Err(EngineError::Unavailable {
663            op: "set_flow_tag",
664        })
665    }
666
667    /// Read a single namespaced execution tag. Returns `Ok(None)` when
668    /// the tag is absent **or** the execution row does not exist —
669    /// the two cases are not distinguished on the read path. Callers
670    /// that need to distinguish should call [`Self::describe_execution`]
671    /// first (an `Ok(None)` from that method proves the execution is
672    /// absent). This matches Valkey's native `HGET` semantics and
673    /// keeps the read path at a single round-trip on every backend.
674    ///
675    /// `key` must pass [`validate_tag_key`] — a malformed key can
676    /// never be present in storage so the call short-circuits with
677    /// [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
678    /// rather than round-tripping.
679    ///
680    /// Per-backend shape:
681    ///
682    /// * **Valkey** — `HGET :tags <key>` on the execution's partition.
683    /// * **Postgres** — `SELECT raw_fields->'tags'->><key> FROM ff_exec_core
684    ///   WHERE ...` with `fetch_optional` → missing row collapses to `None`.
685    /// * **SQLite** — `SELECT json_extract(raw_fields, '$.tags."<key>"')
686    ///   FROM ff_exec_core WHERE ...` with the same collapse. The key is
687    ///   quoted in the JSON path so dotted namespaced keys resolve to
688    ///   the flat literal member written by `set_execution_tag`.
689    ///
690    /// The default impl returns [`EngineError::Unavailable`].
691    async fn get_execution_tag(
692        &self,
693        _execution_id: &ExecutionId,
694        _key: &str,
695    ) -> Result<Option<String>, EngineError> {
696        Err(EngineError::Unavailable {
697            op: "get_execution_tag",
698        })
699    }
700
701    /// Read an execution's `namespace` scalar. Returns `Ok(None)` when
702    /// the row is absent or the field is unset. Dedicated point-read
703    /// used by the scanner per-candidate filter (`should_skip_candidate`)
704    /// to preserve the 1-HGET cost contract documented in
705    /// `ff_engine::scanner::should_skip_candidate` — `describe_execution`
706    /// is heavier (HGETALL / full snapshot) and unnecessary when only
707    /// the namespace scalar is needed.
708    ///
709    /// Per-backend shape:
710    ///
711    /// * **Valkey** — `HGET :core namespace` on the execution's partition
712    ///   (single field read on the already-hot exec_core hash).
713    /// * **Postgres** — `SELECT raw_fields->>'namespace' FROM ff_exec_core
714    ///   WHERE partition_key = $1 AND execution_id = $2`.
715    /// * **SQLite** — `SELECT json_extract(raw_fields, '$.namespace')
716    ///   FROM ff_exec_core WHERE ...`.
717    ///
718    /// The default impl returns [`EngineError::Unavailable`].
719    async fn get_execution_namespace(
720        &self,
721        _execution_id: &ExecutionId,
722    ) -> Result<Option<String>, EngineError> {
723        Err(EngineError::Unavailable {
724            op: "get_execution_namespace",
725        })
726    }
727
728    /// Read a single namespaced flow tag. Returns `Ok(None)` when
729    /// the tag is absent **or** the flow row does not exist (same
730    /// collapse semantics as [`Self::get_execution_tag`]). Symmetry
731    /// partner — consumers like cairn read `cairn.session_id` off
732    /// flows for archival.
733    ///
734    /// `key` must pass [`validate_tag_key`].
735    ///
736    /// Per-backend shape:
737    ///
738    /// * **Valkey** — `HGET :tags <key>` on the flow's partition.
739    /// * **Postgres** — `SELECT raw_fields->><key> FROM ff_flow_core
740    ///   WHERE ...` (top-level `raw_fields` key, matches the flow-tag
741    ///   storage shape).
742    /// * **SQLite** — `SELECT json_extract(raw_fields, '$."<key>"')
743    ///   FROM ff_flow_core WHERE ...` (quoted key — see
744    ///   `set_flow_tag`).
745    ///
746    /// The default impl returns [`EngineError::Unavailable`].
747    async fn get_flow_tag(
748        &self,
749        _flow_id: &FlowId,
750        _key: &str,
751    ) -> Result<Option<String>, EngineError> {
752        Err(EngineError::Unavailable {
753            op: "get_flow_tag",
754        })
755    }
756
757    /// List dependency edges adjacent to an execution. Read-only; the
758    /// backend resolves the subject execution's flow, reads the
759    /// direction-specific adjacency SET, and decodes each member's
760    /// flow-scoped `edge:<edge_id>` hash.
761    ///
762    /// Returns an empty `Vec` when the subject has no edges on the
763    /// requested side — including standalone executions (no owning
764    /// flow). Ordering is unspecified: the underlying adjacency SET
765    /// is an unordered SMEMBERS read. Callers that need deterministic
766    /// order should sort by [`EdgeSnapshot::edge_id`] /
767    /// [`EdgeSnapshot::created_at`] themselves.
768    ///
769    /// Parse failures on the edge hash surface as
770    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
771    /// — unknown fields, missing required fields, endpoint mismatches
772    /// against the adjacency SET all fail loud rather than silently
773    /// returning partial results.
774    ///
775    /// Gated on the `core` feature — edge reads are part of the
776    /// minimal engine surface a Postgres-style backend must honour.
777    ///
778    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
779    #[cfg(feature = "core")]
780    async fn list_edges(
781        &self,
782        _flow_id: &FlowId,
783        _direction: EdgeDirection,
784    ) -> Result<Vec<EdgeSnapshot>, EngineError> {
785        Err(EngineError::Unavailable { op: "list_edges" })
786    }
787
788    /// Snapshot a single dependency edge by its owning flow + edge id.
789    ///
790    /// `Ok(None)` when the edge hash is absent (never staged, or
791    /// staged under a different flow than `flow_id`). Parse failures
792    /// on a present edge hash surface as
793    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
794    /// — the stored `flow_id` field is cross-checked against the
795    /// caller's expected `flow_id` so a wrong-key read fails loud
796    /// rather than returning an unrelated edge.
797    ///
798    /// Gated on the `core` feature — single-edge reads are part of
799    /// the minimal snapshot surface an alternate backend must honour
800    /// alongside [`Self::describe_execution`] / [`Self::describe_flow`]
801    /// / [`Self::list_edges`].
802    ///
803    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
804    #[cfg(feature = "core")]
805    async fn describe_edge(
806        &self,
807        _flow_id: &FlowId,
808        _edge_id: &EdgeId,
809    ) -> Result<Option<EdgeSnapshot>, EngineError> {
810        Err(EngineError::Unavailable {
811            op: "describe_edge",
812        })
813    }
814
815    /// Resolve an execution's owning flow id, if any.
816    ///
817    /// `Ok(None)` when the execution's core record is absent or has
818    /// no associated flow (standalone execution). A present-but-
819    /// malformed `flow_id` field surfaces as
820    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`].
821    ///
822    /// Gated on the `core` feature. Used by ff-sdk's
823    /// `list_outgoing_edges` / `list_incoming_edges` to pivot from a
824    /// consumer-supplied `ExecutionId` to the `FlowId` required by
825    /// [`Self::list_edges`]. A Valkey backend serves this with a
826    /// single `HGET exec_core flow_id`; a Postgres backend serves it
827    /// with the equivalent single-column row lookup.
828    ///
829    /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
830    #[cfg(feature = "core")]
831    async fn resolve_execution_flow_id(
832        &self,
833        _eid: &ExecutionId,
834    ) -> Result<Option<FlowId>, EngineError> {
835        Err(EngineError::Unavailable {
836            op: "resolve_execution_flow_id",
837        })
838    }
839
840    /// List flows on a partition with cursor-based pagination (issue
841    /// #185).
842    ///
843    /// Returns a [`ListFlowsPage`] of [`FlowSummary`](crate::contracts::FlowSummary)
844    /// rows ordered by `flow_id` (UUID byte-lexicographic). `cursor`
845    /// is `None` for the first page; callers forward the returned
846    /// `next_cursor` verbatim to continue iteration, and the listing
847    /// is exhausted when `next_cursor` is `None`. `limit` is the
848    /// maximum number of rows to return on this page — implementations
849    /// MAY return fewer (end of partition) but MUST NOT exceed it.
850    ///
851    /// Ordering rationale: flow ids are UUIDs, and both Valkey
852    /// (sort after-the-fact) and Postgres (`ORDER BY flow_id`) can
853    /// agree on byte-lexicographic order — the same order
854    /// `FlowId::to_string()` produces for canonical hyphenated UUIDs.
855    /// Mapping to `cursor > flow_id` keeps the contract backend-
856    /// independent.
857    ///
858    /// # Postgres implementation pattern
859    ///
860    /// A Postgres-backed implementation serves this directly with
861    ///
862    /// ```sql
863    /// SELECT flow_id, created_at_ms, public_flow_state
864    ///   FROM ff_flow
865    ///  WHERE partition_key = $1
866    ///    AND ($2::uuid IS NULL OR flow_id > $2)
867    ///  ORDER BY flow_id
868    ///  LIMIT $3 + 1;
869    /// ```
870    ///
871    /// — reading one extra row to decide whether `next_cursor` should
872    /// be set to the last row's `flow_id`. The Valkey implementation
873    /// maintains the `ff:idx:{fp:N}:flow_index` SET and performs the
874    /// sort + slice client-side (SMEMBERS then sort-by-UUID-bytes),
875    /// pipelining `HGETALL flow_core` for each row on the page.
876    ///
877    /// Gated on the `core` feature — flow listing is part of the
878    /// minimal engine surface a Postgres-style backend must honour.
879    #[cfg(feature = "core")]
880    async fn list_flows(
881        &self,
882        _partition: PartitionKey,
883        _cursor: Option<FlowId>,
884        _limit: usize,
885    ) -> Result<ListFlowsPage, EngineError> {
886        Err(EngineError::Unavailable { op: "list_flows" })
887    }
888
889    /// Enumerate registered lanes with cursor-based pagination.
890    ///
891    /// Lanes are global (not partition-scoped) — the backend serves
892    /// this from its lane registry and does NOT accept a
893    /// [`crate::partition::Partition`] argument. Results are sorted
894    /// by [`LaneId`] name so the ordering is stable across calls and
895    /// cursors address a deterministic position in the sort.
896    ///
897    /// * `cursor` — exclusive lower bound. `None` starts from the
898    ///   first lane. To continue a walk, pass the previous page's
899    ///   [`ListLanesPage::next_cursor`].
900    /// * `limit` — hard cap on the number of lanes returned in the
901    ///   page. Backends MAY round this down when the registry size
902    ///   is smaller; they MUST NOT return more than `limit`.
903    ///
904    /// [`ListLanesPage::next_cursor`] is `Some(last_lane_in_page)`
905    /// iff at least one more lane exists after the returned page,
906    /// and `None` on the final page. Callers loop until `next_cursor`
907    /// is `None` to read the full registry.
908    ///
909    /// Gated on the `core` feature — lane enumeration is part of the
910    /// minimal snapshot surface an alternate backend must honour
911    /// alongside [`Self::describe_flow`] / [`Self::list_edges`].
912    #[cfg(feature = "core")]
913    async fn list_lanes(
914        &self,
915        _cursor: Option<LaneId>,
916        _limit: usize,
917    ) -> Result<ListLanesPage, EngineError> {
918        Err(EngineError::Unavailable { op: "list_lanes" })
919    }
920
921    /// List suspended executions in one partition, cursor-paginated,
922    /// with each entry's suspension `reason_code` populated (issue
923    /// #183).
924    ///
925    /// Consumer-facing "what's blocked on what?" panels (ff-board's
926    /// suspended-executions view, operator CLIs) need the reason in
927    /// the list response so the UI does not round-trip per row to
928    /// `describe_execution` for a field it knows it needs. `reason`
929    /// on [`SuspendedExecutionEntry`] carries the free-form
930    /// `suspension:current.reason_code` field — see the type rustdoc
931    /// for the String-not-enum rationale.
932    ///
933    /// `cursor` is opaque to callers; pass `None` to start a fresh
934    /// scan and feed the returned [`ListSuspendedPage::next_cursor`]
935    /// back in on subsequent pages until it comes back `None`.
936    /// `limit` bounds the `entries` count; backends MAY return fewer
937    /// when the partition is exhausted.
938    ///
939    /// Ordering is by ascending `suspended_at_ms` (the per-lane
940    /// suspended ZSET score == `timeout_at` or the no-timeout
941    /// sentinel) with execution id as a lex tiebreak, so cursor
942    /// continuation is deterministic across calls.
943    ///
944    /// Gated on the `core` feature — suspended-list enumeration is
945    /// part of the minimal engine surface a Postgres-style backend
946    /// must honour.
947    #[cfg(feature = "core")]
948    async fn list_suspended(
949        &self,
950        _partition: PartitionKey,
951        _cursor: Option<ExecutionId>,
952        _limit: usize,
953    ) -> Result<ListSuspendedPage, EngineError> {
954        Err(EngineError::Unavailable {
955            op: "list_suspended",
956        })
957    }
958
959    /// Forward-only paginated listing of the executions indexed under
960    /// one partition.
961    ///
962    /// Reads the partition-wide `ff:idx:{p:N}:all_executions` set,
963    /// sorts lexicographically on `ExecutionId`, and returns the page
964    /// of ids strictly greater than `cursor` (or starting from the
965    /// smallest id when `cursor = None`). The returned
966    /// [`ListExecutionsPage::next_cursor`] is the last id on the page
967    /// iff at least one more id exists past it; `None` signals
968    /// end-of-stream.
969    ///
970    /// `limit` is the maximum number of ids returned on this page. A
971    /// `limit` of `0` returns an empty page with `next_cursor = None`.
972    /// Backends MAY cap `limit` internally (Valkey: 1000) and return
973    /// fewer ids than requested; callers continue paginating until
974    /// `next_cursor == None`.
975    ///
976    /// Ordering is stable under concurrent inserts for already-emitted
977    /// ids (an id less-than-or-equal-to the caller's cursor is never
978    /// re-emitted in later pages) but new inserts past the cursor WILL
979    /// appear in subsequent pages — consistent with forward-only
980    /// cursor semantics.
981    ///
982    /// Gated on the `core` feature — partition-scoped listing is part
983    /// of the minimal engine surface every backend must honour.
984    #[cfg(feature = "core")]
985    async fn list_executions(
986        &self,
987        _partition: PartitionKey,
988        _cursor: Option<ExecutionId>,
989        _limit: usize,
990    ) -> Result<ListExecutionsPage, EngineError> {
991        Err(EngineError::Unavailable {
992            op: "list_executions",
993        })
994    }
995
996    // ── Trigger ops (issue #150) ──
997
998    /// Deliver an external signal to a suspended execution's waitpoint.
999    ///
1000    /// The backend atomically records the signal, evaluates the resume
1001    /// condition, and — when satisfied — transitions the execution
1002    /// from `suspended` to `runnable` (or buffers the signal when the
1003    /// waitpoint is still `pending`). Duplicate delivery — same
1004    /// `idempotency_key` + waitpoint — surfaces as
1005    /// [`DeliverSignalResult::Duplicate`] with the pre-existing
1006    /// `signal_id` rather than mutating state twice.
1007    ///
1008    /// Input validation (HMAC token presence, payload size limits,
1009    /// signal-name shape) is the backend's responsibility; callers
1010    /// pass a fully populated [`DeliverSignalArgs`] and receive typed
1011    /// outcomes or typed errors (`ScriptError::invalid_token`,
1012    /// `ScriptError::token_expired`, `ScriptError::ExecutionNotFound`
1013    /// surfaced via [`EngineError::Transport`] on the Valkey backend).
1014    ///
1015    /// Gated on the `core` feature — signal delivery is part of the
1016    /// minimal trigger surface every backend must honour so ff-server
1017    /// / REST handlers can dispatch against `Arc<dyn EngineBackend>`
1018    /// without knowing which backend is running underneath.
1019    #[cfg(feature = "core")]
1020    async fn deliver_signal(
1021        &self,
1022        _args: DeliverSignalArgs,
1023    ) -> Result<DeliverSignalResult, EngineError> {
1024        Err(EngineError::Unavailable {
1025            op: "deliver_signal",
1026        })
1027    }
1028
1029    /// Claim a resumed execution — a previously-suspended attempt that
1030    /// has cleared its resume condition (e.g. via
1031    /// [`Self::deliver_signal`]) and now needs a worker to pick up the
1032    /// same attempt index.
1033    ///
1034    /// Distinct from [`Self::claim`] (fresh work) and
1035    /// [`Self::claim_from_resume_grant`] (grant-based ownership transfer
1036    /// after a crash): the resumed-claim path re-binds an existing
1037    /// attempt rather than minting a new one. The backend issues a
1038    /// fresh `lease_id` + bumps the `lease_epoch`, preserving
1039    /// `attempt_id` / `attempt_index` so stream frames and progress
1040    /// updates continue on the same attempt.
1041    ///
1042    /// Typed failures surface via `ScriptError` → `EngineError`:
1043    /// `NotAResumedExecution` when the attempt state is not
1044    /// `attempt_interrupted`, `ExecutionNotLeaseable` when the
1045    /// lifecycle phase is not `runnable`, and `InvalidClaimGrant`
1046    /// when the grant key is missing or was already consumed.
1047    ///
1048    /// Gated on the `core` feature — resumed-claim is part of the
1049    /// minimal trigger surface every backend must honour.
1050    #[cfg(feature = "core")]
1051    async fn claim_resumed_execution(
1052        &self,
1053        _args: ClaimResumedExecutionArgs,
1054    ) -> Result<ClaimResumedExecutionResult, EngineError> {
1055        Err(EngineError::Unavailable {
1056            op: "claim_resumed_execution",
1057        })
1058    }
1059
1060    /// Scan a lane's eligible ZSET on one partition for
1061    /// highest-priority executions awaiting a worker (v0.12 PR-5).
1062    ///
1063    /// Lifted from the SDK-side `ZRANGEBYSCORE` inline on
1064    /// `FlowFabricWorker::claim_next` — the scheduler-bypass scanner
1065    /// gated behind `direct-valkey-claim`. The trait method itself is
1066    /// backend-agnostic; consumers that drive the scanner loop
1067    /// (bench harnesses, single-tenant dev) compose it with
1068    /// [`Self::issue_claim_grant`] + [`Self::claim_execution`] to
1069    /// replicate the pre-PR-5 `claim_next` body.
1070    ///
1071    /// # Backend coverage
1072    ///
1073    /// * **Valkey** — `ZRANGEBYSCORE eligible_zset -inf +inf LIMIT 0 <limit>`
1074    ///   on the lane's partition-scoped eligible key. Single
1075    ///   command; no script round-trip. Wire shape is byte-for-byte
1076    ///   identical to the pre-PR SDK inline call so bench traces
1077    ///   match pre-PR without new `#[tracing::instrument]` span names.
1078    /// * **Postgres / SQLite** — use the `Err(Unavailable)` default.
1079    ///   PG/SQLite consumers drive work through the scheduler-routed
1080    ///   [`Self::claim_for_worker`] path instead of the scanner
1081    ///   primitives exposed here; lifting the scheduler itself onto
1082    ///   the trait is RFC-024 follow-up scope. See
1083    ///   `project_claim_from_grant_pg_sqlite_gap.md` for motivation.
1084    ///
1085    /// Default impl returns [`EngineError::Unavailable`] so the trait
1086    /// addition is non-breaking for out-of-tree backends. Same
1087    /// precedent as [`Self::claim_execution`] landing in v0.12 PR-4.
1088    #[cfg(feature = "core")]
1089    async fn scan_eligible_executions(
1090        &self,
1091        _args: ScanEligibleArgs,
1092    ) -> Result<Vec<ExecutionId>, EngineError> {
1093        Err(EngineError::Unavailable {
1094            op: "scan_eligible_executions",
1095        })
1096    }
1097
1098    /// Issue a claim grant — the scheduler's admission write — for a
1099    /// single execution on a single lane (v0.12 PR-5).
1100    ///
1101    /// Lifted from the SDK-side `ff_issue_claim_grant` inline helper
1102    /// on `FlowFabricWorker::claim_next`. The backend atomically
1103    /// writes the grant hash, appends to the per-worker grant index,
1104    /// and removes the execution from the lane's eligible ZSET.
1105    ///
1106    /// Typed rejects surface via [`EngineError::Validation`]:
1107    /// `CapabilityMismatch` when the worker's capabilities do not
1108    /// cover the execution's `required_capabilities`, `InvalidInput`
1109    /// for malformed args. Transport faults surface via
1110    /// [`EngineError::Transport`].
1111    ///
1112    /// # Backend coverage
1113    ///
1114    /// * **Valkey** — one `ff_issue_claim_grant` FCALL. KEYS/ARGV
1115    ///   shape is byte-for-byte identical to the pre-PR SDK inline
1116    ///   call; bench traces match pre-PR.
1117    /// * **Postgres / SQLite** — `Err(Unavailable)` default; use
1118    ///   [`Self::claim_for_worker`] instead. See
1119    ///   [`Self::scan_eligible_executions`] for the cross-link
1120    ///   rationale.
1121    #[cfg(feature = "core")]
1122    async fn issue_claim_grant(
1123        &self,
1124        _args: IssueClaimGrantArgs,
1125    ) -> Result<IssueClaimGrantOutcome, EngineError> {
1126        Err(EngineError::Unavailable {
1127            op: "issue_claim_grant",
1128        })
1129    }
1130
1131    /// Move an execution from a lane's eligible ZSET into its
1132    /// blocked_route ZSET (v0.12 PR-5).
1133    ///
1134    /// Lifted from the SDK-side `ff_block_execution_for_admission`
1135    /// inline helper on `FlowFabricWorker::claim_next`. Called after
1136    /// a [`Self::issue_claim_grant`] `CapabilityMismatch` reject —
1137    /// without a block step, the inline scanner would re-pick the
1138    /// same top-of-ZSET every tick (parity with
1139    /// `ff-scheduler::Scheduler::block_candidate`).
1140    ///
1141    /// The engine's unblock scanner periodically promotes
1142    /// blocked_route back to eligible once a worker with matching
1143    /// caps registers.
1144    ///
1145    /// # Backend coverage
1146    ///
1147    /// * **Valkey** — one `ff_block_execution_for_admission` FCALL.
1148    /// * **Postgres / SQLite** — `Err(Unavailable)` default; the
1149    ///   scheduler-routed [`Self::claim_for_worker`] path handles
1150    ///   admission rejects server-side.
1151    #[cfg(feature = "core")]
1152    async fn block_route(
1153        &self,
1154        _args: BlockRouteArgs,
1155    ) -> Result<BlockRouteOutcome, EngineError> {
1156        Err(EngineError::Unavailable { op: "block_route" })
1157    }
1158
1159    /// Consume a scheduler-issued claim grant to mint a fresh attempt.
1160    ///
1161    /// The SDK's grant-consumer path — paired with `FlowFabricWorker::claim_from_grant`
1162    /// in `ff-sdk` — routes through this method. The scheduler has
1163    /// already validated budget / quota / capabilities and written a
1164    /// grant (Valkey `claim_grant` hash); this call atomically
1165    /// consumes that grant and creates the attempt row, mints
1166    /// `lease_id` + `lease_epoch`, and returns a
1167    /// [`ClaimExecutionResult::Claimed`] carrying the minted lease
1168    /// triple.
1169    ///
1170    /// Distinct from [`Self::claim`] (the scheduler-bypass scanner
1171    /// used by the `direct-valkey-claim` feature) — this method
1172    /// assumes the grant already exists and skips capability / ZSET
1173    /// scanning. The Valkey impl fires exactly one `ff_claim_execution`
1174    /// FCALL.
1175    ///
1176    /// Typed failures surface via `ScriptError` → `EngineError`:
1177    /// `UseClaimResumedExecution` when the attempt is actually
1178    /// `attempt_interrupted` (caller should retry via
1179    /// [`Self::claim_resumed_execution`] — see `ContentionKind` at
1180    /// `ff_core::engine_error`), `InvalidClaimGrant` when the grant is
1181    /// missing / consumed / worker-mismatched, `CapabilityMismatch`
1182    /// when the execution's `required_capabilities` drifted after
1183    /// grant issuance.
1184    ///
1185    /// # Backend coverage
1186    ///
1187    /// * **Valkey** — implemented in `ff-backend-valkey` (one
1188    ///   `ff_claim_execution` FCALL).
1189    /// * **Postgres / SQLite** — use the `Err(Unavailable)` default in
1190    ///   this PR. Grants on PG / SQLite today flow through
1191    ///   `PostgresScheduler::claim_for_worker` (a sibling struct, not
1192    ///   an `EngineBackend` method); wiring the default-over-trait
1193    ///   behaviour into a PG / SQLite `claim_execution` impl lands
1194    ///   with a future RFC-024 grant-consumer extension.
1195    ///
1196    /// The default impl returns [`EngineError::Unavailable`] so the
1197    /// trait addition is non-breaking for out-of-tree backends. Same
1198    /// precedent as [`Self::read_current_attempt_index`] landing in
1199    /// v0.12 PR-3.
1200    #[cfg(feature = "core")]
1201    async fn claim_execution(
1202        &self,
1203        _args: ClaimExecutionArgs,
1204    ) -> Result<ClaimExecutionResult, EngineError> {
1205        Err(EngineError::Unavailable {
1206            op: "claim_execution",
1207        })
1208    }
1209
1210    /// Operator-initiated cancellation of a flow and (optionally) its
1211    /// member executions. See RFC-012 §3.1.1 for the policy /wait
1212    /// matrix.
1213    async fn cancel_flow(
1214        &self,
1215        id: &FlowId,
1216        policy: CancelFlowPolicy,
1217        wait: CancelFlowWait,
1218    ) -> Result<CancelFlowResult, EngineError>;
1219
1220    /// RFC-016 Stage A: set the inbound-edge-group policy for a
1221    /// downstream execution. Must be called before the first
1222    /// `add_dependency(... -> downstream_execution_id)` — the backend
1223    /// rejects with [`EngineError::Conflict`] if edges have already
1224    /// been staged for this group.
1225    ///
1226    /// Stage A honours only
1227    /// [`EdgeDependencyPolicy::AllOf`](crate::contracts::EdgeDependencyPolicy::AllOf);
1228    /// the `AnyOf` / `Quorum` variants return
1229    /// [`EngineError::Validation`] with
1230    /// `detail = "stage A supports AllOf only; AnyOf/Quorum land in stage B"`
1231    /// until Stage B's resolver lands.
1232    #[cfg(feature = "core")]
1233    async fn set_edge_group_policy(
1234        &self,
1235        _flow_id: &FlowId,
1236        _downstream_execution_id: &ExecutionId,
1237        _policy: crate::contracts::EdgeDependencyPolicy,
1238    ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
1239        Err(EngineError::Unavailable {
1240            op: "set_edge_group_policy",
1241        })
1242    }
1243
1244    // ── HMAC secret rotation (v0.7 migration-master Q4) ──
1245
1246    /// Rotate the waitpoint HMAC signing kid **cluster-wide**.
1247    ///
1248    /// **v0.7 migration-master Q4 (adjudicated 2026-04-24).**
1249    /// Additive trait surface so Valkey and Postgres backends can
1250    /// both expose the "rotate everywhere" semantic under one name.
1251    ///
1252    /// * Valkey impl fans out an `ff_rotate_waitpoint_hmac_secret`
1253    ///   FCALL per execution partition. `entries.len() == num_flow_partitions`
1254    ///   and per-partition failures are surfaced as inner `Err`
1255    ///   entries — the call as a whole does not fail when one
1256    ///   partition's FCALL fails, matching
1257    ///   [`ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions`]'s
1258    ///   partial-success contract.
1259    /// * Postgres impl (Wave 4) writes one row to
1260    ///   `ff_waitpoint_hmac(kid, secret, rotated_at)` and returns a
1261    ///   single-entry vec with `partition = 0`.
1262    ///
1263    /// The default impl returns
1264    /// [`EngineError::Unavailable`] with
1265    /// `op = "rotate_waitpoint_hmac_secret_all"` so backends that
1266    /// haven't implemented the method surface the miss loudly rather
1267    /// than silently no-op'ing. Both concrete backends override.
1268    async fn rotate_waitpoint_hmac_secret_all(
1269        &self,
1270        _args: RotateWaitpointHmacSecretAllArgs,
1271    ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
1272        Err(EngineError::Unavailable {
1273            op: "rotate_waitpoint_hmac_secret_all",
1274        })
1275    }
1276
1277    /// Seed the initial waitpoint HMAC secret for a fresh deployment
1278    /// (issue #280).
1279    ///
1280    /// **Idempotent.** If a `current_kid` (Valkey per-partition) or
1281    /// an active kid row (Postgres) already exists with the given
1282    /// `kid`, the method returns
1283    /// [`SeedOutcome::AlreadySeeded`] without overwriting, reporting
1284    /// whether the stored secret matches the caller-supplied one via
1285    /// `same_secret`. Callers (cairn boot, operator tooling) invoke
1286    /// this on every boot and let the backend decide whether to
1287    /// install — removing the client-side "check then HSET" race that
1288    /// cairn's raw-HSET boot path silently tolerated.
1289    ///
1290    /// For rotation of an already-seeded secret, use
1291    /// [`Self::rotate_waitpoint_hmac_secret_all`] instead; seed is
1292    /// install-only.
1293    ///
1294    /// The default impl returns [`EngineError::Unavailable`] with
1295    /// `op = "seed_waitpoint_hmac_secret"` so backends that haven't
1296    /// implemented the method surface the miss loudly.
1297    async fn seed_waitpoint_hmac_secret(
1298        &self,
1299        _args: SeedWaitpointHmacSecretArgs,
1300    ) -> Result<SeedOutcome, EngineError> {
1301        Err(EngineError::Unavailable {
1302            op: "seed_waitpoint_hmac_secret",
1303        })
1304    }
1305
1306    // ── Budget ──
1307
1308    /// Report usage against a budget and check limits. Returns the
1309    /// typed [`ReportUsageResult`] variant; backends enforce
1310    /// idempotency via the caller-supplied
1311    /// [`UsageDimensions::dedup_key`] (RFC-012 §R7.2.3 — replaces
1312    /// the pre-Round-7 `AdmissionDecision` return).
1313    async fn report_usage(
1314        &self,
1315        handle: &Handle,
1316        budget: &BudgetId,
1317        dimensions: crate::backend::UsageDimensions,
1318    ) -> Result<ReportUsageResult, EngineError>;
1319
1320    // ── Stream reads (RFC-012 Stage 1c tranche-4; issue #87) ──
1321
1322    /// Read frames from a completed or in-flight attempt's stream.
1323    ///
1324    /// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start`
1325    /// / `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
1326    /// `StreamCursor::At("<id>")` reads from a concrete entry id.
1327    ///
1328    /// Input validation (count_limit bounds, cursor shape) is the
1329    /// caller's responsibility — SDK-side wrappers in
1330    /// [`ff-sdk`](https://docs.rs/ff-sdk) enforce bounds before
1331    /// forwarding. Backends MAY additionally reject out-of-range
1332    /// input via [`EngineError::Validation`].
1333    ///
1334    /// Gated on the `streaming` feature — stream reads are part of
1335    /// the stream-subset surface a backend without XREAD-like
1336    /// primitives may omit.
1337    #[cfg(feature = "streaming")]
1338    async fn read_stream(
1339        &self,
1340        _execution_id: &ExecutionId,
1341        _attempt_index: AttemptIndex,
1342        _from: StreamCursor,
1343        _to: StreamCursor,
1344        _count_limit: u64,
1345    ) -> Result<StreamFrames, EngineError> {
1346        Err(EngineError::Unavailable { op: "read_stream" })
1347    }
1348
1349    /// Tail a live attempt's stream.
1350    ///
1351    /// `after` is an exclusive [`StreamCursor`] — entries with id
1352    /// strictly greater than `after` are returned. `StreamCursor::Start`
1353    /// / `StreamCursor::End` are NOT accepted here; callers MUST pass
1354    /// a concrete id (or `StreamCursor::from_beginning()`). The SDK
1355    /// wrapper rejects the open markers before reaching the backend.
1356    ///
1357    /// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up
1358    /// to that many ms for a new entry.
1359    ///
1360    /// `visibility` (RFC-015 §6.1) filters the returned entries by
1361    /// their stored [`StreamMode`](crate::backend::StreamMode)
1362    /// `mode` field. Default
1363    /// [`TailVisibility::All`](crate::backend::TailVisibility::All)
1364    /// preserves v1 behaviour.
1365    ///
1366    /// Gated on the `streaming` feature — see [`read_stream`](Self::read_stream).
1367    #[cfg(feature = "streaming")]
1368    async fn tail_stream(
1369        &self,
1370        _execution_id: &ExecutionId,
1371        _attempt_index: AttemptIndex,
1372        _after: StreamCursor,
1373        _block_ms: u64,
1374        _count_limit: u64,
1375        _visibility: TailVisibility,
1376    ) -> Result<StreamFrames, EngineError> {
1377        Err(EngineError::Unavailable { op: "tail_stream" })
1378    }
1379
1380    /// Read the rolling summary document for an attempt (RFC-015 §6.3).
1381    ///
1382    /// Returns `Ok(None)` when no [`StreamMode::DurableSummary`](crate::backend::StreamMode::DurableSummary)
1383    /// frame has ever been appended for the attempt. Non-blocking Hash
1384    /// read; safe to call from any consumer without holding the lease.
1385    ///
1386    /// Gated on the `streaming` feature — summary reads are part of
1387    /// the stream-subset surface.
1388    #[cfg(feature = "streaming")]
1389    async fn read_summary(
1390        &self,
1391        _execution_id: &ExecutionId,
1392        _attempt_index: AttemptIndex,
1393    ) -> Result<Option<SummaryDocument>, EngineError> {
1394        Err(EngineError::Unavailable {
1395            op: "read_summary",
1396        })
1397    }
1398
1399    // ── RFC-017 Stage A — Ingress (5) ──────────────────────────
1400    //
1401    // Every method in this block has a default impl returning
1402    // `EngineError::Unavailable { op }` per RFC-017 §5.3. Concrete
1403    // backends override each method with a real body. A missing
1404    // override surfaces as a loud typed error at the call site rather
1405    // than a silent no-op.
1406
1407    /// Create an execution. Ingress row 6 (RFC-017 §4). Wraps
1408    /// `ff_create_execution` on Valkey; `INSERT INTO ff_execution ...`
1409    /// on Postgres. The `idempotency_key` + backend-side default
1410    /// `dedup_ttl_ms = 86400000` make duplicate submissions idempotent.
1411    #[cfg(feature = "core")]
1412    async fn create_execution(
1413        &self,
1414        _args: CreateExecutionArgs,
1415    ) -> Result<CreateExecutionResult, EngineError> {
1416        Err(EngineError::Unavailable {
1417            op: "create_execution",
1418        })
1419    }
1420
1421    /// Create a flow header. Ingress row 5.
1422    #[cfg(feature = "core")]
1423    async fn create_flow(
1424        &self,
1425        _args: CreateFlowArgs,
1426    ) -> Result<CreateFlowResult, EngineError> {
1427        Err(EngineError::Unavailable { op: "create_flow" })
1428    }
1429
1430    /// Atomically add an execution to a flow (single-FCALL co-located
1431    /// commit on Valkey; single-transaction UPSERT on Postgres).
1432    #[cfg(feature = "core")]
1433    async fn add_execution_to_flow(
1434        &self,
1435        _args: AddExecutionToFlowArgs,
1436    ) -> Result<AddExecutionToFlowResult, EngineError> {
1437        Err(EngineError::Unavailable {
1438            op: "add_execution_to_flow",
1439        })
1440    }
1441
1442    /// Stage a dependency edge between flow members. CAS-guarded on
1443    /// `graph_revision` — stale rev returns `Contention(StaleGraphRevision)`.
1444    #[cfg(feature = "core")]
1445    async fn stage_dependency_edge(
1446        &self,
1447        _args: StageDependencyEdgeArgs,
1448    ) -> Result<StageDependencyEdgeResult, EngineError> {
1449        Err(EngineError::Unavailable {
1450            op: "stage_dependency_edge",
1451        })
1452    }
1453
1454    /// Apply a staged dependency edge to its downstream child.
1455    #[cfg(feature = "core")]
1456    async fn apply_dependency_to_child(
1457        &self,
1458        _args: ApplyDependencyToChildArgs,
1459    ) -> Result<ApplyDependencyToChildResult, EngineError> {
1460        Err(EngineError::Unavailable {
1461            op: "apply_dependency_to_child",
1462        })
1463    }
1464
1465    /// Resolve one dependency edge after its upstream reached a
1466    /// terminal outcome — satisfy on "success", mark impossible
1467    /// otherwise. Idempotent (`AlreadyResolved` on replay).
1468    ///
1469    /// PR-7b Step 0 overlap-resolver: lifted here so cluster 2
1470    /// (`scanner/dependency_reconciler`) could trait-route through
1471    /// `Arc<dyn EngineBackend>` without a merge conflict with cluster
1472    /// 4. Cluster 4 (`completion_listener::spawn_dispatch_loop`)
1473    /// ultimately routed through the coarser
1474    /// [`Self::cascade_completion`] (per-payload) instead of looping
1475    /// over this per-edge method, because the Postgres cascade is
1476    /// outbox-driven rather than per-edge — see `cascade_completion`
1477    /// rustdoc "Timing semantics" for details.
1478    ///
1479    /// # Backend status
1480    ///
1481    /// - **Valkey:** wraps `ff_resolve_dependency` (RFC-016 Stage C
1482    ///   signature). Atomic single-slot FCALL.
1483    /// - **Postgres:** `Unavailable`. PG's post-completion cascade is
1484    ///   not per-edge; it runs via
1485    ///   `ff_backend_postgres::dispatch::dispatch_completion(event_id)`
1486    ///   keyed on the `ff_completion_event` outbox row. The Valkey-
1487    ///   shaped per-edge resolve does not map cleanly to that model;
1488    ///   PG's `dependency_reconciler` already calls `dispatch_completion`
1489    ///   directly. The engine's PR-7b/final integration test expects
1490    ///   `Unsupported` logs from Valkey-shaped scanners on a PG
1491    ///   deployment — this surface honours that contract.
1492    /// - **SQLite:** `Unavailable` for the same reason (mirrors PG).
1493    ///
1494    /// The default impl returns [`EngineError::Unavailable`] so a
1495    /// backend that has not been migrated surfaces a typed
1496    /// `Unsupported`-grade error rather than a panic.
1497    #[cfg(feature = "core")]
1498    async fn resolve_dependency(
1499        &self,
1500        _args: crate::contracts::ResolveDependencyArgs,
1501    ) -> Result<crate::contracts::ResolveDependencyOutcome, EngineError> {
1502        Err(EngineError::Unavailable {
1503            op: "resolve_dependency",
1504        })
1505    }
1506
1507    /// Cascade a terminal-execution completion into its downstream
1508    /// edges. Consumed by
1509    /// `ff-engine::completion_listener::spawn_dispatch_loop` (PR-7b
1510    /// Cluster 4) to trait-route the post-completion DAG-promotion
1511    /// path through `Arc<dyn EngineBackend>`.
1512    ///
1513    /// Distinct from [`Self::resolve_dependency`]: that method is
1514    /// per-edge (one `ff_resolve_dependency` FCALL); this method is
1515    /// per-completion and orchestrates the full outgoing-edge walk
1516    /// plus `child_skipped` recursion (Valkey) or outbox-event
1517    /// dispatch (Postgres).
1518    ///
1519    /// # Timing semantics
1520    ///
1521    /// Backends diverge on *when* the caller observes cascade work.
1522    /// See [`CascadeOutcome`] for the full contract; the short form:
1523    ///
1524    /// - **Valkey:** synchronous. FCALL-driven walk completes inline;
1525    ///   `child_skipped` descendants are recursively cascaded up to the
1526    ///   internal `MAX_CASCADE_DEPTH` cap before return.
1527    /// - **Postgres:** asynchronous via the `ff_completion_event`
1528    ///   outbox. The call resolves `payload` to its `event_id`, runs
1529    ///   `ff_backend_postgres::dispatch::dispatch_completion`, and
1530    ///   returns when the outbox row has been claimed + its direct
1531    ///   hops advanced. Further-descendant cascades ride their own
1532    ///   outbox events (emitted by the per-hop tx) — NOT this call.
1533    ///
1534    /// Consumers that depend on synchronous cascade must either target
1535    /// Valkey explicitly or observe PG's `dispatched_at_ms` clearance
1536    /// via the `dependency_reconciler` partial index to verify drain.
1537    ///
1538    /// The default impl returns [`EngineError::Unavailable`] so
1539    /// backends that have not been migrated surface a typed error
1540    /// rather than a panic.
1541    ///
1542    /// [`CascadeOutcome`]: crate::contracts::CascadeOutcome
1543    #[cfg(feature = "core")]
1544    async fn cascade_completion(
1545        &self,
1546        _payload: &crate::backend::CompletionPayload,
1547    ) -> Result<crate::contracts::CascadeOutcome, EngineError> {
1548        Err(EngineError::Unavailable {
1549            op: "cascade_completion",
1550        })
1551    }
1552
1553    // ── RFC-017 Stage A — Operator control (4) ─────────────────
1554
1555    /// Operator-initiated execution cancel (row 2).
1556    #[cfg(feature = "core")]
1557    async fn cancel_execution(
1558        &self,
1559        _args: CancelExecutionArgs,
1560    ) -> Result<CancelExecutionResult, EngineError> {
1561        Err(EngineError::Unavailable {
1562            op: "cancel_execution",
1563        })
1564    }
1565
1566    /// Re-score an execution's eligibility priority (row 17).
1567    #[cfg(feature = "core")]
1568    async fn change_priority(
1569        &self,
1570        _args: ChangePriorityArgs,
1571    ) -> Result<ChangePriorityResult, EngineError> {
1572        Err(EngineError::Unavailable {
1573            op: "change_priority",
1574        })
1575    }
1576
1577    /// Replay a terminal execution (row 22). Variadic KEYS handling
1578    /// (inbound-edge pre-read) is hidden inside the Valkey impl per
1579    /// RFC-017 §4 row 3.
1580    #[cfg(feature = "core")]
1581    async fn replay_execution(
1582        &self,
1583        _args: ReplayExecutionArgs,
1584    ) -> Result<ReplayExecutionResult, EngineError> {
1585        Err(EngineError::Unavailable {
1586            op: "replay_execution",
1587        })
1588    }
1589
1590    /// Operator-initiated lease revoke (row 19).
1591    #[cfg(feature = "core")]
1592    async fn revoke_lease(
1593        &self,
1594        _args: RevokeLeaseArgs,
1595    ) -> Result<RevokeLeaseResult, EngineError> {
1596        Err(EngineError::Unavailable { op: "revoke_lease" })
1597    }
1598
1599    // ── cairn #389 — service-layer typed FCALL surface ────────────
1600    //
1601    // These methods mirror `complete`/`fail`/`renew` (which take a
1602    // worker [`Handle`]) but dispatch against `(execution_id, fence)`
1603    // tuples supplied directly. Service-layer callers (cairn's
1604    // `valkey_control_plane_impl.rs`, future consumers that hold a
1605    // run/lease descriptor without a `Handle`) use these to avoid
1606    // going through the raw `ferriskey::Value` FCALL escape hatch.
1607    //
1608    // Same shape / same precedent as `suspend_by_triple` (cairn #322).
1609    //
1610    // Default impl returns [`EngineError::Unavailable`] so the trait
1611    // addition is non-breaking for out-of-tree backends. The in-tree
1612    // Valkey backend overrides; Postgres + SQLite keep the default
1613    // until follow-up parity work lands (consistent with
1614    // `issue_reclaim_grant` / `reclaim_execution` precedent).
1615
1616    /// Service-layer `complete_execution` — peer of [`Self::complete`]
1617    /// that takes a fence triple instead of a worker [`Handle`]. See
1618    /// the group preamble above for cairn-migration context.
1619    #[cfg(feature = "core")]
1620    async fn complete_execution(
1621        &self,
1622        _args: CompleteExecutionArgs,
1623    ) -> Result<CompleteExecutionResult, EngineError> {
1624        Err(EngineError::Unavailable {
1625            op: "complete_execution",
1626        })
1627    }
1628
1629    /// Service-layer `fail_execution` — peer of [`Self::fail`] that
1630    /// takes a fence triple instead of a worker [`Handle`].
1631    #[cfg(feature = "core")]
1632    async fn fail_execution(
1633        &self,
1634        _args: FailExecutionArgs,
1635    ) -> Result<FailExecutionResult, EngineError> {
1636        Err(EngineError::Unavailable {
1637            op: "fail_execution",
1638        })
1639    }
1640
1641    /// Service-layer `renew_lease` — peer of [`Self::renew`] that
1642    /// takes a fence triple instead of a worker [`Handle`].
1643    #[cfg(feature = "core")]
1644    async fn renew_lease(
1645        &self,
1646        _args: RenewLeaseArgs,
1647    ) -> Result<RenewLeaseResult, EngineError> {
1648        Err(EngineError::Unavailable { op: "renew_lease" })
1649    }
1650
1651    /// Service-layer `resume_execution` — transitions a suspended
1652    /// execution back to runnable. Distinct from
1653    /// [`Self::claim_from_resume_grant`] (which mints a worker handle
1654    /// against an already-eligible resumed execution): this method is
1655    /// the lifecycle transition primitive the control plane calls
1656    /// when an operator / auto-resume policy moves a suspended
1657    /// execution forward.
1658    ///
1659    /// The Valkey impl pre-reads `current_waitpoint_id` + `lane_id`
1660    /// from `exec_core` so callers only need the execution id + the
1661    /// trigger type — same ergonomics as `revoke_lease` reading
1662    /// `current_worker_instance_id` when callers omit it.
1663    #[cfg(feature = "core")]
1664    async fn resume_execution(
1665        &self,
1666        _args: ResumeExecutionArgs,
1667    ) -> Result<ResumeExecutionResult, EngineError> {
1668        Err(EngineError::Unavailable {
1669            op: "resume_execution",
1670        })
1671    }
1672
1673    /// Service-layer `check_admission_and_record` — atomic admission
1674    /// check against a quota policy. Callers supply the policy id +
1675    /// dimension (quota keys live on their own `{q:<policy>}`
1676    /// partition that cannot be derived from `execution_id`, so these
1677    /// travel outside [`CheckAdmissionArgs`]). `dimension` defaults
1678    /// to `"default"` inside the Valkey body when the caller passes
1679    /// an empty string — matches cairn's pre-migration default.
1680    #[cfg(feature = "core")]
1681    async fn check_admission(
1682        &self,
1683        _quota_policy_id: &crate::types::QuotaPolicyId,
1684        _dimension: &str,
1685        _args: CheckAdmissionArgs,
1686    ) -> Result<CheckAdmissionResult, EngineError> {
1687        Err(EngineError::Unavailable {
1688            op: "check_admission",
1689        })
1690    }
1691
1692    /// Service-layer `evaluate_flow_eligibility` — read-only check
1693    /// that returns the execution's current eligibility state
1694    /// (`eligible`, `blocked_by_dependencies`, or a backend-specific
1695    /// status string). Called by cairn's dependency-resolution path
1696    /// to decide whether a downstream execution can proceed.
1697    #[cfg(feature = "core")]
1698    async fn evaluate_flow_eligibility(
1699        &self,
1700        _args: EvaluateFlowEligibilityArgs,
1701    ) -> Result<EvaluateFlowEligibilityResult, EngineError> {
1702        Err(EngineError::Unavailable {
1703            op: "evaluate_flow_eligibility",
1704        })
1705    }
1706
1707    // ── #454 — cairn typed-FCALL additions (4) ─────────────────
1708
1709    /// Per-execution budget spend with tenant-open dimensions.
1710    ///
1711    /// Cairn #454 Q1/Q2: takes an open-set `BTreeMap<String, u64>` of
1712    /// deltas (distinct from the fixed-shape
1713    /// [`Self::report_usage`]/[`Self::report_usage_admin`] which use
1714    /// [`UsageDimensions`]). Return shape reuses [`ReportUsageResult`]
1715    /// — same `Ok` / `SoftBreach` / `HardBreach` / `AlreadyApplied`
1716    /// variants cairn's UI already branches on.
1717    ///
1718    /// The default impl returns [`EngineError::Unavailable`] so the
1719    /// trait remains additive for backends that have not landed the
1720    /// #454 body yet.
1721    #[cfg(feature = "core")]
1722    async fn record_spend(
1723        &self,
1724        _args: RecordSpendArgs,
1725    ) -> Result<ReportUsageResult, EngineError> {
1726        Err(EngineError::Unavailable {
1727            op: "record_spend",
1728        })
1729    }
1730
1731    /// Per-execution budget attribution release.
1732    ///
1733    /// Called on execution termination to reverse this execution's
1734    /// contribution to a budget counter. Per cairn #454 clarification
1735    /// this is **per-execution**, not a whole-budget flush — the
1736    /// budget persists across executions.
1737    ///
1738    /// The default impl returns [`EngineError::Unavailable`].
1739    #[cfg(feature = "core")]
1740    async fn release_budget(
1741        &self,
1742        _args: ReleaseBudgetArgs,
1743    ) -> Result<(), EngineError> {
1744        Err(EngineError::Unavailable {
1745            op: "release_budget",
1746        })
1747    }
1748
1749    /// Operator-driven approval-signal delivery.
1750    ///
1751    /// Pre-shaped variant of [`Self::deliver_signal`] where the caller
1752    /// **does not carry the waitpoint token**. The backend reads the
1753    /// token from `ff_waitpoint_pending`, HMAC-verifies server-side,
1754    /// and dispatches. Operator API never sees the token bytes.
1755    ///
1756    /// Cairn #454 Q3 — `signal_name` is a flat string (conventional
1757    /// values `"approved"` / `"rejected"`); audit metadata lives in
1758    /// cairn's audit log, not on the FF surface.
1759    ///
1760    /// The default impl returns [`EngineError::Unavailable`].
1761    #[cfg(feature = "core")]
1762    async fn deliver_approval_signal(
1763        &self,
1764        _args: DeliverApprovalSignalArgs,
1765    ) -> Result<DeliverSignalResult, EngineError> {
1766        Err(EngineError::Unavailable {
1767            op: "deliver_approval_signal",
1768        })
1769    }
1770
1771    /// Backend-atomic `issue_claim_grant` + `claim_execution`.
1772    ///
1773    /// Cairn #454 Q4 — composing the two primitives caller-side risks
1774    /// leaking a grant if `claim_execution` fails after
1775    /// `issue_claim_grant` succeeded. This method's contract is that
1776    /// the composition is backend-atomic: Valkey fuses them in one
1777    /// FCALL; PG/SQLite fuse them in one tx.
1778    ///
1779    /// The default impl **must not** be a chained call — it returns
1780    /// [`EngineError::Unavailable`] so consumers cannot accidentally
1781    /// use a non-atomic fallback.
1782    #[cfg(feature = "core")]
1783    async fn issue_grant_and_claim(
1784        &self,
1785        _args: IssueGrantAndClaimArgs,
1786    ) -> Result<ClaimGrantOutcome, EngineError> {
1787        Err(EngineError::Unavailable {
1788            op: "issue_grant_and_claim",
1789        })
1790    }
1791
1792    // ── RFC-017 Stage A — Budget + quota admin (5) ─────────────
1793
1794    /// Create a budget definition (row 6).
1795    #[cfg(feature = "core")]
1796    async fn create_budget(
1797        &self,
1798        _args: CreateBudgetArgs,
1799    ) -> Result<CreateBudgetResult, EngineError> {
1800        Err(EngineError::Unavailable {
1801            op: "create_budget",
1802        })
1803    }
1804
1805    /// Reset a budget's usage counters (row 10).
1806    #[cfg(feature = "core")]
1807    async fn reset_budget(
1808        &self,
1809        _args: ResetBudgetArgs,
1810    ) -> Result<ResetBudgetResult, EngineError> {
1811        Err(EngineError::Unavailable { op: "reset_budget" })
1812    }
1813
1814    /// Create a quota policy (row 7).
1815    #[cfg(feature = "core")]
1816    async fn create_quota_policy(
1817        &self,
1818        _args: CreateQuotaPolicyArgs,
1819    ) -> Result<CreateQuotaPolicyResult, EngineError> {
1820        Err(EngineError::Unavailable {
1821            op: "create_quota_policy",
1822        })
1823    }
1824
1825    /// Read-only budget status for operator visibility (row 8).
1826    #[cfg(feature = "core")]
1827    async fn get_budget_status(
1828        &self,
1829        _id: &BudgetId,
1830    ) -> Result<BudgetStatus, EngineError> {
1831        Err(EngineError::Unavailable {
1832            op: "get_budget_status",
1833        })
1834    }
1835
1836    /// Admin-path `report_usage` (row 9 + RFC-017 §5 round-1 F4).
1837    /// Distinct from the existing [`Self::report_usage`] which takes
1838    /// a worker handle — the admin path has no lease context.
1839    #[cfg(feature = "core")]
1840    async fn report_usage_admin(
1841        &self,
1842        _budget: &BudgetId,
1843        _args: ReportUsageAdminArgs,
1844    ) -> Result<ReportUsageResult, EngineError> {
1845        Err(EngineError::Unavailable {
1846            op: "report_usage_admin",
1847        })
1848    }
1849
1850    // ── RFC-017 Stage A — Read + diagnostics (3) ───────────────
1851
1852    /// Fetch the stored result payload for a completed execution
1853    /// (row 4). Returns `Ok(None)` when the execution is missing, not
1854    /// yet complete, or its payload was trimmed by retention policy.
1855    async fn get_execution_result(
1856        &self,
1857        _id: &ExecutionId,
1858    ) -> Result<Option<Vec<u8>>, EngineError> {
1859        Err(EngineError::Unavailable {
1860            op: "get_execution_result",
1861        })
1862    }
1863
1864    /// List the pending-or-active waitpoints for an execution, cursor
1865    /// paginated (row 5 / §8). Stage A preserves the existing
1866    /// `PendingWaitpointInfo` shape; Stage D ships the §8 HMAC
1867    /// sanitisation + `(token_kid, token_fingerprint)` schema.
1868    #[cfg(feature = "core")]
1869    async fn list_pending_waitpoints(
1870        &self,
1871        _args: ListPendingWaitpointsArgs,
1872    ) -> Result<ListPendingWaitpointsResult, EngineError> {
1873        Err(EngineError::Unavailable {
1874            op: "list_pending_waitpoints",
1875        })
1876    }
1877
1878    /// Backend-level reachability probe (row 1). Valkey: `PING`;
1879    /// Postgres: `SELECT 1`.
1880    async fn ping(&self) -> Result<(), EngineError> {
1881        Err(EngineError::Unavailable { op: "ping" })
1882    }
1883
1884    // ── RFC-025 worker registry ────────────────────────────────
1885    //
1886    // Four core primitives + Phase-6 `list_workers` readback. See
1887    // `rfcs/RFC-025-worker-registry.md` for the full design. Default
1888    // impls return `Unavailable` so out-of-tree backends keep
1889    // compiling; every in-tree backend overrides.
1890
1891    /// Register (or idempotently refresh) a worker instance. See RFC-025 §4.
1892    #[cfg(feature = "core")]
1893    async fn register_worker(
1894        &self,
1895        _args: RegisterWorkerArgs,
1896    ) -> Result<RegisterWorkerOutcome, EngineError> {
1897        Err(EngineError::Unavailable {
1898            op: "register_worker",
1899        })
1900    }
1901
1902    /// Refresh the worker-instance liveness TTL.
1903    #[cfg(feature = "core")]
1904    async fn heartbeat_worker(
1905        &self,
1906        _args: HeartbeatWorkerArgs,
1907    ) -> Result<HeartbeatWorkerOutcome, EngineError> {
1908        Err(EngineError::Unavailable {
1909            op: "heartbeat_worker",
1910        })
1911    }
1912
1913    /// Operator-driven worker death (distinct from passive TTL expiry).
1914    #[cfg(feature = "core")]
1915    async fn mark_worker_dead(
1916        &self,
1917        _args: MarkWorkerDeadArgs,
1918    ) -> Result<MarkWorkerDeadOutcome, EngineError> {
1919        Err(EngineError::Unavailable {
1920            op: "mark_worker_dead",
1921        })
1922    }
1923
1924    /// Enumerate expired leases for reclaim-decision tooling.
1925    #[cfg(feature = "suspension")]
1926    async fn list_expired_leases(
1927        &self,
1928        _args: ListExpiredLeasesArgs,
1929    ) -> Result<ListExpiredLeasesResult, EngineError> {
1930        Err(EngineError::Unavailable {
1931            op: "list_expired_leases",
1932        })
1933    }
1934
1935    /// Enumerate live workers (RFC-025 Phase 6, §9.4).
1936    #[cfg(feature = "core")]
1937    async fn list_workers(
1938        &self,
1939        _args: ListWorkersArgs,
1940    ) -> Result<ListWorkersResult, EngineError> {
1941        Err(EngineError::Unavailable {
1942            op: "list_workers",
1943        })
1944    }
1945
1946    // ── RFC-017 Stage A — Scheduling (1) ───────────────────────
1947
1948    /// Scheduler-routed claim entrypoint (row 18, RFC-017 §7). Valkey
1949    /// forwards to its `ff_scheduler::Scheduler` cursor; Postgres
1950    /// forwards to `PostgresScheduler`'s `FOR UPDATE SKIP LOCKED`
1951    /// path.
1952    ///
1953    /// Backends that carry an embedded scheduler (e.g. `ValkeyBackend`
1954    /// constructed via `with_embedded_scheduler`, or `PostgresBackend`
1955    /// with its `with_scanners` sibling) route the claim through it.
1956    /// Backends without a wired scheduler return
1957    /// [`EngineError::Unavailable`]. HTTP consumers use
1958    /// `FlowFabricWorker::claim_via_server` instead.
1959    #[cfg(feature = "core")]
1960    async fn claim_for_worker(
1961        &self,
1962        _args: ClaimForWorkerArgs,
1963    ) -> Result<ClaimForWorkerOutcome, EngineError> {
1964        Err(EngineError::Unavailable {
1965            op: "claim_for_worker",
1966        })
1967    }
1968
1969    // ── Cross-cutting (RFC-017 Stage B trait-lift) ──────────────
1970
1971    /// Static observability label identifying the backend family in
1972    /// logs + metrics (RFC-017 §5.4 + §9 Stage B). Default impl
1973    /// returns `"unknown"` so legacy `impl EngineBackend` blocks that
1974    /// have not upgraded keep compiling; every in-tree backend
1975    /// overrides — `ValkeyBackend` → `"valkey"`, `PostgresBackend` →
1976    /// `"postgres"`.
1977    fn backend_label(&self) -> &'static str {
1978        "unknown"
1979    }
1980
1981    /// Backend downcast escape hatch (v0.12 PR-7a transitional).
1982    ///
1983    /// Scanner supervisors in `ff-engine` still dispatch through a
1984    /// concrete `ferriskey::Client`; to keep the engine's public
1985    /// boundary backend-agnostic (`Arc<dyn EngineBackend>`) while the
1986    /// scanner internals remain Valkey-shaped, the engine downcasts
1987    /// via this method and reaches in for the embedded client. Every
1988    /// backend that wants to be consumed by `Engine::start_with_completions`
1989    /// overrides this to return `self` as `&dyn Any`; the default
1990    /// returns a placeholder so a stray `downcast_ref` fails cleanly
1991    /// rather than risking unsound behaviour.
1992    ///
1993    /// v0.13 (PR-7b) will trait-ify individual scanners onto
1994    /// `EngineBackend` and retire `ff-engine`'s dependence on this
1995    /// downcast path. The method itself will remain on the trait
1996    /// (likely deprecated) rather than be removed — removing a
1997    /// public trait method is a breaking change for external
1998    /// `impl EngineBackend` blocks.
1999    fn as_any(&self) -> &(dyn std::any::Any + 'static) {
2000        // Placeholder so the default does not expose `Self` for
2001        // downcast. Backends override to return `self`.
2002        &()
2003    }
2004
2005    /// RFC-018 Stage A: snapshot of this backend's identity + the
2006    /// flat `Supports` surface it can actually service. Consumers use
2007    /// this at startup to gate UI features / choose between alternative
2008    /// code paths before dispatching. See
2009    /// `rfcs/RFC-018-backend-capability-discovery.md` for the full
2010    /// discovery contract and the four owner-adjudicated open
2011    /// questions (granularity: coarse; version: struct; sync; no
2012    /// event stream).
2013    ///
2014    /// Default: returns a value tagged `family = "unknown"` with every
2015    /// `supports.*` bool `false`, so pre-RFC-018 out-of-tree backends
2016    /// keep compiling and consumers treat "all false" as "dispatch
2017    /// and catch [`EngineError::Unavailable`]" (pre-RFC-018 behaviour).
2018    /// Concrete in-tree backends (`ValkeyBackend`, `PostgresBackend`)
2019    /// override to populate a real value.
2020    ///
2021    /// Sync (no `.await`): backend-static info should not require a
2022    /// probe on every query. Dynamic probes happen once at
2023    /// `connect*` time and cache the result.
2024    fn capabilities(&self) -> crate::capability::Capabilities {
2025        crate::capability::Capabilities::new(
2026            crate::capability::BackendIdentity::new(
2027                "unknown",
2028                crate::capability::Version::new(0, 0, 0),
2029                "unknown",
2030            ),
2031            crate::capability::Supports::none(),
2032        )
2033    }
2034
2035    /// Issue #281: run one-time backend-specific boot preparation.
2036    ///
2037    /// Intended to run ONCE per deployment startup — NOT per request.
2038    /// Idempotent and safe for consumers to call on every application
2039    /// boot; backends that have nothing to do return
2040    /// [`PrepareOutcome::NoOp`] without side effects.
2041    ///
2042    /// Per-backend behaviour:
2043    ///
2044    /// * **Valkey** — issues `FUNCTION LOAD REPLACE` for the
2045    ///   `flowfabric` Lua library (with bounded retry on transient
2046    ///   transport faults; permanent compile errors surface as
2047    ///   [`EngineError::Transport`] without retry). Returns
2048    ///   [`PrepareOutcome::Applied`] carrying
2049    ///   `"FUNCTION LOAD (flowfabric lib v<N>)"`.
2050    /// * **Postgres** — returns [`PrepareOutcome::NoOp`]. Schema
2051    ///   migrations are applied out-of-band per
2052    ///   `rfcs/drafts/v0.7-migration-master.md §Q12`; the backend
2053    ///   runs a schema-version check at connect time and refuses to
2054    ///   start on mismatch, so no boot-side prepare work remains.
2055    /// * **Default impl** — returns [`PrepareOutcome::NoOp`] so
2056    ///   out-of-tree backends without preparation work compile
2057    ///   without boilerplate.
2058    ///
2059    /// # Relationship to the in-tree boot path
2060    ///
2061    /// `ValkeyBackend::initialize_deployment` (called from
2062    /// `Server::start_with_metrics`) already invokes
2063    /// [`ensure_library`](ff_script::loader::ensure_library) inline as
2064    /// its step 4; that path is unchanged. `prepare()` exists as a
2065    /// **trait-surface entry point** so consumers that construct an
2066    /// `Arc<dyn EngineBackend>` outside of `Server` (e.g.
2067    /// cairn-fabric's boot path at `cairn-fabric/src/boot.rs`) can
2068    /// run the same preparation without reaching into
2069    /// backend-specific modules. The overlap is intentional: calling
2070    /// both `prepare()` and `initialize_deployment` is safe because
2071    /// `FUNCTION LOAD REPLACE` is idempotent under the version
2072    /// check.
2073    ///
2074    /// # Layer forwarding
2075    ///
2076    /// Layer impls (`HookedBackend`, ff-sdk layers) do NOT forward
2077    /// `prepare` today — consistent with `backend_label` / `ping` /
2078    /// `shutdown_prepare`. Consumers that wrap a backend in layers
2079    /// MUST call `prepare()` on the raw backend before wrapping, or
2080    /// accept the default [`PrepareOutcome::NoOp`].
2081    async fn prepare(&self) -> Result<PrepareOutcome, EngineError> {
2082        Ok(PrepareOutcome::NoOp)
2083    }
2084
2085    /// Drain-before-shutdown hook (RFC-017 §5.4). The server calls
2086    /// this before draining its own background tasks so backend-
2087    /// scoped primitives (Valkey stream semaphore, Postgres sqlx
2088    /// pool, …) can close their gates and await in-flight work up to
2089    /// `grace`.
2090    ///
2091    /// Default impl returns `Ok(())` — a no-op backend has nothing
2092    /// backend-scoped to drain. Concrete backends whose data plane
2093    /// owns resources (connection pools, semaphores, listeners)
2094    /// override with a real body.
2095    async fn shutdown_prepare(&self, _grace: Duration) -> Result<(), EngineError> {
2096        Ok(())
2097    }
2098
2099    // ── RFC-017 Stage E2 — `Server::client` removal (header + reads) ───
2100
2101    /// RFC-017 Stage E2: the "header" portion of `cancel_flow` — run the
2102    /// atomic flow-state flip (Valkey: `ff_cancel_flow` FCALL; Postgres:
2103    /// `cancel_flow_once` tx), decode policy + membership, and surface
2104    /// the `flow_already_terminal` idempotency branch as a first-class
2105    /// [`CancelFlowHeader::AlreadyTerminal`] so the Server can build
2106    /// the wire [`CancelFlowResult`] without reaching for a raw
2107    /// `Client`. Separate from the existing
2108    /// [`EngineBackend::cancel_flow`] entry point (which takes the
2109    /// enum-typed `(policy, wait)` split and returns the wait-collapsed
2110    /// `CancelFlowResult`) because the Server owns its own
2111    /// wait-dispatch + member-cancel machinery via
2112    /// [`EngineBackend::cancel_execution`] + backlog ack.
2113    ///
2114    /// Default impl returns [`EngineError::Unavailable`] so un-migrated
2115    /// backends surface the miss loudly.
2116    #[cfg(feature = "core")]
2117    async fn cancel_flow_header(
2118        &self,
2119        _args: CancelFlowArgs,
2120    ) -> Result<crate::contracts::CancelFlowHeader, EngineError> {
2121        Err(EngineError::Unavailable {
2122            op: "cancel_flow_header",
2123        })
2124    }
2125
2126    /// RFC-017 Stage E2: best-effort acknowledgement that one member of
2127    /// a `cancel_all` flow has completed its per-member cancel. Drains
2128    /// the member from the flow's `pending_cancels` set and, if empty,
2129    /// removes the flow from the partition-level `cancel_backlog`
2130    /// (Valkey: `ff_ack_cancel_member` FCALL; Postgres: table write —
2131    /// default `Unavailable` until Wave 9).
2132    ///
2133    /// Failures are swallowed by the caller — the cancel-backlog
2134    /// reconciler is the authoritative drain — but a typed error here
2135    /// lets the caller log a backend-scoped context string.
2136    #[cfg(feature = "core")]
2137    async fn ack_cancel_member(
2138        &self,
2139        _flow_id: &FlowId,
2140        _execution_id: &ExecutionId,
2141    ) -> Result<(), EngineError> {
2142        Err(EngineError::Unavailable {
2143            op: "ack_cancel_member",
2144        })
2145    }
2146
2147    /// RFC-017 Stage E2: full-shape execution read used by the
2148    /// `GET /v1/executions/{id}` HTTP route. Returns the legacy
2149    /// [`ExecutionInfo`] wire shape (not the decoupled
2150    /// [`ExecutionSnapshot`]) so the existing HTTP response bytes stay
2151    /// identical across the migration.
2152    ///
2153    /// `Ok(None)` ⇒ no such execution. Default `Unavailable` because
2154    /// the Valkey HGETALL-and-parse is backend-specific.
2155    #[cfg(feature = "core")]
2156    async fn read_execution_info(
2157        &self,
2158        _id: &ExecutionId,
2159    ) -> Result<Option<ExecutionInfo>, EngineError> {
2160        Err(EngineError::Unavailable {
2161            op: "read_execution_info",
2162        })
2163    }
2164
2165    /// RFC-017 Stage E2: narrow `public_state` read used by the
2166    /// `GET /v1/executions/{id}/state` HTTP route. Returns `Ok(None)`
2167    /// when the execution is missing. Default `Unavailable`.
2168    #[cfg(feature = "core")]
2169    async fn read_execution_state(
2170        &self,
2171        _id: &ExecutionId,
2172    ) -> Result<Option<PublicState>, EngineError> {
2173        Err(EngineError::Unavailable {
2174            op: "read_execution_state",
2175        })
2176    }
2177
2178    // ── RFC-019 Stage A/B/C — Stream-cursor subscriptions ─────────
2179    //
2180    // Four owner-adjudicated families (RFC-019 §Open Questions #5):
2181    // `lease_history`, `completion`, `signal_delivery`,
2182    // `instance_tags`. Stage C (this crate) promotes each family to
2183    // a typed event enum; consumers `match` on variants instead of
2184    // parsing a backend-shaped byte blob.
2185    //
2186    // Each method returns a family-specific subscription alias (see
2187    // [`crate::stream_events`]). All defaults return
2188    // `EngineError::Unavailable` per RFC-017 trait-growth conventions.
2189
2190    /// Subscribe to lease lifecycle events (acquired / renewed /
2191    /// expired / reclaimed / revoked) for the partition this backend
2192    /// is configured with.
2193    ///
2194    /// Cross-partition fan-out is consumer-side merge: subscribe
2195    /// per-partition backend instance and interleave on the read
2196    /// side. Yields
2197    /// `Err(EngineError::StreamDisconnected { cursor })` on backend
2198    /// disconnect; resume by calling this method again with the
2199    /// returned cursor.
2200    ///
2201    /// `filter` (#282): when `filter.instance_tag` is `Some((k, v))`,
2202    /// only events whose execution carries tag `k = v` are yielded
2203    /// (matching the [`crate::backend::ScannerFilter`] surface from
2204    /// #122). Pass `&ScannerFilter::default()` for unfiltered
2205    /// behaviour. Filtering happens inside the backend stream; the
2206    /// [`crate::stream_events::LeaseHistorySubscription`] return type
2207    /// is unchanged.
2208    async fn subscribe_lease_history(
2209        &self,
2210        _cursor: crate::stream_subscribe::StreamCursor,
2211        _filter: &crate::backend::ScannerFilter,
2212    ) -> Result<crate::stream_events::LeaseHistorySubscription, EngineError> {
2213        Err(EngineError::Unavailable {
2214            op: "subscribe_lease_history",
2215        })
2216    }
2217
2218    /// Subscribe to completion events (terminal state transitions).
2219    ///
2220    /// - **Postgres**: wraps the `ff_completion_event` outbox +
2221    ///   LISTEN/NOTIFY machinery. Durable via event-id cursor.
2222    /// - **Valkey**: wraps the RESP3 `ff:dag:completions` pubsub
2223    ///   subscriber. Pubsub is at-most-once over the live
2224    ///   subscription window; the cursor is always the empty
2225    ///   sentinel. If you need at-least-once replay with durable
2226    ///   cursor resume, use the Postgres backend (see
2227    ///   `docs/POSTGRES_PARITY_MATRIX.md` row `subscribe_completion`).
2228    ///
2229    /// `filter` (#282): see [`Self::subscribe_lease_history`]. Valkey
2230    /// reuses the `subscribe_completions_filtered` per-event HGET
2231    /// gate; Postgres filters inline against the outbox's denormalised
2232    /// `instance_tag` column.
2233    async fn subscribe_completion(
2234        &self,
2235        _cursor: crate::stream_subscribe::StreamCursor,
2236        _filter: &crate::backend::ScannerFilter,
2237    ) -> Result<crate::stream_events::CompletionSubscription, EngineError> {
2238        Err(EngineError::Unavailable {
2239            op: "subscribe_completion",
2240        })
2241    }
2242
2243    /// Subscribe to signal-delivery events (satisfied / buffered /
2244    /// deduped).
2245    ///
2246    /// `filter` (#282): see [`Self::subscribe_lease_history`].
2247    async fn subscribe_signal_delivery(
2248        &self,
2249        _cursor: crate::stream_subscribe::StreamCursor,
2250        _filter: &crate::backend::ScannerFilter,
2251    ) -> Result<crate::stream_events::SignalDeliverySubscription, EngineError> {
2252        Err(EngineError::Unavailable {
2253            op: "subscribe_signal_delivery",
2254        })
2255    }
2256
2257    /// Subscribe to instance-tag events (tag attached / cleared).
2258    ///
2259    /// Producer wiring is deferred per #311 audit ("no concrete
2260    /// demand"); the trait method exists for API uniformity across
2261    /// the four families. Backends currently return
2262    /// `EngineError::Unavailable`.
2263    async fn subscribe_instance_tags(
2264        &self,
2265        _cursor: crate::stream_subscribe::StreamCursor,
2266    ) -> Result<crate::stream_events::InstanceTagSubscription, EngineError> {
2267        Err(EngineError::Unavailable {
2268            op: "subscribe_instance_tags",
2269        })
2270    }
2271
2272    // ── PR-7b Cluster 1 — Foundation scanner operations ─────────
2273    //
2274    // Per-execution write hooks invoked by the engine scanner loop.
2275    // Scanner bodies discover candidate executions via partition
2276    // indices (Valkey ZSETs / PG index scans) and, for each candidate,
2277    // call through to one of the methods below to perform the atomic
2278    // state-flip. Defaults return `EngineError::Unavailable { op }`;
2279    // Valkey impls wrap the corresponding `ff_*` FCALL; Postgres impls
2280    // run a single-row tx mirroring the Lua semantic.
2281
2282    /// Lease-expiry scanner hook — mark an expired lease as reclaimable
2283    /// so another worker can redeem the execution. Atomic per call.
2284    ///
2285    /// Valkey: `FCALL ff_mark_lease_expired_if_due`.
2286    /// Postgres: single-row tx on `ff_attempt` + `ff_exec_core` (see
2287    /// `ff_backend_postgres::reconcilers::lease_expiry`).
2288    async fn mark_lease_expired_if_due(
2289        &self,
2290        _partition: crate::partition::Partition,
2291        _execution_id: &ExecutionId,
2292    ) -> Result<(), EngineError> {
2293        Err(EngineError::Unavailable {
2294            op: "mark_lease_expired_if_due",
2295        })
2296    }
2297
2298    /// Delayed-promoter scanner hook — promote a delayed execution to
2299    /// `eligible_now` once its `delay_until` has passed.
2300    ///
2301    /// Valkey: `FCALL ff_promote_delayed`.
2302    /// Postgres: Wave 9 schema scope (no current impl).
2303    async fn promote_delayed(
2304        &self,
2305        _partition: crate::partition::Partition,
2306        _lane: &LaneId,
2307        _execution_id: &ExecutionId,
2308        _now_ms: TimestampMs,
2309    ) -> Result<(), EngineError> {
2310        Err(EngineError::Unavailable {
2311            op: "promote_delayed",
2312        })
2313    }
2314
2315    /// Pending-waitpoint-expiry scanner hook — close a pending
2316    /// waitpoint whose deadline has passed (wake the suspended
2317    /// execution with a timeout signal).
2318    ///
2319    /// Valkey: `FCALL ff_close_waitpoint`.
2320    /// Postgres: Wave 9 schema scope (no current impl).
2321    async fn close_waitpoint(
2322        &self,
2323        _partition: crate::partition::Partition,
2324        _execution_id: &ExecutionId,
2325        _waitpoint_id: &str,
2326        _now_ms: TimestampMs,
2327    ) -> Result<(), EngineError> {
2328        Err(EngineError::Unavailable {
2329            op: "close_waitpoint",
2330        })
2331    }
2332
2333    /// Shared hook for the attempt-timeout and execution-deadline
2334    /// scanners — terminate or retry an execution whose wall-clock
2335    /// budget has elapsed. `phase` discriminates which of the two
2336    /// scanner paths is calling so the backend can preserve diagnostic
2337    /// breadcrumbs without forking the surface.
2338    ///
2339    /// Valkey: `FCALL ff_expire_execution` (with `phase` passed through
2340    /// as an ARGV discriminator).
2341    /// Postgres: single-row tx mirror of the Lua semantic for
2342    /// `AttemptTimeout`; `ExecutionDeadline` is Wave 9 schema scope.
2343    async fn expire_execution(
2344        &self,
2345        _partition: crate::partition::Partition,
2346        _execution_id: &ExecutionId,
2347        _phase: ExpirePhase,
2348        _now_ms: TimestampMs,
2349    ) -> Result<(), EngineError> {
2350        Err(EngineError::Unavailable {
2351            op: "expire_execution",
2352        })
2353    }
2354
2355    /// Suspension-timeout scanner hook — expire a suspended execution
2356    /// whose suspension deadline has passed (wake with timeout).
2357    ///
2358    /// Valkey: `FCALL ff_expire_suspension`.
2359    /// Postgres: single-row tx on `ff_suspend` + `ff_exec_core`.
2360    async fn expire_suspension(
2361        &self,
2362        _partition: crate::partition::Partition,
2363        _execution_id: &ExecutionId,
2364        _now_ms: TimestampMs,
2365    ) -> Result<(), EngineError> {
2366        Err(EngineError::Unavailable {
2367            op: "expire_suspension",
2368        })
2369    }
2370
2371    // ── PR-7b Cluster 2 — Reconciler scanner operations ─────────
2372    //
2373    // Per-execution write hooks invoked by the `unblock` scanner. The
2374    // other two cluster-2 scanners (`budget_reset`, `dependency_reconciler`)
2375    // route through `reset_budget` + `resolve_dependency`, which are
2376    // already part of the RFC-017 service-layer surface and live above.
2377
2378    /// Unblock-scanner hook — move an execution from a blocked ZSET back
2379    /// to `eligible_now` once its blocking condition has cleared (budget
2380    /// under limit, quota window drained, or a capable worker has come
2381    /// online). `expected_blocking_reason` discriminates which of the
2382    /// `blocked:{budget,quota,route}` sets the execution is leaving and
2383    /// also fences against a stale unblock (Lua rejects if the core's
2384    /// `blocking_reason` no longer matches).
2385    ///
2386    /// # Backend status
2387    ///
2388    /// - **Valkey:** wraps `ff_unblock_execution` (RFC-010 §6). Atomic
2389    ///   single-slot FCALL on `{p:N}`.
2390    /// - **Postgres:** `Unavailable` (structural). PG does not persist a
2391    ///   per-reason `blocked:{budget,quota,route}` index — scheduler
2392    ///   eligibility is re-evaluated live via SQL predicates on
2393    ///   `ff_exec_core` + budget / quota tables (see
2394    ///   `ff_backend_postgres::scheduler`). Nothing to reconcile, so the
2395    ///   engine's PG scanner loop does not run this path; callers on
2396    ///   a PG deployment receive the typed `Unavailable` per PR-7b/final
2397    ///   contract.
2398    /// - **SQLite:** `Unavailable` for the same reason as Postgres.
2399    async fn unblock_execution(
2400        &self,
2401        _partition: crate::partition::Partition,
2402        _lane_id: &LaneId,
2403        _execution_id: &ExecutionId,
2404        _expected_blocking_reason: &str,
2405        _now_ms: TimestampMs,
2406    ) -> Result<(), EngineError> {
2407        Err(EngineError::Unavailable {
2408            op: "unblock_execution",
2409        })
2410    }
2411
2412    /// RFC-016 Stage C — sibling-cancel group drain.
2413    ///
2414    /// After `edge_cancel_dispatcher` has issued
2415    /// [`EngineBackend::cancel_execution`] against every listed
2416    /// sibling of a `(flow_id, downstream_eid)` group, this trait
2417    /// method atomically removes the tuple from the partition-local
2418    /// pending-cancel-groups index and clears the per-group flag +
2419    /// members state.
2420    ///
2421    /// Valkey: `FCALL ff_drain_sibling_cancel_group` (SREM +
2422    /// HDEL in one Lua unit).
2423    /// Postgres: `Unavailable` — PG's Wave-6b
2424    /// `reconcilers::edge_cancel_dispatcher::dispatcher_tick`
2425    /// owns the equivalent row drain inside its own per-group
2426    /// transaction; the Valkey-shaped per-tuple call is not used on
2427    /// the PG scanner path.
2428    /// SQLite: `Unavailable` (mirrors PG; RFC-023 scope).
2429    #[cfg(feature = "core")]
2430    async fn drain_sibling_cancel_group(
2431        &self,
2432        _flow_partition: crate::partition::Partition,
2433        _flow_id: &FlowId,
2434        _downstream_eid: &ExecutionId,
2435    ) -> Result<(), EngineError> {
2436        Err(EngineError::Unavailable {
2437            op: "drain_sibling_cancel_group",
2438        })
2439    }
2440
2441    /// RFC-016 Stage D — sibling-cancel group reconcile (Invariant
2442    /// Q6 safety net).
2443    ///
2444    /// Re-examines a `(flow_id, downstream_eid)` tuple in the
2445    /// partition-local pending-cancel-groups index and returns one
2446    /// of three atomic dispositions — see
2447    /// [`SiblingCancelReconcileAction`] — so the crash-recovery
2448    /// reconciler can drain stale or completed tuples without
2449    /// fighting the Stage-C dispatcher.
2450    ///
2451    /// Valkey: `FCALL ff_reconcile_sibling_cancel_group`.
2452    /// Postgres: `Unavailable` — PG's
2453    /// `reconcilers::edge_cancel_reconciler::reconciler_tick`
2454    /// owns the row-level reconcile inside its own batched tx.
2455    /// SQLite: `Unavailable` (mirrors PG).
2456    #[cfg(feature = "core")]
2457    async fn reconcile_sibling_cancel_group(
2458        &self,
2459        _flow_partition: crate::partition::Partition,
2460        _flow_id: &FlowId,
2461        _downstream_eid: &ExecutionId,
2462    ) -> Result<SiblingCancelReconcileAction, EngineError> {
2463        Err(EngineError::Unavailable {
2464            op: "reconcile_sibling_cancel_group",
2465        })
2466    }
2467
2468    // ── PR-7b Cluster 2b-A — Tally-recompute reconciler scanners ─────
2469    //
2470    // Coarse trait methods: each runs a full scan-and-fix pass over
2471    // one partition. Unlike cluster-2's per-candidate write hooks,
2472    // these scanners do multi-round-trip discovery (SSCAN / HMGET /
2473    // HGETALL / ZSCORE / ZREMRANGEBYSCORE) interleaved with conditional
2474    // writes. Moving the whole pass into the trait impl achieves the
2475    // PR-7b/final contract ("scanner trait leaks no `ferriskey::Client`")
2476    // without atomising operations that are inherently not atomic.
2477    //
2478    // Backend status (all three methods):
2479    //
2480    // - **Valkey:** real scan-loop in the trait impl; identical command
2481    //   shape to the pre-routing scanner path.
2482    // - **Postgres:** default `Unavailable`. PG mutates budget / quota
2483    //   counters and scheduling state inside SERIALIZABLE transactions,
2484    //   and the scheduler evaluates eligibility via live SQL predicates
2485    //   on `ff_exec_core` + budget / quota tables — no persistent index
2486    //   or counter hash that can drift, hence nothing to reconcile. The
2487    //   engine's PG scanner supervisor does not run these paths.
2488    // - **SQLite:** same as Postgres (RFC-023: SQLite runs its own
2489    //   scanner supervisor; these FF-owned tally-recompute scanners
2490    //   are not registered).
2491
2492    /// Index-reconciler pass — walks `ff:idx:{p:N}:all_executions`
2493    /// and verifies each execution appears in the correct scheduling
2494    /// sorted set (`eligible` / `delayed` / `blocked:*` / `active` /
2495    /// `suspended` / `terminal`) for its current `(lifecycle_phase,
2496    /// eligibility_state, ownership_state)` triple. Phase 1 is
2497    /// log-only; auto-fix is deferred to a later phase (RFC-010 §6.14).
2498    async fn reconcile_execution_index(
2499        &self,
2500        _partition: crate::partition::Partition,
2501        _lanes: &[LaneId],
2502        _filter: &crate::backend::ScannerFilter,
2503    ) -> Result<ReconcileCounts, EngineError> {
2504        Err(EngineError::Unavailable {
2505            op: "reconcile_execution_index",
2506        })
2507    }
2508
2509    /// Budget-reconciler pass — walks `ff:budget:{b:M}:policies_idx`,
2510    /// reads each budget's definition / usage / limits, and reconciles
2511    /// the `breached_at` marker against hard limits. Resetting budgets
2512    /// (non-zero `reset_interval_ms`) are skipped — they are handled
2513    /// by the `budget_reset` scanner (cluster 2). Drops index entries
2514    /// for budgets whose definition hash has been deleted (retention
2515    /// purge / manual). RFC-008 §Budget Reconciliation, RFC-010 §6.5.
2516    async fn reconcile_budget_counters(
2517        &self,
2518        _partition: crate::partition::Partition,
2519        _now_ms: TimestampMs,
2520    ) -> Result<ReconcileCounts, EngineError> {
2521        Err(EngineError::Unavailable {
2522            op: "reconcile_budget_counters",
2523        })
2524    }
2525
2526    /// Quota-reconciler pass — walks `ff:quota:{q:M}:policies_idx`,
2527    /// trims expired entries from rate-limit sliding windows, and
2528    /// recomputes each policy's concurrency counter by walking its
2529    /// `admitted_set` and pruning entries whose admission guard key
2530    /// has TTLed out. RFC-008 §Quota Reconciliation, RFC-010 §6.6.
2531    async fn reconcile_quota_counters(
2532        &self,
2533        _partition: crate::partition::Partition,
2534        _now_ms: TimestampMs,
2535    ) -> Result<ReconcileCounts, EngineError> {
2536        Err(EngineError::Unavailable {
2537            op: "reconcile_quota_counters",
2538        })
2539    }
2540
2541    // ── PR-7b Cluster 2b-B — Projection + retention scanners ──────
2542    //
2543    // Two additional non-FCALL scanners routed through the trait so
2544    // engine-side scanners stop touching `ferriskey::Client` directly.
2545    // Both ride on aggregate reads + derived writes; neither is a thin
2546    // single-FCALL passthrough.
2547
2548    /// Flow-projector scanner hook — sample flow members, derive an
2549    /// aggregate `public_flow_state` + per-state counts, and write them
2550    /// to the flow summary projection. Returns `Ok(true)` when the
2551    /// summary was updated, `Ok(false)` when the flow had no members
2552    /// or the index entry was defensively pruned (core missing).
2553    ///
2554    /// The derived `public_flow_state` written here is distinct from
2555    /// `ff_flow_core.public_flow_state` — the former is a rollup
2556    /// dashboard field, the latter is the authoritative mutation-guard
2557    /// state owned by `create_flow` / `cancel_flow`. See
2558    /// `ff_engine::scanner::flow_projector` module doc for the
2559    /// two-sources contract.
2560    ///
2561    /// # Backend status
2562    ///
2563    /// - **Valkey:** lifts the pre-PR-7b Rust-composed
2564    ///   SCARD + SRANDMEMBER + per-member HGET + HSET summary pattern.
2565    ///   No new Lua function; the aggregation is inherently
2566    ///   multi-round-trip (cross-partition member reads) and atomicity
2567    ///   is neither required nor achievable against 256 partitions.
2568    /// - **Postgres:** SELECT aggregates from `ff_exec_core` grouped by
2569    ///   `public_state` for the flow's members, INSERT ... ON CONFLICT
2570    ///   DO UPDATE into `ff_flow_summary` (migration 0019). One query
2571    ///   per flow; partition-local aggregation.
2572    /// - **SQLite:** `Unavailable` per RFC-023 Phase 3.5 (flow summary
2573    ///   projection is a shared-deployment dashboard feature; local
2574    ///   single-tenant SQLite deployments don't need it).
2575    #[cfg(feature = "core")]
2576    async fn project_flow_summary(
2577        &self,
2578        _partition: crate::partition::Partition,
2579        _flow_id: &FlowId,
2580        _now_ms: TimestampMs,
2581    ) -> Result<bool, EngineError> {
2582        Err(EngineError::Unavailable {
2583            op: "project_flow_summary",
2584        })
2585    }
2586
2587    /// Retention-trimmer scanner hook — delete terminal executions and
2588    /// all their subordinate keys/rows once they are older than the
2589    /// configured retention window. Returns the number of executions
2590    /// actually purged in this call (so the scanner can loop when it
2591    /// hits `batch_size`).
2592    ///
2593    /// Retention trimming is inherently a scan-and-delete loop over
2594    /// time; the trait surface exists to remove engine-side Valkey
2595    /// coupling, **not** to atomise the operation into a single
2596    /// round-trip. Implementations may issue multiple round-trips
2597    /// (e.g. per-execution cascade across sibling tables); the trait
2598    /// contract is "make progress, bounded by batch_size".
2599    ///
2600    /// # Backend status
2601    ///
2602    /// - **Valkey:** lifts the pre-PR-7b ZRANGEBYSCORE + per-exec
2603    ///   cascade-delete loop verbatim. Cluster-safe (all keys for one
2604    ///   execution live on the same `{p:N}` slot).
2605    /// - **Postgres:** `DELETE FROM ff_exec_core` for terminal rows
2606    ///   past the cutoff plus explicit cascade DELETEs on every
2607    ///   execution-scoped sibling table (no FK CASCADE in the schema).
2608    ///   Single transaction per batch.
2609    /// - **SQLite:** `Unavailable` per RFC-023 Phase 3.5. Single-tenant
2610    ///   local deployments typically manage their own DB lifecycle and
2611    ///   do not need a retention scanner.
2612    #[cfg(feature = "core")]
2613    async fn trim_retention(
2614        &self,
2615        _partition: crate::partition::Partition,
2616        _lane_id: &LaneId,
2617        _retention_ms: u64,
2618        _now_ms: TimestampMs,
2619        _batch_size: u32,
2620        _filter: &crate::backend::ScannerFilter,
2621    ) -> Result<u32, EngineError> {
2622        Err(EngineError::Unavailable {
2623            op: "trim_retention",
2624        })
2625    }
2626
2627    // ── PR-7b Wave 0a: exec_core field read primitive ──
2628
2629    /// Point-read N fields from the `exec_core` hash for a given
2630    /// execution. Returns a map of field-name → Option<String>
2631    /// (None for fields absent or stored as NULL). Scanner call
2632    /// sites formerly issuing raw `HGET`/`HMGET` on `ExecKeyContext::core()`
2633    /// route through this trait method (cairn #436 / PR-7b Wave 0a).
2634    ///
2635    /// Field values are coerced to String at the trait boundary for
2636    /// wire compatibility with the Valkey HGET shape. Consumers parse
2637    /// specific fields (`lane_id`, `current_attempt_index`, etc.) from
2638    /// the returned strings as needed.
2639    ///
2640    /// - **Valkey:** `HMGET exec_core_key f1 f2 ...`, zipping names to values.
2641    /// - **Postgres:** `SELECT` against `ff_exec_core` with dynamic column
2642    ///   extraction; fields in `raw_fields` JSONB are extracted via `->>`.
2643    /// - **SQLite:** equivalent with `json_extract(raw_fields, '$.field')`.
2644    ///
2645    /// Default body returns `Unavailable` so non-v0.13 backends remain
2646    /// compile-compatible.
2647    #[cfg(feature = "core")]
2648    async fn read_exec_core_fields(
2649        &self,
2650        _partition: crate::partition::Partition,
2651        _execution_id: &crate::types::ExecutionId,
2652        _fields: &[&str],
2653    ) -> Result<std::collections::HashMap<String, Option<String>>, EngineError> {
2654        Err(EngineError::Unavailable {
2655            op: "read_exec_core_fields",
2656        })
2657    }
2658
2659    // ── PR-7b Wave 0a: backend clock primitive ──
2660
2661    /// Returns the backend's current wall-clock epoch milliseconds.
2662    ///
2663    /// Used by 15 scanners to compute "due" thresholds before issuing
2664    /// per-partition due-scans. Previously a Valkey-only helper in
2665    /// `ff-engine`'s `scanner::lease_expiry` issuing `TIME`; this
2666    /// trait method is the backend-agnostic replacement (cairn #436 /
2667    /// PR-7b Wave 0a).
2668    ///
2669    /// Every in-tree backend overrides. The default falls back to
2670    /// `SystemTime::now()` so out-of-tree `EngineBackend` impls (e.g.
2671    /// cairn mocks, test doubles) stay source-compatible across v0.12
2672    /// → v0.13.
2673    ///
2674    /// - **Valkey:** `TIME` command.
2675    /// - **Postgres:** `SELECT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint`
2676    ///   (not `now()` — `now()` is the transaction start timestamp,
2677    ///   which is stale under any long-running tx; scanners need the
2678    ///   true wall-clock read).
2679    /// - **SQLite:** `SELECT CAST((julianday('now') - 2440587.5) * 86400000 AS INTEGER)`.
2680    #[cfg(feature = "core")]
2681    async fn server_time_ms(&self) -> Result<u64, EngineError> {
2682        use std::time::{SystemTime, UNIX_EPOCH};
2683        let d = SystemTime::now()
2684            .duration_since(UNIX_EPOCH)
2685            .map_err(|e| EngineError::Transport {
2686                backend: "system-clock",
2687                source: format!("server_time_ms default: {e}").into(),
2688            })?;
2689        Ok(d.as_millis() as u64)
2690    }
2691}
2692
2693/// RFC-016 Stage D outcome of a single
2694/// [`EngineBackend::reconcile_sibling_cancel_group`] call.
2695///
2696/// Cardinality is intentionally bounded to three so the
2697/// `ff_sibling_cancel_reconcile_total{action}` metric label stays
2698/// closed.
2699#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2700pub enum SiblingCancelReconcileAction {
2701    /// Pending tuple was stale (flag cleared or edgegroup absent);
2702    /// SREM'd / DELETE'd without touching the group.
2703    SremmedStale,
2704    /// Flag still set but every listed sibling is already terminal —
2705    /// an interrupted drain. Flag cleared + tuple drained.
2706    CompletedDrain,
2707    /// Flag set and at least one sibling non-terminal — dispatcher
2708    /// owns this tuple; left untouched.
2709    NoOp,
2710}
2711
2712impl SiblingCancelReconcileAction {
2713    /// Short label for observability (matches the Lua + PG action
2714    /// strings exactly).
2715    pub fn as_str(&self) -> &'static str {
2716        match self {
2717            Self::SremmedStale => "sremmed_stale",
2718            Self::CompletedDrain => "completed_drain",
2719            Self::NoOp => "no_op",
2720        }
2721    }
2722}
2723
2724/// Result of one reconciler scan pass over a single partition.
2725///
2726/// `processed` counts candidates where the reconciler detected a
2727/// correctable condition (drift, breach flip, stale guard). `errors`
2728/// counts transport / parse failures. Scanners sum these into the
2729/// engine's per-pass metrics exactly as they did before PR-7b/2b-A.
2730#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
2731pub struct ReconcileCounts {
2732    /// Items the scanner corrected (drift fix, breach flip, guard
2733    /// eviction). Zero when the partition is healthy.
2734    pub processed: u32,
2735    /// Items the scanner could not process due to transport or parse
2736    /// errors. Non-zero values surface through scanner metrics and
2737    /// should be investigated.
2738    pub errors: u32,
2739}
2740
2741/// Which scanner invoked [`EngineBackend::expire_execution`].
2742///
2743/// The attempt-timeout and execution-deadline scanners share a single
2744/// trait method — their underlying state flip is identical (terminate
2745/// or retry per retry policy); the distinction is purely diagnostic
2746/// (which deadline elapsed) and carried through to the backend so the
2747/// same Lua / SQL path can log or tag appropriately.
2748#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2749pub enum ExpirePhase {
2750    /// Invoked by `attempt_timeout` scanner — the per-attempt
2751    /// wall-clock budget elapsed.
2752    AttemptTimeout,
2753    /// Invoked by `execution_deadline` scanner — the whole-execution
2754    /// wall-clock deadline elapsed.
2755    ExecutionDeadline,
2756}
2757
2758impl ExpirePhase {
2759    /// Short string tag suitable for Lua ARGV or Postgres breadcrumbs.
2760    pub fn as_str(&self) -> &'static str {
2761        match self {
2762            Self::AttemptTimeout => "attempt_timeout",
2763            Self::ExecutionDeadline => "execution_deadline",
2764        }
2765    }
2766}
2767
2768/// Object-safety assertion: `dyn EngineBackend` compiles iff every
2769/// method is dyn-compatible. Kept as a compile-time guard so a future
2770/// trait change that accidentally breaks dyn-safety fails the build
2771/// at this site rather than at every downstream `Arc<dyn
2772/// EngineBackend>` use.
2773#[allow(dead_code)]
2774fn _assert_dyn_compatible(_: &dyn EngineBackend) {}
2775
2776/// Polling interval for [`wait_for_flow_cancellation`]. Tight enough
2777/// that a local single-node cancel cascade observes `cancelled` within
2778/// one or two polls; slack enough that a `WaitIndefinite` caller does
2779/// not hammer `describe_flow` on a live cluster.
2780const CANCEL_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(100);
2781
2782/// Defensive ceiling for [`CancelFlowWait::WaitIndefinite`] — if the
2783/// reconciler cascade has not converged in five minutes, something is
2784/// wedged and returning `Timeout` is strictly more useful than blocking
2785/// forever. RFC-012 §3.1.1 expects real-world cascades to finish within
2786/// `reconciler_interval + grace`, which is orders of magnitude below
2787/// this.
2788const CANCEL_WAIT_INDEFINITE_CEILING: Duration = Duration::from_secs(300);
2789
2790/// Poll `backend.describe_flow(flow_id)` until `public_flow_state` is
2791/// `"cancelled"` or `deadline` elapses.
2792///
2793/// Shared by every backend's `cancel_flow` trait impl that honours
2794/// [`CancelFlowWait::WaitTimeout`] / [`CancelFlowWait::WaitIndefinite`].
2795/// The underlying `cancel_flow` FCALL / SQL transaction flips the
2796/// flow-level state synchronously; member cancellations dispatch
2797/// asynchronously via the reconciler, which also flips
2798/// `public_flow_state` to `cancelled` once the cascade completes. This
2799/// helper waits for that terminal flip.
2800///
2801/// Returns:
2802/// * `Ok(())` once `public_flow_state = "cancelled"` is observed.
2803/// * `Err(EngineError::Timeout { op: "cancel_flow", elapsed })` when
2804///   `deadline` elapses first. `elapsed` is the wait budget (the
2805///   requested timeout), not wall-clock precision.
2806/// * `Err(e)` if `describe_flow` itself errors (propagated).
2807pub async fn wait_for_flow_cancellation<B: EngineBackend + ?Sized>(
2808    backend: &B,
2809    flow_id: &crate::types::FlowId,
2810    deadline: Duration,
2811) -> Result<(), EngineError> {
2812    let start = std::time::Instant::now();
2813    loop {
2814        match backend.describe_flow(flow_id).await? {
2815            Some(snap) if snap.public_flow_state == "cancelled" => return Ok(()),
2816            // `None` (flow removed) is also terminal from the caller's
2817            // perspective — nothing left to wait on.
2818            None => return Ok(()),
2819            Some(_) => {}
2820        }
2821        if start.elapsed() >= deadline {
2822            return Err(EngineError::Timeout {
2823                op: "cancel_flow",
2824                elapsed: deadline,
2825            });
2826        }
2827        tokio::time::sleep(CANCEL_WAIT_POLL_INTERVAL).await;
2828    }
2829}
2830
2831/// Convert a [`CancelFlowWait`] into the deadline passed to
2832/// [`wait_for_flow_cancellation`]. `NoWait` returns `None` — the caller
2833/// must skip the wait entirely.
2834pub fn cancel_flow_wait_deadline(wait: CancelFlowWait) -> Option<Duration> {
2835    // `CancelFlowWait` is `#[non_exhaustive]`; this match lives in the
2836    // defining crate so the exhaustiveness check keeps the compiler
2837    // honest. Future variants must be wired here explicitly.
2838    match wait {
2839        CancelFlowWait::NoWait => None,
2840        CancelFlowWait::WaitTimeout(d) => Some(d),
2841        CancelFlowWait::WaitIndefinite => Some(CANCEL_WAIT_INDEFINITE_CEILING),
2842    }
2843}
2844
2845/// Validate a caller-namespaced tag key against the regex
2846/// `^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$`.
2847///
2848/// The Rust trait-side check is **stricter than the Valkey Lua
2849/// contracts** (`ff_set_execution_tags` / `ff_set_flow_tags`), which
2850/// only check `^[a-z][a-z0-9_]*%.[^.]` — namespace prefix + first
2851/// suffix char, with the rest of the suffix unvalidated. Every
2852/// backend-side impl (`ff-backend-{valkey,postgres,sqlite}`) calls
2853/// this helper **before** the wire hop so the effective parity
2854/// contract is this full-key regex; Valkey's Lua is an additional,
2855/// more permissive server-side guard, not the parity-of-record.
2856///
2857/// A key passes iff:
2858///
2859/// * it begins with an ASCII lowercase letter;
2860/// * all characters up to the first `.` are lowercase alnum or `_`;
2861/// * the first `.` is followed by at least one non-`.` character
2862///   (so `cairn.` and `cairn..x` fail — the `<field>` must be non-empty).
2863///
2864/// Shared entry point for [`EngineBackend::set_execution_tag`] /
2865/// [`EngineBackend::set_flow_tag`] / [`EngineBackend::get_execution_tag`] /
2866/// [`EngineBackend::get_flow_tag`] so every backend rejects the same
2867/// keyspace. On rejection, returns
2868/// [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
2869/// with the offending key in `detail`.
2870///
2871/// `#[allow(clippy::result_large_err)]` — `EngineError` is the uniform
2872/// error across the whole `EngineBackend` trait surface (see trait method
2873/// signatures above); boxing it here alone would introduce a
2874/// gratuitous signature deviation. Clippy 1.95 flags free functions but
2875/// not trait methods; this function mirrors the trait-method convention.
2876#[allow(clippy::result_large_err)]
2877pub fn validate_tag_key(key: &str) -> Result<(), EngineError> {
2878    use crate::engine_error::ValidationKind;
2879
2880    let bad = || EngineError::Validation {
2881        kind: ValidationKind::InvalidInput,
2882        detail: format!(
2883            "invalid tag key: {key:?} (must match ^[a-z][a-z0-9_]*\\.[a-z0-9_][a-z0-9_.]*$)"
2884        ),
2885    };
2886
2887    let mut chars = key.chars();
2888    let first = chars.next().ok_or_else(bad)?;
2889    if !first.is_ascii_lowercase() {
2890        return Err(bad());
2891    }
2892    // Phase 1: consume the namespace prefix up to (but not including) the first `.`.
2893    // Chars must be `[a-z0-9_]`.
2894    let mut saw_dot = false;
2895    for c in chars.by_ref() {
2896        if c == '.' {
2897            saw_dot = true;
2898            break;
2899        }
2900        if !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') {
2901            return Err(bad());
2902        }
2903    }
2904    if !saw_dot {
2905        return Err(bad());
2906    }
2907    // Phase 2: the first char after the first `.` must exist and be a
2908    // non-dot member char `[a-z0-9_]` (matches Valkey's Lua regex
2909    // `^[a-z][a-z0-9_]*%.[^.]` — a second consecutive dot is rejected).
2910    let second = chars.next().ok_or_else(bad)?;
2911    if !(second.is_ascii_lowercase() || second.is_ascii_digit() || second == '_') {
2912        return Err(bad());
2913    }
2914    // Phase 3 (Finding 2 tightening): every remaining char MUST be
2915    // `[a-z0-9_.]`. Before this, the suffix was unvalidated beyond
2916    // its first char — so `cairn.foo bar`, `cairn.Foo`,
2917    // `cairn.foo"bar`, `cairn.foo-bar` all passed trait-side validation
2918    // even though they would break SQLite JSON-path quoting, the Lua
2919    // HSET field-name conventions, or consumer grep patterns. Valkey's
2920    // Lua prefix regex is identically permissive on the suffix; this
2921    // tightens both layers from the Rust side. Dots in the suffix
2922    // remain legal (`app.sub.field` is valid) to preserve the existing
2923    // accepted shape.
2924    for c in chars {
2925        if !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_' || c == '.') {
2926            return Err(bad());
2927        }
2928    }
2929    Ok(())
2930}
2931
2932#[cfg(test)]
2933mod tests {
2934    use super::*;
2935
2936    /// A zero-state backend stub used to exercise the default
2937    /// `capabilities()` impl without pulling in a real
2938    /// transport. Only the default method is under test here; every
2939    /// other method is unreachable on this type.
2940    struct DefaultBackend;
2941
2942    #[async_trait]
2943    impl EngineBackend for DefaultBackend {
2944        async fn claim(
2945            &self,
2946            _lane: &LaneId,
2947            _capabilities: &CapabilitySet,
2948            _policy: ClaimPolicy,
2949        ) -> Result<Option<Handle>, EngineError> {
2950            unreachable!()
2951        }
2952        async fn renew(&self, _handle: &Handle) -> Result<LeaseRenewal, EngineError> {
2953            unreachable!()
2954        }
2955        async fn progress(
2956            &self,
2957            _handle: &Handle,
2958            _percent: Option<u8>,
2959            _message: Option<String>,
2960        ) -> Result<(), EngineError> {
2961            unreachable!()
2962        }
2963        async fn append_frame(
2964            &self,
2965            _handle: &Handle,
2966            _frame: Frame,
2967        ) -> Result<AppendFrameOutcome, EngineError> {
2968            unreachable!()
2969        }
2970        async fn complete(
2971            &self,
2972            _handle: &Handle,
2973            _payload: Option<Vec<u8>>,
2974        ) -> Result<(), EngineError> {
2975            unreachable!()
2976        }
2977        async fn fail(
2978            &self,
2979            _handle: &Handle,
2980            _reason: FailureReason,
2981            _classification: FailureClass,
2982        ) -> Result<FailOutcome, EngineError> {
2983            unreachable!()
2984        }
2985        async fn cancel(&self, _handle: &Handle, _reason: &str) -> Result<(), EngineError> {
2986            unreachable!()
2987        }
2988        async fn suspend(
2989            &self,
2990            _handle: &Handle,
2991            _args: SuspendArgs,
2992        ) -> Result<SuspendOutcome, EngineError> {
2993            unreachable!()
2994        }
2995        async fn create_waitpoint(
2996            &self,
2997            _handle: &Handle,
2998            _waitpoint_key: &str,
2999            _expires_in: Duration,
3000        ) -> Result<PendingWaitpoint, EngineError> {
3001            unreachable!()
3002        }
3003        async fn observe_signals(
3004            &self,
3005            _handle: &Handle,
3006        ) -> Result<Vec<ResumeSignal>, EngineError> {
3007            unreachable!()
3008        }
3009        async fn claim_from_resume_grant(
3010            &self,
3011            _token: ResumeToken,
3012        ) -> Result<Option<Handle>, EngineError> {
3013            unreachable!()
3014        }
3015        async fn delay(
3016            &self,
3017            _handle: &Handle,
3018            _delay_until: TimestampMs,
3019        ) -> Result<(), EngineError> {
3020            unreachable!()
3021        }
3022        async fn wait_children(&self, _handle: &Handle) -> Result<(), EngineError> {
3023            unreachable!()
3024        }
3025        async fn describe_execution(
3026            &self,
3027            _id: &ExecutionId,
3028        ) -> Result<Option<ExecutionSnapshot>, EngineError> {
3029            unreachable!()
3030        }
3031        async fn read_execution_context(
3032            &self,
3033            _execution_id: &ExecutionId,
3034        ) -> Result<ExecutionContext, EngineError> {
3035            Ok(ExecutionContext::new(
3036                Vec::new(),
3037                String::new(),
3038                std::collections::HashMap::new(),
3039            ))
3040        }
3041        async fn read_current_attempt_index(
3042            &self,
3043            _execution_id: &ExecutionId,
3044        ) -> Result<AttemptIndex, EngineError> {
3045            Ok(AttemptIndex::new(0))
3046        }
3047        async fn read_total_attempt_count(
3048            &self,
3049            _execution_id: &ExecutionId,
3050        ) -> Result<AttemptIndex, EngineError> {
3051            Ok(AttemptIndex::new(0))
3052        }
3053        async fn describe_flow(
3054            &self,
3055            _id: &FlowId,
3056        ) -> Result<Option<FlowSnapshot>, EngineError> {
3057            unreachable!()
3058        }
3059        #[cfg(feature = "core")]
3060        async fn list_edges(
3061            &self,
3062            _flow_id: &FlowId,
3063            _direction: EdgeDirection,
3064        ) -> Result<Vec<EdgeSnapshot>, EngineError> {
3065            unreachable!()
3066        }
3067        #[cfg(feature = "core")]
3068        async fn describe_edge(
3069            &self,
3070            _flow_id: &FlowId,
3071            _edge_id: &EdgeId,
3072        ) -> Result<Option<EdgeSnapshot>, EngineError> {
3073            unreachable!()
3074        }
3075        #[cfg(feature = "core")]
3076        async fn resolve_execution_flow_id(
3077            &self,
3078            _eid: &ExecutionId,
3079        ) -> Result<Option<FlowId>, EngineError> {
3080            unreachable!()
3081        }
3082        #[cfg(feature = "core")]
3083        async fn list_flows(
3084            &self,
3085            _partition: PartitionKey,
3086            _cursor: Option<FlowId>,
3087            _limit: usize,
3088        ) -> Result<ListFlowsPage, EngineError> {
3089            unreachable!()
3090        }
3091        #[cfg(feature = "core")]
3092        async fn list_lanes(
3093            &self,
3094            _cursor: Option<LaneId>,
3095            _limit: usize,
3096        ) -> Result<ListLanesPage, EngineError> {
3097            unreachable!()
3098        }
3099        #[cfg(feature = "core")]
3100        async fn list_suspended(
3101            &self,
3102            _partition: PartitionKey,
3103            _cursor: Option<ExecutionId>,
3104            _limit: usize,
3105        ) -> Result<ListSuspendedPage, EngineError> {
3106            unreachable!()
3107        }
3108        #[cfg(feature = "core")]
3109        async fn list_executions(
3110            &self,
3111            _partition: PartitionKey,
3112            _cursor: Option<ExecutionId>,
3113            _limit: usize,
3114        ) -> Result<ListExecutionsPage, EngineError> {
3115            unreachable!()
3116        }
3117        #[cfg(feature = "core")]
3118        async fn deliver_signal(
3119            &self,
3120            _args: DeliverSignalArgs,
3121        ) -> Result<DeliverSignalResult, EngineError> {
3122            unreachable!()
3123        }
3124        #[cfg(feature = "core")]
3125        async fn claim_resumed_execution(
3126            &self,
3127            _args: ClaimResumedExecutionArgs,
3128        ) -> Result<ClaimResumedExecutionResult, EngineError> {
3129            unreachable!()
3130        }
3131        async fn cancel_flow(
3132            &self,
3133            _id: &FlowId,
3134            _policy: CancelFlowPolicy,
3135            _wait: CancelFlowWait,
3136        ) -> Result<CancelFlowResult, EngineError> {
3137            unreachable!()
3138        }
3139        #[cfg(feature = "core")]
3140        async fn set_edge_group_policy(
3141            &self,
3142            _flow_id: &FlowId,
3143            _downstream_execution_id: &ExecutionId,
3144            _policy: crate::contracts::EdgeDependencyPolicy,
3145        ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
3146            unreachable!()
3147        }
3148        async fn report_usage(
3149            &self,
3150            _handle: &Handle,
3151            _budget: &BudgetId,
3152            _dimensions: crate::backend::UsageDimensions,
3153        ) -> Result<ReportUsageResult, EngineError> {
3154            unreachable!()
3155        }
3156        #[cfg(feature = "streaming")]
3157        async fn read_stream(
3158            &self,
3159            _execution_id: &ExecutionId,
3160            _attempt_index: AttemptIndex,
3161            _from: StreamCursor,
3162            _to: StreamCursor,
3163            _count_limit: u64,
3164        ) -> Result<StreamFrames, EngineError> {
3165            unreachable!()
3166        }
3167        #[cfg(feature = "streaming")]
3168        async fn tail_stream(
3169            &self,
3170            _execution_id: &ExecutionId,
3171            _attempt_index: AttemptIndex,
3172            _after: StreamCursor,
3173            _block_ms: u64,
3174            _count_limit: u64,
3175            _visibility: TailVisibility,
3176        ) -> Result<StreamFrames, EngineError> {
3177            unreachable!()
3178        }
3179        #[cfg(feature = "streaming")]
3180        async fn read_summary(
3181            &self,
3182            _execution_id: &ExecutionId,
3183            _attempt_index: AttemptIndex,
3184        ) -> Result<Option<SummaryDocument>, EngineError> {
3185            unreachable!()
3186        }
3187    }
3188
3189    /// The default `capabilities()` impl returns a value tagged
3190    /// `family = "unknown"` with every `supports.*` bool false, so
3191    /// pre-RFC-018 out-of-tree backends keep compiling and consumers
3192    /// can distinguish "backend predates RFC-018" from "backend
3193    /// reports concrete bools." Every concrete in-tree backend
3194    /// overrides.
3195    #[test]
3196    fn default_capabilities_is_unknown_family_all_false() {
3197        let b = DefaultBackend;
3198        let caps = b.capabilities();
3199        assert_eq!(caps.identity.family, "unknown");
3200        assert_eq!(
3201            caps.identity.version,
3202            crate::capability::Version::new(0, 0, 0)
3203        );
3204        assert_eq!(caps.identity.rfc017_stage, "unknown");
3205        // Every field false on the default (matches `Supports::none()`).
3206        assert_eq!(caps.supports, crate::capability::Supports::none());
3207    }
3208
3209    // ── resolve_dependency (PR-7b Step 0) ──
3210
3211    /// Default impl returns `Unavailable { op: "resolve_dependency" }`
3212    /// so pre-migration backends surface a typed Unsupported-grade
3213    /// error rather than panic. Both cluster 2's dependency_reconciler
3214    /// and cluster 4's completion_listener route through this method;
3215    /// the default is the safety net for non-Valkey deployments.
3216    #[cfg(feature = "core")]
3217    #[tokio::test]
3218    async fn default_resolve_dependency_is_unavailable() {
3219        use crate::contracts::ResolveDependencyArgs;
3220        use crate::partition::{Partition, PartitionFamily};
3221        use crate::types::{AttemptIndex, EdgeId, FlowId, LaneId};
3222
3223        let b = DefaultBackend;
3224        let partition = Partition {
3225            family: PartitionFamily::Flow,
3226            index: 0,
3227        };
3228        let args = ResolveDependencyArgs::new(
3229            partition,
3230            FlowId::parse("11111111-1111-1111-1111-111111111111").unwrap(),
3231            ExecutionId::parse("{fp:0}:22222222-2222-2222-2222-222222222222").unwrap(),
3232            ExecutionId::parse("{fp:0}:33333333-3333-3333-3333-333333333333").unwrap(),
3233            EdgeId::parse("44444444-4444-4444-4444-444444444444").unwrap(),
3234            LaneId::new("default"),
3235            AttemptIndex::new(0),
3236            "success".to_owned(),
3237            TimestampMs::now(),
3238        );
3239        match b.resolve_dependency(args).await {
3240            Err(EngineError::Unavailable { op }) => {
3241                assert_eq!(op, "resolve_dependency");
3242            }
3243            other => panic!("expected Unavailable, got {other:?}"),
3244        }
3245    }
3246
3247    // ── cascade_completion (PR-7b Cluster 4) ──
3248
3249    /// Default impl returns `Unavailable { op: "cascade_completion" }`
3250    /// so pre-migration / non-core builds surface a typed error rather
3251    /// than panic. The push-based completion listener routes through
3252    /// this method; the default is the safety net for deployments
3253    /// whose backend doesn't cascade (e.g. SQLite in the current
3254    /// Wave 9 scope).
3255    #[cfg(feature = "core")]
3256    #[tokio::test]
3257    async fn default_cascade_completion_is_unavailable() {
3258        use crate::backend::CompletionPayload;
3259
3260        let b = DefaultBackend;
3261        let eid = ExecutionId::parse(
3262            "{fp:0}:66666666-6666-6666-6666-666666666666",
3263        )
3264        .unwrap();
3265        let payload = CompletionPayload::new(eid, "success", None, TimestampMs::now());
3266        match b.cascade_completion(&payload).await {
3267            Err(EngineError::Unavailable { op }) => {
3268                assert_eq!(op, "cascade_completion");
3269            }
3270            other => panic!("expected Unavailable, got {other:?}"),
3271        }
3272    }
3273
3274    // ── unblock_execution (PR-7b Cluster 2) ──
3275
3276    /// Default impl returns `Unavailable { op: "unblock_execution" }`
3277    /// so non-Valkey deployments get a typed `Unavailable` rather than
3278    /// a panic. The engine scanner loop on PG/SQLite skips this path
3279    /// entirely (scheduler eligibility is re-evaluated live via SQL
3280    /// predicates), but a stray direct call must still fail gracefully.
3281    #[cfg(feature = "core")]
3282    #[tokio::test]
3283    async fn default_unblock_execution_is_unavailable() {
3284        use crate::partition::{Partition, PartitionFamily};
3285
3286        let b = DefaultBackend;
3287        let partition = Partition {
3288            family: PartitionFamily::Execution,
3289            index: 0,
3290        };
3291        let eid = ExecutionId::parse(
3292            "{fp:0}:55555555-5555-5555-5555-555555555555",
3293        )
3294        .unwrap();
3295        let lane = LaneId::new("default");
3296        match b
3297            .unblock_execution(
3298                partition,
3299                &lane,
3300                &eid,
3301                "waiting_for_budget",
3302                TimestampMs::from_millis(0),
3303            )
3304            .await
3305        {
3306            Err(EngineError::Unavailable { op }) => {
3307                assert_eq!(op, "unblock_execution");
3308            }
3309            other => panic!("expected Unavailable, got {other:?}"),
3310        }
3311    }
3312
3313    // ── project_flow_summary (PR-7b Cluster 2b-B) ──
3314
3315    /// Default impl returns `Unavailable { op: "project_flow_summary" }`
3316    /// so non-Valkey deployments get a typed `Unavailable` rather than
3317    /// a panic. SQLite rides the default per RFC-023 Phase 3.5.
3318    #[cfg(feature = "core")]
3319    #[tokio::test]
3320    async fn default_project_flow_summary_is_unavailable() {
3321        use crate::partition::{Partition, PartitionFamily};
3322        use crate::types::FlowId;
3323
3324        let b = DefaultBackend;
3325        let partition = Partition {
3326            family: PartitionFamily::Flow,
3327            index: 0,
3328        };
3329        let fid = FlowId::parse("11111111-1111-1111-1111-111111111111").unwrap();
3330        match b
3331            .project_flow_summary(partition, &fid, TimestampMs::from_millis(0))
3332            .await
3333        {
3334            Err(EngineError::Unavailable { op }) => {
3335                assert_eq!(op, "project_flow_summary");
3336            }
3337            other => panic!("expected Unavailable, got {other:?}"),
3338        }
3339    }
3340
3341    // ── trim_retention (PR-7b Cluster 2b-B) ──
3342
3343    /// Default impl returns `Unavailable { op: "trim_retention" }` so
3344    /// non-Valkey deployments get a typed `Unavailable` rather than a
3345    /// panic. SQLite rides the default per RFC-023 Phase 3.5.
3346    #[cfg(feature = "core")]
3347    #[tokio::test]
3348    async fn default_trim_retention_is_unavailable() {
3349        use crate::partition::{Partition, PartitionFamily};
3350
3351        let b = DefaultBackend;
3352        let partition = Partition {
3353            family: PartitionFamily::Execution,
3354            index: 0,
3355        };
3356        let lane = LaneId::new("default");
3357        match b
3358            .trim_retention(
3359                partition,
3360                &lane,
3361                60_000,
3362                TimestampMs::from_millis(0),
3363                20,
3364                &crate::backend::ScannerFilter::NOOP,
3365            )
3366            .await
3367        {
3368            Err(EngineError::Unavailable { op }) => {
3369                assert_eq!(op, "trim_retention");
3370            }
3371            other => panic!("expected Unavailable, got {other:?}"),
3372        }
3373    }
3374
3375    // ── read_exec_core_fields (PR-7b Wave 0a) ──
3376
3377    /// Default impl returns `Unavailable { op: "read_exec_core_fields" }`
3378    /// so out-of-tree backends that pre-date v0.13 keep compiling while
3379    /// surfacing a typed error on call. Empty-field input short-circuits
3380    /// to `Ok(empty map)` in all in-tree impls; the default is triggered
3381    /// only when the trait method itself is un-overridden.
3382    #[cfg(feature = "core")]
3383    #[tokio::test]
3384    async fn default_read_exec_core_fields_is_unavailable() {
3385        use crate::partition::{Partition, PartitionFamily};
3386
3387        let b = DefaultBackend;
3388        let partition = Partition {
3389            family: PartitionFamily::Execution,
3390            index: 0,
3391        };
3392        let eid = ExecutionId::parse(
3393            "{fp:0}:66666666-6666-6666-6666-666666666666",
3394        )
3395        .unwrap();
3396        match b
3397            .read_exec_core_fields(partition, &eid, &["lane_id"])
3398            .await
3399        {
3400            Err(EngineError::Unavailable { op }) => {
3401                assert_eq!(op, "read_exec_core_fields");
3402            }
3403            other => panic!("expected Unavailable, got {other:?}"),
3404        }
3405    }
3406
3407    // ── validate_tag_key (issue #433) ──
3408
3409    #[test]
3410    fn validate_tag_key_accepts_valid() {
3411        for k in [
3412            "cairn.session_id",
3413            "cairn.project",
3414            "a.b",
3415            "a1_2.x",
3416            "app.sub.field",
3417            "x.y_z",
3418        ] {
3419            validate_tag_key(k).unwrap_or_else(|e| panic!("{k:?} should pass: {e:?}"));
3420        }
3421    }
3422
3423    #[test]
3424    fn validate_tag_key_rejects_invalid() {
3425        for k in [
3426            "",                // empty
3427            "Cairn.x",         // uppercase first
3428            "1cairn.x",        // leading digit
3429            "cairn",           // no dot
3430            "cairn.",          // empty suffix
3431            "cairn..x",        // dot immediately after first dot
3432            ".cairn",          // leading dot
3433            "cair n.x",        // space before dot
3434            "ca-irn.x",        // hyphen in prefix
3435            // Finding 2 tightening — suffix now fully validated.
3436            "cairn.Foo",       // uppercase in suffix
3437            "cairn.foo bar",   // space in suffix
3438            "cairn.foo\"bar",  // double-quote in suffix (would break SQLite JSON-path quoting)
3439            "cairn.foo-bar",   // hyphen in suffix
3440            "cairn.foo\\bar",  // backslash in suffix
3441        ] {
3442            let err = validate_tag_key(k)
3443                .err()
3444                .unwrap_or_else(|| panic!("{k:?} should fail"));
3445            match err {
3446                EngineError::Validation {
3447                    kind: crate::engine_error::ValidationKind::InvalidInput,
3448                    ..
3449                } => {}
3450                other => panic!("{k:?}: unexpected err {other:?}"),
3451            }
3452        }
3453    }
3454
3455    // ── cairn #389: service-layer FCALL trait-method defaults ──
3456    //
3457    // Each method MUST return `EngineError::Unavailable { op: "<name>" }`
3458    // on backends that haven't overridden it, so out-of-tree backends
3459    // keep compiling and consumers get a terminal-classified error
3460    // rather than a panic. Mirrors the precedent used by
3461    // `issue_reclaim_grant` / `reclaim_execution` / `suspend_by_triple`.
3462    //
3463    // Feature-gated on `core` because the methods under test only
3464    // exist on the trait under that feature — matches the precedent
3465    // of `default_resolve_dependency_is_unavailable` above.
3466
3467    #[cfg(feature = "core")]
3468    #[tokio::test]
3469    async fn default_complete_execution_is_unavailable() {
3470        use crate::contracts::CompleteExecutionArgs;
3471        use crate::types::{ExecutionId, FlowId};
3472        let b = DefaultBackend;
3473        let config = crate::partition::PartitionConfig::default();
3474        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3475        let args = CompleteExecutionArgs {
3476            execution_id: eid,
3477            fence: None,
3478            attempt_index: AttemptIndex::new(0),
3479            result_payload: None,
3480            result_encoding: None,
3481            source: crate::types::CancelSource::default(),
3482            now: TimestampMs::from_millis(0),
3483        };
3484        match b.complete_execution(args).await.unwrap_err() {
3485            EngineError::Unavailable { op } => assert_eq!(op, "complete_execution"),
3486            other => panic!("expected Unavailable, got {other:?}"),
3487        }
3488    }
3489
3490    #[cfg(feature = "core")]
3491    #[tokio::test]
3492    async fn default_fail_execution_is_unavailable() {
3493        use crate::contracts::FailExecutionArgs;
3494        use crate::types::{ExecutionId, FlowId};
3495        let b = DefaultBackend;
3496        let config = crate::partition::PartitionConfig::default();
3497        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3498        let args = FailExecutionArgs {
3499            execution_id: eid,
3500            fence: None,
3501            attempt_index: AttemptIndex::new(0),
3502            failure_reason: String::new(),
3503            failure_category: String::new(),
3504            retry_policy_json: String::new(),
3505            next_attempt_policy_json: String::new(),
3506            source: crate::types::CancelSource::default(),
3507        };
3508        match b.fail_execution(args).await.unwrap_err() {
3509            EngineError::Unavailable { op } => assert_eq!(op, "fail_execution"),
3510            other => panic!("expected Unavailable, got {other:?}"),
3511        }
3512    }
3513
3514    #[cfg(feature = "core")]
3515    #[tokio::test]
3516    async fn default_renew_lease_is_unavailable() {
3517        use crate::contracts::RenewLeaseArgs;
3518        use crate::types::{ExecutionId, FlowId};
3519        let b = DefaultBackend;
3520        let config = crate::partition::PartitionConfig::default();
3521        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3522        let args = RenewLeaseArgs {
3523            execution_id: eid,
3524            attempt_index: AttemptIndex::new(0),
3525            fence: None,
3526            lease_ttl_ms: 1_000,
3527            lease_history_grace_ms: 60_000,
3528        };
3529        match b.renew_lease(args).await.unwrap_err() {
3530            EngineError::Unavailable { op } => assert_eq!(op, "renew_lease"),
3531            other => panic!("expected Unavailable, got {other:?}"),
3532        }
3533    }
3534
3535    #[cfg(feature = "core")]
3536    #[tokio::test]
3537    async fn default_resume_execution_is_unavailable() {
3538        use crate::contracts::ResumeExecutionArgs;
3539        use crate::types::{ExecutionId, FlowId};
3540        let b = DefaultBackend;
3541        let config = crate::partition::PartitionConfig::default();
3542        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3543        let args = ResumeExecutionArgs {
3544            execution_id: eid,
3545            trigger_type: "signal".to_owned(),
3546            resume_delay_ms: 0,
3547        };
3548        match b.resume_execution(args).await.unwrap_err() {
3549            EngineError::Unavailable { op } => assert_eq!(op, "resume_execution"),
3550            other => panic!("expected Unavailable, got {other:?}"),
3551        }
3552    }
3553
3554    #[cfg(feature = "core")]
3555    #[tokio::test]
3556    async fn default_check_admission_is_unavailable() {
3557        use crate::contracts::CheckAdmissionArgs;
3558        use crate::types::{ExecutionId, FlowId, QuotaPolicyId};
3559        let b = DefaultBackend;
3560        let config = crate::partition::PartitionConfig::default();
3561        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3562        let args = CheckAdmissionArgs {
3563            execution_id: eid,
3564            now: TimestampMs::from_millis(0),
3565            window_seconds: 60,
3566            rate_limit: 10,
3567            concurrency_cap: 1,
3568            jitter_ms: None,
3569        };
3570        let qid = QuotaPolicyId::new();
3571        match b
3572            .check_admission(&qid, "default", args)
3573            .await
3574            .unwrap_err()
3575        {
3576            EngineError::Unavailable { op } => assert_eq!(op, "check_admission"),
3577            other => panic!("expected Unavailable, got {other:?}"),
3578        }
3579    }
3580
3581    #[cfg(feature = "core")]
3582    #[tokio::test]
3583    async fn default_evaluate_flow_eligibility_is_unavailable() {
3584        use crate::contracts::EvaluateFlowEligibilityArgs;
3585        use crate::types::{ExecutionId, FlowId};
3586        let b = DefaultBackend;
3587        let config = crate::partition::PartitionConfig::default();
3588        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3589        let args = EvaluateFlowEligibilityArgs { execution_id: eid };
3590        match b.evaluate_flow_eligibility(args).await.unwrap_err() {
3591            EngineError::Unavailable { op } => assert_eq!(op, "evaluate_flow_eligibility"),
3592            other => panic!("expected Unavailable, got {other:?}"),
3593        }
3594    }
3595
3596    // ── Cluster 2b-A tally-recompute reconcilers ──
3597
3598    #[cfg(feature = "core")]
3599    #[tokio::test]
3600    async fn default_reconcile_execution_index_is_unavailable() {
3601        use crate::backend::ScannerFilter;
3602        use crate::partition::{Partition, PartitionFamily};
3603        let b = DefaultBackend;
3604        let partition = Partition { family: PartitionFamily::Execution, index: 0 };
3605        let lanes = [LaneId::new("default")];
3606        let filter = ScannerFilter::default();
3607        match b
3608            .reconcile_execution_index(partition, &lanes, &filter)
3609            .await
3610            .unwrap_err()
3611        {
3612            EngineError::Unavailable { op } => assert_eq!(op, "reconcile_execution_index"),
3613            other => panic!("expected Unavailable, got {other:?}"),
3614        }
3615    }
3616
3617    #[cfg(feature = "core")]
3618    #[tokio::test]
3619    async fn default_reconcile_budget_counters_is_unavailable() {
3620        use crate::partition::{Partition, PartitionFamily};
3621        let b = DefaultBackend;
3622        let partition = Partition { family: PartitionFamily::Budget, index: 0 };
3623        match b
3624            .reconcile_budget_counters(partition, TimestampMs::from_millis(0))
3625            .await
3626            .unwrap_err()
3627        {
3628            EngineError::Unavailable { op } => assert_eq!(op, "reconcile_budget_counters"),
3629            other => panic!("expected Unavailable, got {other:?}"),
3630        }
3631    }
3632
3633    #[cfg(feature = "core")]
3634    #[tokio::test]
3635    async fn default_reconcile_quota_counters_is_unavailable() {
3636        use crate::partition::{Partition, PartitionFamily};
3637        let b = DefaultBackend;
3638        let partition = Partition { family: PartitionFamily::Quota, index: 0 };
3639        match b
3640            .reconcile_quota_counters(partition, TimestampMs::from_millis(0))
3641            .await
3642            .unwrap_err()
3643        {
3644            EngineError::Unavailable { op } => assert_eq!(op, "reconcile_quota_counters"),
3645            other => panic!("expected Unavailable, got {other:?}"),
3646        }
3647    }
3648
3649    // ── #454 trait-additions default-impl tests (4) ────────────
3650
3651    #[cfg(feature = "core")]
3652    #[tokio::test]
3653    async fn default_record_spend_is_unavailable() {
3654        use crate::contracts::RecordSpendArgs;
3655        use crate::types::{BudgetId, ExecutionId, FlowId};
3656        let b = DefaultBackend;
3657        let config = crate::partition::PartitionConfig::default();
3658        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3659        let args = RecordSpendArgs::new(
3660            BudgetId::new(),
3661            eid,
3662            std::collections::BTreeMap::new(),
3663            "k",
3664        );
3665        match b.record_spend(args).await.unwrap_err() {
3666            EngineError::Unavailable { op } => assert_eq!(op, "record_spend"),
3667            other => panic!("expected Unavailable, got {other:?}"),
3668        }
3669    }
3670
3671    #[cfg(feature = "core")]
3672    #[tokio::test]
3673    async fn default_release_budget_is_unavailable() {
3674        use crate::contracts::ReleaseBudgetArgs;
3675        use crate::types::{BudgetId, ExecutionId, FlowId};
3676        let b = DefaultBackend;
3677        let config = crate::partition::PartitionConfig::default();
3678        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3679        let args = ReleaseBudgetArgs::new(BudgetId::new(), eid);
3680        match b.release_budget(args).await.unwrap_err() {
3681            EngineError::Unavailable { op } => assert_eq!(op, "release_budget"),
3682            other => panic!("expected Unavailable, got {other:?}"),
3683        }
3684    }
3685
3686    #[cfg(feature = "core")]
3687    #[tokio::test]
3688    async fn default_deliver_approval_signal_is_unavailable() {
3689        use crate::contracts::DeliverApprovalSignalArgs;
3690        use crate::types::{ExecutionId, FlowId, LaneId, WaitpointId};
3691        let b = DefaultBackend;
3692        let config = crate::partition::PartitionConfig::default();
3693        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3694        let args = DeliverApprovalSignalArgs::new(
3695            eid,
3696            LaneId::new("default"),
3697            WaitpointId::new(),
3698            "approved",
3699            "sfx",
3700            1_000,
3701            Some(100),
3702            Some(10),
3703        );
3704        match b.deliver_approval_signal(args).await.unwrap_err() {
3705            EngineError::Unavailable { op } => assert_eq!(op, "deliver_approval_signal"),
3706            other => panic!("expected Unavailable, got {other:?}"),
3707        }
3708    }
3709
3710    #[cfg(feature = "core")]
3711    #[tokio::test]
3712    async fn default_issue_grant_and_claim_is_unavailable() {
3713        use crate::contracts::IssueGrantAndClaimArgs;
3714        use crate::types::{ExecutionId, FlowId, LaneId};
3715        let b = DefaultBackend;
3716        let config = crate::partition::PartitionConfig::default();
3717        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3718        let args = IssueGrantAndClaimArgs::new(eid, LaneId::new("default"), 30_000);
3719        match b.issue_grant_and_claim(args).await.unwrap_err() {
3720            EngineError::Unavailable { op } => assert_eq!(op, "issue_grant_and_claim"),
3721            other => panic!("expected Unavailable, got {other:?}"),
3722        }
3723    }
3724}