ff_core/engine_backend.rs
1//! The `EngineBackend` trait — abstracting FlowFabric's write surface.
2//!
3//! **RFC-012 Stage 1a:** this is the trait landing. The
4//! Valkey-backed impl lives in `ff-backend-valkey`; future backends
5//! (Postgres) add a sibling crate with their own impl. ff-sdk's
6//! `FlowFabricWorker` gains `connect_with(backend)` /
7//! `backend(&self)` accessors so consumers that want to bring their
8//! own backend (tests, future non-Valkey deployments) can hand one
9//! in. The hot-path migration of `ClaimedTask` / `FlowFabricWorker`
10//! to forward through the trait lands across Stages 1b-1d.
11//!
12//! # Object safety
13//!
14//! `EngineBackend` is object-safe: all methods are `async fn` behind
15//! `#[async_trait]` and take `&self`. Consumers can hold
//! `Arc<dyn EngineBackend>` for heterogeneous-backend deployments.
17//! The trait is `Send + Sync + 'static` per RFC-012 §4.1; every impl
18//! must honour that bound.
19//!
20//! # Error surface
21//!
22//! Every method returns [`Result<_, EngineError>`]. `EngineError`'s
23//! `Transport` variant carries a boxed `dyn Error + Send + Sync`;
24//! Valkey-backed transport faults box a
25//! `ff_script::error::ScriptError` (downcast via
26//! `ff_script::engine_error_ext::transport_script_ref`). Other
27//! backends box their native error type and set the `backend` tag
28//! accordingly.
29//!
30//! # Atomicity contract
31//!
32//! Per-op state transitions MUST be atomic (RFC-012 §3.4). On Valkey
33//! this is the single-FCALL-per-op property; on Postgres it is the
34//! per-transaction property. A backend that cannot honour atomicity
35//! for a given op either MUST NOT implement `EngineBackend` or MUST
36//! return `EngineError::Unavailable { op }` for the affected method.
37//!
38//! # Replay semantics
39//!
40//! `complete`, `fail`, `cancel`, `suspend`, `delay`, `wait_children`
41//! are idempotent under replay — calling twice with the same handle
42//! and args returns the same outcome (success on first call, typed
43//! `State` / `Contention` on subsequent calls where the fence triple
44//! no longer matches a live lease).
45
46use std::time::Duration;
47
48use async_trait::async_trait;
49
50use crate::backend::{
51 AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy,
52 FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal, PendingWaitpoint,
53 PrepareOutcome, ResumeSignal, ResumeToken,
54};
55// `SummaryDocument` and `TailVisibility` are referenced only inside
56// `#[cfg(feature = "streaming")]` trait methods below, so the imports
57// must be gated to avoid an unused-imports warning on the non-streaming
58// build.
59#[cfg(feature = "streaming")]
60use crate::backend::{SummaryDocument, TailVisibility};
61use crate::contracts::{
62 CancelFlowResult, ExecutionContext, ExecutionSnapshot, FlowSnapshot, IssueReclaimGrantArgs,
63 IssueReclaimGrantOutcome, ReclaimExecutionArgs, ReclaimExecutionOutcome, ReportUsageResult,
64 RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
65 SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
66};
67#[cfg(feature = "core")]
68use crate::contracts::{
69 AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
70 ApplyDependencyToChildResult, BudgetStatus, CancelExecutionArgs, CancelExecutionResult,
71 CancelFlowArgs, ChangePriorityArgs, ChangePriorityResult, ClaimExecutionArgs,
72 ClaimExecutionResult, ClaimForWorkerArgs, ClaimForWorkerOutcome, ClaimResumedExecutionArgs,
73 ClaimResumedExecutionResult,
74 BlockRouteArgs, BlockRouteOutcome, CheckAdmissionArgs, CheckAdmissionResult,
75 ClaimGrantOutcome,
76 CompleteExecutionArgs, CompleteExecutionResult, CreateBudgetArgs, CreateBudgetResult,
77 CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
78 CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
79 DeliverApprovalSignalArgs, DeliverSignalArgs, DeliverSignalResult, EdgeDirection, EdgeSnapshot,
80 EvaluateFlowEligibilityArgs, EvaluateFlowEligibilityResult, ExecutionInfo,
81 FailExecutionArgs, FailExecutionResult,
82 IssueClaimGrantArgs, IssueClaimGrantOutcome, IssueGrantAndClaimArgs,
83 RecordSpendArgs, ReleaseBudgetArgs, ScanEligibleArgs,
84 ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
85 ListPendingWaitpointsResult, ListSuspendedPage, RenewLeaseArgs, RenewLeaseResult,
86 ReplayExecutionArgs, ReplayExecutionResult,
87 ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult, ResumeExecutionArgs,
88 ResumeExecutionResult, RevokeLeaseArgs, RevokeLeaseResult,
89 StageDependencyEdgeArgs, StageDependencyEdgeResult,
90};
91// RFC-025 worker registry.
92#[cfg(feature = "core")]
93use crate::contracts::{
94 HeartbeatWorkerArgs, HeartbeatWorkerOutcome, ListWorkersArgs, ListWorkersResult,
95 MarkWorkerDeadArgs, MarkWorkerDeadOutcome, RegisterWorkerArgs, RegisterWorkerOutcome,
96};
97#[cfg(feature = "suspension")]
98use crate::contracts::{ListExpiredLeasesArgs, ListExpiredLeasesResult};
99#[cfg(feature = "core")]
100use crate::state::PublicState;
101#[cfg(feature = "core")]
102use crate::partition::PartitionKey;
103#[cfg(feature = "streaming")]
104use crate::contracts::{StreamCursor, StreamFrames};
105use crate::engine_error::EngineError;
106#[cfg(feature = "core")]
107use crate::types::EdgeId;
108#[cfg(feature = "core")]
109use crate::types::WaitpointId;
110use crate::types::{AttemptIndex, BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
111
112/// The engine write surface — a single trait a backend implementation
113/// honours to serve a `FlowFabricWorker`.
114///
/// See RFC-012 §3.1 for the inventory rationale and §3.3 for the
/// type-level shape. The RFC-012 Round-7 inventory counted 16 methods
/// (Round-7 added `create_waitpoint`; `append_frame` return widened;
/// `report_usage` return replaced — RFC-012 §R7). Issue #150 added the
/// two trigger-surface methods (`deliver_signal` /
/// `claim_resumed_execution`). Later additions are documented on the
/// individual methods, so the trait now carries more than 16 methods.
120///
121/// # Note on `complete` payload shape
122///
123/// The RFC §3.3 sketch uses `Option<Bytes>`; the Stage 1a trait uses
124/// `Option<Vec<u8>>` to match the existing
125/// `ff_sdk::ClaimedTask::complete` signature and avoid adding a
126/// `bytes` public-type dep for zero consumer benefit. Round-4 §7.17
127/// resolved the payload container debate to `Box<[u8]>` in the
128/// public type (see `HandleOpaque`); `Option<Vec<u8>>` is the
129/// zero-churn choice consistent with today's code. Consumers that
130/// need `&[u8]` can borrow via `.as_deref()` on the Option.
131#[async_trait]
132pub trait EngineBackend: Send + Sync + 'static {
133 // ── Claim + lifecycle ──
134
135 /// Fresh-work claim. Returns `Ok(None)` when no work is currently
136 /// available; `Err` only on transport or input-validation faults.
// Required method (no default body). `Ok(None)` is the "nothing to claim
// right now" answer, so callers should not treat it as a failure.
async fn claim(
    &self,
    lane: &LaneId,
    capabilities: &CapabilitySet,
    policy: ClaimPolicy,
) -> Result<Option<Handle>, EngineError>;
143
144 /// Renew a held lease. Returns the updated expiry + epoch on
145 /// success; typed `State::StaleLease` / `State::LeaseExpired`
146 /// when the lease has been stolen or timed out.
// Required method (no default body). The handle is only borrowed, so the
// caller keeps ownership of it after the call.
async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError>;
148
149 /// Numeric-progress heartbeat.
150 ///
151 /// Writes scalar `progress_percent` / `progress_message` fields on
152 /// `exec_core`; each call overwrites the previous value. This does
153 /// NOT append to the output stream — stream-frame producers must use
154 /// [`append_frame`](Self::append_frame) instead.
// Required method (no default body). Both `percent` and `message` are
// optional; this signature does not say how a backend treats `None`
// (skip the field vs. clear it) — TODO confirm against the impls.
async fn progress(
    &self,
    handle: &Handle,
    percent: Option<u8>,
    message: Option<String>,
) -> Result<(), EngineError>;
161
162 /// Append one stream frame. Distinct from [`progress`](Self::progress)
163 /// per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry
164 /// id and post-append frame count (RFC-012 §R7.2.1).
165 ///
166 /// Stream-frame producers (arbitrary `frame_type` + payload, consumed
167 /// via the read/tail surfaces) MUST use this method rather than
168 /// [`progress`](Self::progress); the latter updates scalar fields on
169 /// `exec_core` and is invisible to stream consumers.
// Required method (no default body). `frame` is taken by value, unlike the
// borrowed `handle`.
async fn append_frame(
    &self,
    handle: &Handle,
    frame: Frame,
) -> Result<AppendFrameOutcome, EngineError>;
175
176 /// Terminal success. Borrows `handle` (round-4 M-D2) so callers
177 /// can retry under `EngineError::Transport` without losing the
178 /// cookie. Payload is `Option<Vec<u8>>` per the note above.
// Required method (no default body). `payload` is an owned byte buffer;
// `None` means the caller supplied no payload.
async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError>;
180
181 /// Terminal failure with classification. Returns [`FailOutcome`]
182 /// so the caller learns whether a retry was scheduled.
// Required method (no default body). `reason` and `classification` are
// passed by value; only the handle is borrowed.
async fn fail(
    &self,
    handle: &Handle,
    reason: FailureReason,
    classification: FailureClass,
) -> Result<FailOutcome, EngineError>;
189
190 /// Cooperative cancel by the worker holding the lease.
// Required method (no default body). `reason` is a borrowed string.
async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError>;
192
193 /// Suspend the execution awaiting a typed resume condition
194 /// (RFC-013 Stage 1d).
195 ///
196 /// Borrows `handle` (round-4 M-D2). Terminal-looking behaviour is
197 /// expressed through [`SuspendOutcome`]:
198 ///
199 /// * [`SuspendOutcome::Suspended`] — the pre-suspend handle is
200 /// logically invalidated; the fresh `HandleKind::Suspended`
201 /// handle inside the variant supersedes it. Runtime enforcement
202 /// via the fence triple: subsequent ops against the stale handle
203 /// surface as `Contention(LeaseConflict)`.
204 /// * [`SuspendOutcome::AlreadySatisfied`] — buffered signals on a
205 /// pending waitpoint already matched the resume condition at
206 /// suspension time. The lease is NOT released; the caller's
207 /// pre-suspend handle remains valid.
208 ///
209 /// See RFC-013 §2 for the type shapes, §3 for the replay /
210 /// idempotency contract, §4 for the error taxonomy.
// Required method (no default body). Callers that hold no `Handle` go
// through `suspend_by_triple` instead, which takes the same `SuspendArgs`.
async fn suspend(
    &self,
    handle: &Handle,
    args: SuspendArgs,
) -> Result<SuspendOutcome, EngineError>;
216
217 /// Suspend by execution id + lease fence triple, for service-layer
218 /// callers that hold a run record / lease-claim descriptor but no
219 /// worker [`Handle`] (cairn issue #322).
220 ///
221 /// Semantics mirror [`Self::suspend`] exactly — the same
222 /// [`SuspendArgs`] validation, the same [`SuspendOutcome`]
223 /// lifecycle, the same RFC-013 §3 dedup / replay contract. The
224 /// only difference is the fencing source: instead of the
225 /// `(lease_id, lease_epoch, attempt_id)` fields embedded in a
226 /// `Handle`, the backend fences against the triple passed directly.
227 /// Attempt-index, lane, and worker-instance metadata that
228 /// [`Self::suspend`] reads from the handle payload are recovered
229 /// from the backend's authoritative execution record (Valkey:
230 /// `exec_core` HGETs; Postgres: `ff_attempt` row lookup).
231 ///
232 /// The default impl returns [`EngineError::Unavailable`] so
233 /// existing backend impls remain non-breaking. Production backends
234 /// (Valkey, Postgres) override.
235 async fn suspend_by_triple(
236 &self,
237 exec_id: ExecutionId,
238 triple: LeaseFence,
239 args: SuspendArgs,
240 ) -> Result<SuspendOutcome, EngineError> {
241 let _ = (exec_id, triple, args);
242 Err(EngineError::Unavailable {
243 op: "suspend_by_triple",
244 })
245 }
246
247 /// Issue a pending waitpoint for future signal delivery.
248 ///
249 /// Waitpoints have two states in the Valkey wire contract:
250 /// **pending** (token issued, not yet backing a suspension) and
251 /// **active** (bound to a suspension). This method creates a
252 /// waitpoint in the **pending** state. A later `suspend` call
253 /// transitions a pending waitpoint to active (see Lua
254 /// `use_pending_waitpoint` ARGV flag at
255 /// `flowfabric.lua:3603,3641,3690`) — or, if buffered signals
256 /// already satisfy its condition, the suspend call returns
257 /// `SuspendOutcome::AlreadySatisfied` and the waitpoint activates
258 /// without ever releasing the lease.
259 ///
260 /// Pending-waitpoint expiry is a first-class terminal error on
261 /// the wire (`PendingWaitpointExpired` at
262 /// `ff-script/src/error.rs:170,403-408`). The attempt retains its
263 /// lease while the waitpoint is pending; signals delivered to
264 /// this waitpoint are buffered server-side (RFC-012 §R7.2.2).
// Required method (no default body). `expires_in` is a relative
// `Duration`, not an absolute timestamp.
async fn create_waitpoint(
    &self,
    handle: &Handle,
    waitpoint_key: &str,
    expires_in: Duration,
) -> Result<PendingWaitpoint, EngineError>;
271
272 /// Read the HMAC token stored on a waitpoint record, keyed by
273 /// `(partition, waitpoint_id)`.
274 ///
275 /// Returns `Ok(Some(token))` when the waitpoint exists and carries
276 /// a token, `Ok(None)` when the waitpoint does not exist or no
277 /// token field is present. A missing waitpoint is not an error —
278 /// signals can legitimately arrive after a waitpoint has been
279 /// consumed or expired, and the signal-bridge authenticates on the
280 /// presence of a matching token, not on the waitpoint's liveness.
281 ///
282 /// # Use case
283 ///
284 /// Control-plane signal delivery (cairn signal-bridge): at
285 /// signal-resume time the bridge reads the token off the
286 /// waitpoint hash / row to authenticate the resume request it
287 /// subsequently issues. Previously implemented as direct
288 /// `ferriskey::Client::hget(waitpoint_key, "waitpoint_token")` —
289 /// Valkey-only. This method routes the read through the trait so
290 /// the pattern works on Postgres + SQLite as well.
291 ///
292 /// # Per-backend shape
293 ///
294 /// * **Valkey** — `HGET ff:wp:<tag>:<waitpoint_id> waitpoint_token`
295 /// on the waitpoint's partition. Empty string / missing field
296 /// maps to `None`.
297 /// * **Postgres** — `SELECT token FROM ff_waitpoint_pending
298 /// WHERE partition_key = $1 AND waitpoint_id = $2 LIMIT 1`.
299 /// Row-absent → `None`; empty `token` → `None`.
300 /// * **SQLite** — same shape as Postgres.
301 ///
302 /// The `partition` argument is the opaque [`PartitionKey`]
303 /// produced by FlowFabric (typically extracted from the
304 /// `Handle` / `ResumeToken` the waitpoint was minted against).
305 ///
306 /// # Default impl
307 ///
308 /// Returns [`EngineError::Unavailable`] with
309 /// `op = "read_waitpoint_token"` so out-of-tree backends and
310 /// in-tree backends not yet overriding this method continue to
311 /// compile. Mirrors the precedent used by
312 /// [`Self::issue_reclaim_grant`] / [`Self::reclaim_execution`].
313 #[cfg(feature = "core")]
314 async fn read_waitpoint_token(
315 &self,
316 _partition: PartitionKey,
317 _waitpoint_id: &WaitpointId,
318 ) -> Result<Option<String>, EngineError> {
319 Err(EngineError::Unavailable {
320 op: "read_waitpoint_token",
321 })
322 }
323
324 /// Non-mutating observation of signals that satisfied the handle's
325 /// resume condition.
// Required method (no default body). Returns an owned `Vec`; whether
// "no signals yet" is an empty vector or an error is not stated here —
// TODO confirm against the impls.
async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError>;
327
328 /// Consume a resume grant (via [`ResumeToken`]) to mint a
329 /// resumed-kind handle. Routes to `ff_claim_resumed_execution` on
330 /// Valkey / the epoch-bump reconciler on PG/SQLite. Returns
331 /// `Ok(None)` when the grant's target execution is no longer
332 /// resumable (already reclaimed, terminal, etc.).
333 ///
334 /// **Renamed from `claim_from_reclaim` (RFC-024 PR-B+C).** The
335 /// pre-rename name advertised "reclaim" but the semantic has
336 /// always been resume-after-suspend. The new lease-reclaim path
337 /// lives on [`Self::reclaim_execution`].
// Required method (no default body). The token is taken by value, so the
// caller gives it up when making this call.
async fn claim_from_resume_grant(
    &self,
    token: ResumeToken,
) -> Result<Option<Handle>, EngineError>;
342
343 /// Issue a lease-reclaim grant (RFC-024 §3.2). Admits executions
344 /// in `lease_expired_reclaimable` or `lease_revoked` state to the
345 /// reclaim path; the returned [`IssueReclaimGrantOutcome::Granted`]
346 /// carries a [`crate::contracts::ReclaimGrant`] which is then fed
347 /// to [`Self::reclaim_execution`] to mint a fresh attempt.
348 ///
349 /// Default impl returns [`EngineError::Unavailable`] — PR-D (PG),
350 /// PR-E (SQLite), and PR-F (Valkey) override with real bodies.
351 async fn issue_reclaim_grant(
352 &self,
353 _args: IssueReclaimGrantArgs,
354 ) -> Result<IssueReclaimGrantOutcome, EngineError> {
355 Err(EngineError::Unavailable {
356 op: "issue_reclaim_grant",
357 })
358 }
359
360 /// Consume a [`crate::contracts::ReclaimGrant`] to mint a fresh
361 /// attempt for a previously lease-expired / lease-revoked
362 /// execution (RFC-024 §3.2). Creates a new attempt row, bumps the
363 /// execution's `lease_reclaim_count`, and mints a
364 /// [`crate::backend::HandleKind::Reclaimed`] handle.
365 ///
366 /// Default impl returns [`EngineError::Unavailable`] — PR-D (PG),
367 /// PR-E (SQLite), and PR-F (Valkey) override with real bodies.
368 async fn reclaim_execution(
369 &self,
370 _args: ReclaimExecutionArgs,
371 ) -> Result<ReclaimExecutionOutcome, EngineError> {
372 Err(EngineError::Unavailable {
373 op: "reclaim_execution",
374 })
375 }
376
377 // Round-5 amendment: lease-releasing peers of `suspend`.
378
379 /// Park the execution until `delay_until`, releasing the lease.
// Required method (no default body). `delay_until` is a `TimestampMs`
// (the point to park until), in contrast to the relative `Duration` taken
// by `create_waitpoint`.
async fn delay(&self, handle: &Handle, delay_until: TimestampMs) -> Result<(), EngineError>;
381
382 /// Mark the execution as waiting for its child flow to complete,
383 /// releasing the lease.
// Required method (no default body). Takes only the borrowed handle; there
// are no further arguments.
async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError>;
385
386 // ── Read / admin ──
387
388 /// Snapshot an execution by id. `Ok(None)` ⇒ no such execution.
// Required method (no default body). An unknown id is reported as
// `Ok(None)`, not as an error.
async fn describe_execution(
    &self,
    id: &ExecutionId,
) -> Result<Option<ExecutionSnapshot>, EngineError>;
393
394 /// Point-read of the execution-scoped `(input_payload,
395 /// execution_kind, tags)` bundle used by the SDK worker when
396 /// assembling a `ClaimedTask` (see `ff_sdk::ClaimedTask`) after a
397 /// successful claim.
398 ///
399 /// No default impl — every `EngineBackend` must answer this
400 /// explicitly. Distinct from [`Self::describe_execution`]
401 /// (read-model projection) because the SDK needs the raw payload
402 /// bytes + kind + tags immediately post-claim, and the snapshot
403 /// projection deliberately omits the payload bytes.
404 ///
405 /// Per-backend shape:
406 ///
407 /// * **Valkey** — pipelined `GET :payload` + `HGETALL :core`
408 /// + `HGETALL :tags` on the execution's partition (same pattern
409 /// as [`Self::describe_execution`]).
410 /// * **Postgres** — single `SELECT payload, raw_fields` on
411 /// `ff_exec_core` keyed by `(partition_key, execution_id)`;
412 /// `execution_kind` + `tags` live in `raw_fields` JSONB.
413 /// * **SQLite** — identical shape to Postgres.
414 ///
415 /// Returns [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
416 /// when the execution does not exist — the SDK worker only calls
417 /// this after a successful claim, so a missing row is a loud
418 /// storage-tier invariant violation rather than a routine `Ok(None)`.
// Required method (no default body). The return type is the context
// itself rather than an `Option`: a missing execution is an error here,
// unlike `describe_execution`.
async fn read_execution_context(
    &self,
    execution_id: &ExecutionId,
) -> Result<ExecutionContext, EngineError>;
423
424 /// Point-read of the execution's current attempt-index **pointer**
425 /// — the index of the currently-leased attempt row.
426 ///
427 /// Distinct from [`Self::read_total_attempt_count`]: this method
428 /// names the attempt that *already exists* (pointer), whereas
429 /// `read_total_attempt_count` is the monotonic claim counter used
430 /// to compute the next fresh attempt index. See the sibling's
431 /// rustdoc for the retry-path scenario that motivates the split.
432 ///
433 /// Used on the SDK worker's `claim_from_resume_grant` path —
434 /// specifically the private `claim_resumed_execution` helper —
435 /// immediately before dispatching [`Self::claim_resumed_execution`].
436 /// The returned index is fed into
437 /// [`ClaimResumedExecutionArgs::current_attempt_index`](crate::contracts::ClaimResumedExecutionArgs)
438 /// so the backend's script / transaction targets the correct
439 /// existing attempt row (KEYS[6] on Valkey; `ff_attempt` PK tuple
440 /// on PG/SQLite).
441 ///
442 /// Per-backend shape:
443 ///
444 /// * **Valkey** — `HGET {exec}:core current_attempt_index` on the
445 /// execution's partition. Single command. Both the
446 /// **missing-field** case (`exec_core` present but
447 /// `current_attempt_index` absent or empty-string, i.e. pre-claim
448 /// state) **and** the **missing-row** case (no `exec_core` hash
449 /// at all) read back as `AttemptIndex(0)`. This preserves the
450 /// pre-PR-3 inline-`HGET` semantic and is safe because Valkey's
451 /// happy path requires `exec_core` to exist before this method
452 /// is reached — the SDK only calls `read_current_attempt_index`
453 /// post-grant, and grant issuance is gated on `exec_core`
454 /// presence. A genuinely absent row would surface as the proper
455 /// business-logic error (`NotAResumedExecution` /
456 /// `ExecutionNotLeaseable`) on the downstream FCALL.
457 /// * **Postgres** — `SELECT attempt_index FROM ff_exec_core
458 /// WHERE partition_key = $1 AND execution_id = $2`. The column
459 /// is `NOT NULL DEFAULT 0` so a pre-claim row reads back as `0`
460 /// (matching Valkey's missing-field case). **Missing row**
461 /// surfaces as [`EngineError::Validation { kind:
462 /// ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
463 /// — diverges from Valkey's missing-row `→ 0` mapping.
464 /// * **SQLite** — `SELECT attempt_index FROM ff_exec_core
465 /// WHERE partition_key = ? AND execution_id = ?`; identical
466 /// semantics to Postgres (missing-row → `InvalidInput`).
467 ///
468 /// **Cross-backend asymmetry on missing row is intentional.** The
469 /// SDK happy path never observes it (grant issuance on Valkey
470 /// requires `exec_core`, and PG/SQLite currently return
471 /// `Unavailable` from `claim_from_grant` per
472 /// `project_claim_from_grant_pg_sqlite_gap.md`). Consumers writing
473 /// backend-agnostic tooling against this method directly must
474 /// treat the missing-row case as backend-dependent — match on
475 /// `InvalidInput` for PG/SQLite, and treat an unexpected `0` as
476 /// the Valkey equivalent signal.
477 ///
/// The default impl returns [`EngineError::Unavailable`] so the
/// trait addition is non-breaking for out-of-tree backends — the
/// same pattern as [`Self::issue_reclaim_grant`] /
/// [`Self::reclaim_execution`]. (Unlike
/// [`Self::read_execution_context`] from v0.12 PR-1, which is a
/// required method with no default impl.)
482 async fn read_current_attempt_index(
483 &self,
484 _execution_id: &ExecutionId,
485 ) -> Result<AttemptIndex, EngineError> {
486 Err(EngineError::Unavailable {
487 op: "read_current_attempt_index",
488 })
489 }
490
491 /// Point-read of the execution's **total attempt counter** — the
492 /// monotonic count of claims that have ever fired against this
493 /// execution (including the in-flight one once claimed).
494 ///
495 /// Used on the SDK worker's `claim_from_grant` / `claim_execution`
496 /// path — the next attempt-index for a fresh claim is this
/// counter's current value (so `1` for the second attempt — the first
/// retry after the initial attempt failed terminally). This is semantically distinct
499 /// from [`Self::read_current_attempt_index`], which is a *pointer*
500 /// at the currently-leased attempt row and is only meaningful on
501 /// the `claim_from_resume_grant` path (where a live attempt already
502 /// exists and we want to re-seat its lease rather than mint a new
503 /// attempt row).
504 ///
505 /// Reading the pointer on the `claim_from_grant` path was a live
506 /// bug: on the retry-of-a-retry scenario the pointer still named
507 /// the *previous* terminal-failed attempt, so the newly-minted
508 /// attempt collided with it (Valkey KEYS[6]) or mis-targeted the
509 /// PG/SQLite `ff_attempt` PK tuple. This method fixes that by
510 /// reading the counter that Lua 5920 / PG `ff_claim_execution` /
511 /// SQLite `claim_impl` all already consult when computing the
512 /// next attempt index.
513 ///
514 /// Per-backend shape:
515 ///
516 /// * **Valkey** — `HGET {exec}:core total_attempt_count` on the
517 /// execution's partition. Single command; pre-claim read (field
518 /// absent or empty) maps to `0`.
519 /// * **Postgres** — `SELECT raw_fields->>'total_attempt_count'
520 /// FROM ff_exec_core WHERE (partition_key, execution_id) = ...`.
521 /// The field lives in the JSONB `raw_fields` bag rather than a
522 /// dedicated column (mirrors how `create_execution_impl` seeds
523 /// it on row creation). Missing row → `InvalidInput`; missing
524 /// field → `0`.
525 /// * **SQLite** — `SELECT CAST(json_extract(raw_fields,
526 /// '$.total_attempt_count') AS INTEGER) FROM ff_exec_core
527 /// WHERE ...`. Same JSON-in-`raw_fields` shape as PG; uses the
528 /// same `json_extract` idiom already employed in
529 /// `ff-backend-sqlite/src/queries/operator.rs` for replay_count.
530 ///
531 /// The default impl returns [`EngineError::Unavailable`] so the
532 /// trait addition is non-breaking for out-of-tree backends (same
533 /// precedent as [`Self::read_current_attempt_index`] landing in
534 /// v0.12 PR-3).
535 async fn read_total_attempt_count(
536 &self,
537 _execution_id: &ExecutionId,
538 ) -> Result<AttemptIndex, EngineError> {
539 Err(EngineError::Unavailable {
540 op: "read_total_attempt_count",
541 })
542 }
543
544 /// Snapshot a flow by id. `Ok(None)` ⇒ no such flow.
// Required method (no default body). An unknown flow id is reported as
// `Ok(None)`, not as an error.
async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError>;
546
547 // ── Namespaced tag point-writes / reads (issue #433) ──
548
549 /// Set a single namespaced tag on an execution. Tag `key` MUST match
550 /// the reserved caller-namespace pattern `^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$` —
551 /// i.e. `<caller>.<field>` — or the call returns
552 /// [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
553 /// with the offending key in `detail`. `value` is arbitrary UTF-8.
554 ///
555 /// The namespace prefix is carried inline in `key` (e.g.
556 /// `"cairn.session_id"`) — there is no separate `namespace` arg.
557 /// This matches the existing `ff_set_execution_tags` wire shape and
558 /// the flow-tag projection in [`ExecutionSnapshot::tags`].
559 ///
560 /// Validation is performed by each overriding backend impl via
561 /// [`validate_tag_key`] **before** the wire hop so PG / SQLite /
562 /// Valkey reject the same set of keys. The default trait impl
563 /// returns [`EngineError::Unavailable`] without running validation
564 /// — there is no meaningful storage to validate against on an
565 /// unsupported backend, and surfacing `Unavailable` before
566 /// `Validation` matches the precedence used elsewhere on the trait.
567 /// Backends MAY additionally validate on the storage tier (Valkey's
568 /// Lua path does, with a more permissive prefix-only check).
569 ///
570 /// Per-backend shape:
571 ///
572 /// * **Valkey** — `ff_set_execution_tags` FCALL with a single
573 /// `{key → value}` pair. Routes through the existing Lua
574 /// contract (no new wire format).
575 /// * **Postgres** — `UPDATE ff_exec_core SET raw_fields = jsonb_set(
576 /// coalesce(raw_fields, '{}'::jsonb), '{tags,<key>}', to_jsonb($value))
577 /// WHERE (partition_key, execution_id) = ...`. Same storage shape
578 /// read by [`Self::describe_execution`] / [`Self::read_execution_context`].
579 /// * **SQLite** — `UPDATE ff_exec_core SET raw_fields = json_set(
580 /// coalesce(raw_fields, '{}'), '$.tags."<key>"', $value) WHERE ...`.
581 /// The key is quoted in the JSON path so dots inside the
582 /// namespaced key (e.g. `cairn.session_id`) are treated as a
583 /// single literal member name rather than JSON-path separators —
584 /// yielding the same flat `raw_fields.tags` shape as PG.
585 ///
586 /// Missing execution surfaces as
587 /// [`EngineError::NotFound { entity: "execution" }`](crate::engine_error::EngineError::NotFound)
588 /// — matches the Valkey FCALL's `execution_not_found` mapping and
589 /// the existing `ScriptError::ExecutionNotFound` → `EngineError`
590 /// conversion (`ff_script::engine_error_ext`).
591 ///
592 /// The default impl returns [`EngineError::Unavailable`] so the
593 /// trait addition is non-breaking for out-of-tree backends.
594 async fn set_execution_tag(
595 &self,
596 _execution_id: &ExecutionId,
597 _key: &str,
598 _value: &str,
599 ) -> Result<(), EngineError> {
600 Err(EngineError::Unavailable {
601 op: "set_execution_tag",
602 })
603 }
604
605 /// Set a single namespaced tag on a flow. Same namespace rule as
606 /// [`Self::set_execution_tag`]: `key` MUST match
607 /// `^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$`.
608 ///
609 /// Per-backend shape:
610 ///
611 /// * **Valkey** — `ff_set_flow_tags` FCALL with a single pair.
612 /// Tags land on the dedicated `ff:flow:{fp:N}:<flow_id>:tags`
613 /// hash, not on the `flow_core` hash (diverges from the
614 /// execution shape — execution tags live on `ff:exec:...:tags`
615 /// by the same split). **Lazy migration on first write**: the
616 /// Lua (`ff_script::flowfabric.lua`, `ff_set_flow_tags`) scans
617 /// `flow_core` once per flow for pre-58.4 inline namespaced
618 /// fields (anything matching `^[a-z][a-z0-9_]*\.`), HSETs them
619 /// onto `:tags`, HDELs them from `flow_core`, and stamps
620 /// `tags_migrated=1` on `flow_core` so subsequent calls
621 /// short-circuit to O(1). This heals flows created before
622 /// RFC-058.4 landed; well-formed flows pay the migration cost
623 /// only on their very first tag write. Callers MUST read tags
624 /// via [`Self::get_flow_tag`] (`HGET :tags <key>`) — direct
625 /// `HGETALL` against `flow_core` will not see post-migration
626 /// values.
627 ///
628 /// **Cross-backend parity caveat on `describe_flow`**: the
629 /// pre-existing `ValkeyBackend::describe_flow` /
630 /// `FlowSnapshot::tags` read path snapshots `flow_core` fields
631 /// only and does NOT today merge the `:tags` sub-hash, whereas
632 /// Postgres `describe_flow` DOES surface flow tags via
633 /// `ff_backend_postgres::flow::extract_tags` (which reads them
634 /// off `raw_fields` — the same store `set_flow_tag` writes on
635 /// PG). Trait consumers MUST NOT assume a tag written here
636 /// will be visible via `describe_flow` on every backend: on
637 /// Valkey, callers that need the full tag set should
638 /// complement the snapshot with per-key [`Self::get_flow_tag`]
639 /// reads. Extending Valkey `describe_flow` to merge `:tags`
640 /// is additive and out of scope for this trait addition.
641 /// * **Postgres** — `UPDATE ff_flow_core SET raw_fields =
642 /// jsonb_set(..., '{<key>}', ...)` — flow tags are stored as
643 /// top-level `raw_fields` keys (matches
644 /// `ff_backend_postgres::flow::extract_tags`). No `tags` nesting
645 /// on flows, which diverges from the execution shape.
646 /// * **SQLite** — mirrors PG: `UPDATE ff_flow_core SET raw_fields =
647 /// json_set(..., '$."<key>"', $value) WHERE ...`. The key is
648 /// quoted so the dotted namespaced key lands as a single flat
649 /// top-level member of `raw_fields`.
650 ///
651 /// Missing flow surfaces as
652 /// [`EngineError::NotFound { entity: "flow" }`](crate::engine_error::EngineError::NotFound)
653 /// (matches the Valkey FCALL's `flow_not_found` mapping).
654 ///
655 /// The default impl returns [`EngineError::Unavailable`].
656 async fn set_flow_tag(
657 &self,
658 _flow_id: &FlowId,
659 _key: &str,
660 _value: &str,
661 ) -> Result<(), EngineError> {
662 Err(EngineError::Unavailable {
663 op: "set_flow_tag",
664 })
665 }
666
667 /// Read a single namespaced execution tag. Returns `Ok(None)` when
668 /// the tag is absent **or** the execution row does not exist —
669 /// the two cases are not distinguished on the read path. Callers
670 /// that need to distinguish should call [`Self::describe_execution`]
671 /// first (an `Ok(None)` from that method proves the execution is
672 /// absent). This matches Valkey's native `HGET` semantics and
673 /// keeps the read path at a single round-trip on every backend.
674 ///
675 /// `key` must pass [`validate_tag_key`] — a malformed key can
676 /// never be present in storage so the call short-circuits with
677 /// [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
678 /// rather than round-tripping.
679 ///
680 /// Per-backend shape:
681 ///
682 /// * **Valkey** — `HGET :tags <key>` on the execution's partition.
683 /// * **Postgres** — `SELECT raw_fields->'tags'->><key> FROM ff_exec_core
684 /// WHERE ...` with `fetch_optional` → missing row collapses to `None`.
685 /// * **SQLite** — `SELECT json_extract(raw_fields, '$.tags."<key>"')
686 /// FROM ff_exec_core WHERE ...` with the same collapse. The key is
687 /// quoted in the JSON path so dotted namespaced keys resolve to
688 /// the flat literal member written by `set_execution_tag`.
689 ///
690 /// The default impl returns [`EngineError::Unavailable`].
691 async fn get_execution_tag(
692 &self,
693 _execution_id: &ExecutionId,
694 _key: &str,
695 ) -> Result<Option<String>, EngineError> {
696 Err(EngineError::Unavailable {
697 op: "get_execution_tag",
698 })
699 }
700
701 /// Read an execution's `namespace` scalar. Returns `Ok(None)` when
702 /// the row is absent or the field is unset. Dedicated point-read
703 /// used by the scanner per-candidate filter (`should_skip_candidate`)
704 /// to preserve the 1-HGET cost contract documented in
705 /// `ff_engine::scanner::should_skip_candidate` — `describe_execution`
706 /// is heavier (HGETALL / full snapshot) and unnecessary when only
707 /// the namespace scalar is needed.
708 ///
709 /// Per-backend shape:
710 ///
711 /// * **Valkey** — `HGET :core namespace` on the execution's partition
712 /// (single field read on the already-hot exec_core hash).
713 /// * **Postgres** — `SELECT raw_fields->>'namespace' FROM ff_exec_core
714 /// WHERE partition_key = $1 AND execution_id = $2`.
715 /// * **SQLite** — `SELECT json_extract(raw_fields, '$.namespace')
716 /// FROM ff_exec_core WHERE ...`.
717 ///
718 /// The default impl returns [`EngineError::Unavailable`].
719 async fn get_execution_namespace(
720 &self,
721 _execution_id: &ExecutionId,
722 ) -> Result<Option<String>, EngineError> {
723 Err(EngineError::Unavailable {
724 op: "get_execution_namespace",
725 })
726 }
727
728 /// Read a single namespaced flow tag. Returns `Ok(None)` when
729 /// the tag is absent **or** the flow row does not exist (same
730 /// collapse semantics as [`Self::get_execution_tag`]). Symmetry
731 /// partner — consumers like cairn read `cairn.session_id` off
732 /// flows for archival.
733 ///
734 /// `key` must pass [`validate_tag_key`].
735 ///
736 /// Per-backend shape:
737 ///
738 /// * **Valkey** — `HGET :tags <key>` on the flow's partition.
739 /// * **Postgres** — `SELECT raw_fields->><key> FROM ff_flow_core
740 /// WHERE ...` (top-level `raw_fields` key, matches the flow-tag
741 /// storage shape).
742 /// * **SQLite** — `SELECT json_extract(raw_fields, '$."<key>"')
743 /// FROM ff_flow_core WHERE ...` (quoted key — see
744 /// `set_flow_tag`).
745 ///
746 /// The default impl returns [`EngineError::Unavailable`].
747 async fn get_flow_tag(
748 &self,
749 _flow_id: &FlowId,
750 _key: &str,
751 ) -> Result<Option<String>, EngineError> {
752 Err(EngineError::Unavailable {
753 op: "get_flow_tag",
754 })
755 }
756
757 /// List dependency edges adjacent to an execution. Read-only; the
758 /// backend resolves the subject execution's flow, reads the
759 /// direction-specific adjacency SET, and decodes each member's
760 /// flow-scoped `edge:<edge_id>` hash.
761 ///
762 /// Returns an empty `Vec` when the subject has no edges on the
763 /// requested side — including standalone executions (no owning
764 /// flow). Ordering is unspecified: the underlying adjacency SET
765 /// is an unordered SMEMBERS read. Callers that need deterministic
766 /// order should sort by [`EdgeSnapshot::edge_id`] /
767 /// [`EdgeSnapshot::created_at`] themselves.
768 ///
769 /// Parse failures on the edge hash surface as
770 /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
771 /// — unknown fields, missing required fields, endpoint mismatches
772 /// against the adjacency SET all fail loud rather than silently
773 /// returning partial results.
774 ///
775 /// Gated on the `core` feature — edge reads are part of the
776 /// minimal engine surface a Postgres-style backend must honour.
777 ///
778 /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
779 #[cfg(feature = "core")]
780 async fn list_edges(
781 &self,
782 _flow_id: &FlowId,
783 _direction: EdgeDirection,
784 ) -> Result<Vec<EdgeSnapshot>, EngineError> {
785 Err(EngineError::Unavailable { op: "list_edges" })
786 }
787
788 /// Snapshot a single dependency edge by its owning flow + edge id.
789 ///
790 /// `Ok(None)` when the edge hash is absent (never staged, or
791 /// staged under a different flow than `flow_id`). Parse failures
792 /// on a present edge hash surface as
793 /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
794 /// — the stored `flow_id` field is cross-checked against the
795 /// caller's expected `flow_id` so a wrong-key read fails loud
796 /// rather than returning an unrelated edge.
797 ///
798 /// Gated on the `core` feature — single-edge reads are part of
799 /// the minimal snapshot surface an alternate backend must honour
800 /// alongside [`Self::describe_execution`] / [`Self::describe_flow`]
801 /// / [`Self::list_edges`].
802 ///
803 /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
804 #[cfg(feature = "core")]
805 async fn describe_edge(
806 &self,
807 _flow_id: &FlowId,
808 _edge_id: &EdgeId,
809 ) -> Result<Option<EdgeSnapshot>, EngineError> {
810 Err(EngineError::Unavailable {
811 op: "describe_edge",
812 })
813 }
814
815 /// Resolve an execution's owning flow id, if any.
816 ///
817 /// `Ok(None)` when the execution's core record is absent or has
818 /// no associated flow (standalone execution). A present-but-
819 /// malformed `flow_id` field surfaces as
820 /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`].
821 ///
822 /// Gated on the `core` feature. Used by ff-sdk's
823 /// `list_outgoing_edges` / `list_incoming_edges` to pivot from a
824 /// consumer-supplied `ExecutionId` to the `FlowId` required by
825 /// [`Self::list_edges`]. A Valkey backend serves this with a
826 /// single `HGET exec_core flow_id`; a Postgres backend serves it
827 /// with the equivalent single-column row lookup.
828 ///
829 /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
830 #[cfg(feature = "core")]
831 async fn resolve_execution_flow_id(
832 &self,
833 _eid: &ExecutionId,
834 ) -> Result<Option<FlowId>, EngineError> {
835 Err(EngineError::Unavailable {
836 op: "resolve_execution_flow_id",
837 })
838 }
839
840 /// List flows on a partition with cursor-based pagination (issue
841 /// #185).
842 ///
843 /// Returns a [`ListFlowsPage`] of [`FlowSummary`](crate::contracts::FlowSummary)
844 /// rows ordered by `flow_id` (UUID byte-lexicographic). `cursor`
845 /// is `None` for the first page; callers forward the returned
846 /// `next_cursor` verbatim to continue iteration, and the listing
847 /// is exhausted when `next_cursor` is `None`. `limit` is the
848 /// maximum number of rows to return on this page — implementations
849 /// MAY return fewer (end of partition) but MUST NOT exceed it.
850 ///
851 /// Ordering rationale: flow ids are UUIDs, and both Valkey
852 /// (sort after-the-fact) and Postgres (`ORDER BY flow_id`) can
853 /// agree on byte-lexicographic order — the same order
854 /// `FlowId::to_string()` produces for canonical hyphenated UUIDs.
/// Mapping the cursor to a `flow_id > cursor` predicate keeps the
/// contract backend-independent.
857 ///
858 /// # Postgres implementation pattern
859 ///
860 /// A Postgres-backed implementation serves this directly with
861 ///
862 /// ```sql
863 /// SELECT flow_id, created_at_ms, public_flow_state
864 /// FROM ff_flow
865 /// WHERE partition_key = $1
866 /// AND ($2::uuid IS NULL OR flow_id > $2)
867 /// ORDER BY flow_id
868 /// LIMIT $3 + 1;
869 /// ```
870 ///
871 /// — reading one extra row to decide whether `next_cursor` should
872 /// be set to the last row's `flow_id`. The Valkey implementation
873 /// maintains the `ff:idx:{fp:N}:flow_index` SET and performs the
874 /// sort + slice client-side (SMEMBERS then sort-by-UUID-bytes),
875 /// pipelining `HGETALL flow_core` for each row on the page.
876 ///
877 /// Gated on the `core` feature — flow listing is part of the
878 /// minimal engine surface a Postgres-style backend must honour.
879 #[cfg(feature = "core")]
880 async fn list_flows(
881 &self,
882 _partition: PartitionKey,
883 _cursor: Option<FlowId>,
884 _limit: usize,
885 ) -> Result<ListFlowsPage, EngineError> {
886 Err(EngineError::Unavailable { op: "list_flows" })
887 }
888
889 /// Enumerate registered lanes with cursor-based pagination.
890 ///
891 /// Lanes are global (not partition-scoped) — the backend serves
892 /// this from its lane registry and does NOT accept a
893 /// [`crate::partition::Partition`] argument. Results are sorted
894 /// by [`LaneId`] name so the ordering is stable across calls and
895 /// cursors address a deterministic position in the sort.
896 ///
897 /// * `cursor` — exclusive lower bound. `None` starts from the
898 /// first lane. To continue a walk, pass the previous page's
899 /// [`ListLanesPage::next_cursor`].
/// * `limit` — hard cap on the number of lanes returned in the
///   page. Backends MAY return fewer lanes when fewer remain in
///   the registry; they MUST NOT return more than `limit`.
903 ///
904 /// [`ListLanesPage::next_cursor`] is `Some(last_lane_in_page)`
905 /// iff at least one more lane exists after the returned page,
906 /// and `None` on the final page. Callers loop until `next_cursor`
907 /// is `None` to read the full registry.
908 ///
909 /// Gated on the `core` feature — lane enumeration is part of the
910 /// minimal snapshot surface an alternate backend must honour
911 /// alongside [`Self::describe_flow`] / [`Self::list_edges`].
912 #[cfg(feature = "core")]
913 async fn list_lanes(
914 &self,
915 _cursor: Option<LaneId>,
916 _limit: usize,
917 ) -> Result<ListLanesPage, EngineError> {
918 Err(EngineError::Unavailable { op: "list_lanes" })
919 }
920
921 /// List suspended executions in one partition, cursor-paginated,
922 /// with each entry's suspension `reason_code` populated (issue
923 /// #183).
924 ///
925 /// Consumer-facing "what's blocked on what?" panels (ff-board's
926 /// suspended-executions view, operator CLIs) need the reason in
927 /// the list response so the UI does not round-trip per row to
928 /// `describe_execution` for a field it knows it needs. `reason`
929 /// on [`SuspendedExecutionEntry`] carries the free-form
930 /// `suspension:current.reason_code` field — see the type rustdoc
931 /// for the String-not-enum rationale.
932 ///
933 /// `cursor` is opaque to callers; pass `None` to start a fresh
934 /// scan and feed the returned [`ListSuspendedPage::next_cursor`]
935 /// back in on subsequent pages until it comes back `None`.
936 /// `limit` bounds the `entries` count; backends MAY return fewer
937 /// when the partition is exhausted.
938 ///
939 /// Ordering is by ascending `suspended_at_ms` (the per-lane
940 /// suspended ZSET score == `timeout_at` or the no-timeout
941 /// sentinel) with execution id as a lex tiebreak, so cursor
942 /// continuation is deterministic across calls.
943 ///
944 /// Gated on the `core` feature — suspended-list enumeration is
945 /// part of the minimal engine surface a Postgres-style backend
946 /// must honour.
947 #[cfg(feature = "core")]
948 async fn list_suspended(
949 &self,
950 _partition: PartitionKey,
951 _cursor: Option<ExecutionId>,
952 _limit: usize,
953 ) -> Result<ListSuspendedPage, EngineError> {
954 Err(EngineError::Unavailable {
955 op: "list_suspended",
956 })
957 }
958
959 /// Forward-only paginated listing of the executions indexed under
960 /// one partition.
961 ///
962 /// Reads the partition-wide `ff:idx:{p:N}:all_executions` set,
963 /// sorts lexicographically on `ExecutionId`, and returns the page
964 /// of ids strictly greater than `cursor` (or starting from the
965 /// smallest id when `cursor = None`). The returned
966 /// [`ListExecutionsPage::next_cursor`] is the last id on the page
967 /// iff at least one more id exists past it; `None` signals
968 /// end-of-stream.
969 ///
970 /// `limit` is the maximum number of ids returned on this page. A
971 /// `limit` of `0` returns an empty page with `next_cursor = None`.
972 /// Backends MAY cap `limit` internally (Valkey: 1000) and return
973 /// fewer ids than requested; callers continue paginating until
974 /// `next_cursor == None`.
975 ///
976 /// Ordering is stable under concurrent inserts for already-emitted
977 /// ids (an id less-than-or-equal-to the caller's cursor is never
978 /// re-emitted in later pages) but new inserts past the cursor WILL
979 /// appear in subsequent pages — consistent with forward-only
980 /// cursor semantics.
981 ///
982 /// Gated on the `core` feature — partition-scoped listing is part
983 /// of the minimal engine surface every backend must honour.
984 #[cfg(feature = "core")]
985 async fn list_executions(
986 &self,
987 _partition: PartitionKey,
988 _cursor: Option<ExecutionId>,
989 _limit: usize,
990 ) -> Result<ListExecutionsPage, EngineError> {
991 Err(EngineError::Unavailable {
992 op: "list_executions",
993 })
994 }
995
996 // ── Trigger ops (issue #150) ──
997
998 /// Deliver an external signal to a suspended execution's waitpoint.
999 ///
1000 /// The backend atomically records the signal, evaluates the resume
1001 /// condition, and — when satisfied — transitions the execution
1002 /// from `suspended` to `runnable` (or buffers the signal when the
1003 /// waitpoint is still `pending`). Duplicate delivery — same
1004 /// `idempotency_key` + waitpoint — surfaces as
1005 /// [`DeliverSignalResult::Duplicate`] with the pre-existing
1006 /// `signal_id` rather than mutating state twice.
1007 ///
1008 /// Input validation (HMAC token presence, payload size limits,
1009 /// signal-name shape) is the backend's responsibility; callers
1010 /// pass a fully populated [`DeliverSignalArgs`] and receive typed
1011 /// outcomes or typed errors (`ScriptError::invalid_token`,
1012 /// `ScriptError::token_expired`, `ScriptError::ExecutionNotFound`
1013 /// surfaced via [`EngineError::Transport`] on the Valkey backend).
1014 ///
1015 /// Gated on the `core` feature — signal delivery is part of the
1016 /// minimal trigger surface every backend must honour so ff-server
1017 /// / REST handlers can dispatch against `Arc<dyn EngineBackend>`
1018 /// without knowing which backend is running underneath.
1019 #[cfg(feature = "core")]
1020 async fn deliver_signal(
1021 &self,
1022 _args: DeliverSignalArgs,
1023 ) -> Result<DeliverSignalResult, EngineError> {
1024 Err(EngineError::Unavailable {
1025 op: "deliver_signal",
1026 })
1027 }
1028
1029 /// Claim a resumed execution — a previously-suspended attempt that
1030 /// has cleared its resume condition (e.g. via
1031 /// [`Self::deliver_signal`]) and now needs a worker to pick up the
1032 /// same attempt index.
1033 ///
1034 /// Distinct from [`Self::claim`] (fresh work) and
1035 /// [`Self::claim_from_resume_grant`] (grant-based ownership transfer
1036 /// after a crash): the resumed-claim path re-binds an existing
1037 /// attempt rather than minting a new one. The backend issues a
1038 /// fresh `lease_id` + bumps the `lease_epoch`, preserving
1039 /// `attempt_id` / `attempt_index` so stream frames and progress
1040 /// updates continue on the same attempt.
1041 ///
1042 /// Typed failures surface via `ScriptError` → `EngineError`:
1043 /// `NotAResumedExecution` when the attempt state is not
1044 /// `attempt_interrupted`, `ExecutionNotLeaseable` when the
1045 /// lifecycle phase is not `runnable`, and `InvalidClaimGrant`
1046 /// when the grant key is missing or was already consumed.
1047 ///
1048 /// Gated on the `core` feature — resumed-claim is part of the
1049 /// minimal trigger surface every backend must honour.
1050 #[cfg(feature = "core")]
1051 async fn claim_resumed_execution(
1052 &self,
1053 _args: ClaimResumedExecutionArgs,
1054 ) -> Result<ClaimResumedExecutionResult, EngineError> {
1055 Err(EngineError::Unavailable {
1056 op: "claim_resumed_execution",
1057 })
1058 }
1059
1060 /// Scan a lane's eligible ZSET on one partition for
1061 /// highest-priority executions awaiting a worker (v0.12 PR-5).
1062 ///
1063 /// Lifted from the SDK-side `ZRANGEBYSCORE` inline on
1064 /// `FlowFabricWorker::claim_next` — the scheduler-bypass scanner
1065 /// gated behind `direct-valkey-claim`. The trait method itself is
1066 /// backend-agnostic; consumers that drive the scanner loop
1067 /// (bench harnesses, single-tenant dev) compose it with
1068 /// [`Self::issue_claim_grant`] + [`Self::claim_execution`] to
1069 /// replicate the pre-PR-5 `claim_next` body.
1070 ///
1071 /// # Backend coverage
1072 ///
1073 /// * **Valkey** — `ZRANGEBYSCORE eligible_zset -inf +inf LIMIT 0 <limit>`
1074 /// on the lane's partition-scoped eligible key. Single
1075 /// command; no script round-trip. Wire shape is byte-for-byte
1076 /// identical to the pre-PR SDK inline call so bench traces
1077 /// match pre-PR without new `#[tracing::instrument]` span names.
1078 /// * **Postgres / SQLite** — use the `Err(Unavailable)` default.
1079 /// PG/SQLite consumers drive work through the scheduler-routed
1080 /// [`Self::claim_for_worker`] path instead of the scanner
1081 /// primitives exposed here; lifting the scheduler itself onto
1082 /// the trait is RFC-024 follow-up scope. See
1083 /// `project_claim_from_grant_pg_sqlite_gap.md` for motivation.
1084 ///
1085 /// Default impl returns [`EngineError::Unavailable`] so the trait
1086 /// addition is non-breaking for out-of-tree backends. Same
1087 /// precedent as [`Self::claim_execution`] landing in v0.12 PR-4.
1088 #[cfg(feature = "core")]
1089 async fn scan_eligible_executions(
1090 &self,
1091 _args: ScanEligibleArgs,
1092 ) -> Result<Vec<ExecutionId>, EngineError> {
1093 Err(EngineError::Unavailable {
1094 op: "scan_eligible_executions",
1095 })
1096 }
1097
1098 /// Issue a claim grant — the scheduler's admission write — for a
1099 /// single execution on a single lane (v0.12 PR-5).
1100 ///
1101 /// Lifted from the SDK-side `ff_issue_claim_grant` inline helper
1102 /// on `FlowFabricWorker::claim_next`. The backend atomically
1103 /// writes the grant hash, appends to the per-worker grant index,
1104 /// and removes the execution from the lane's eligible ZSET.
1105 ///
1106 /// Typed rejects surface via [`EngineError::Validation`]:
1107 /// `CapabilityMismatch` when the worker's capabilities do not
1108 /// cover the execution's `required_capabilities`, `InvalidInput`
1109 /// for malformed args. Transport faults surface via
1110 /// [`EngineError::Transport`].
1111 ///
1112 /// # Backend coverage
1113 ///
1114 /// * **Valkey** — one `ff_issue_claim_grant` FCALL. KEYS/ARGV
1115 /// shape is byte-for-byte identical to the pre-PR SDK inline
1116 /// call; bench traces match pre-PR.
1117 /// * **Postgres / SQLite** — `Err(Unavailable)` default; use
1118 /// [`Self::claim_for_worker`] instead. See
1119 /// [`Self::scan_eligible_executions`] for the cross-link
1120 /// rationale.
1121 #[cfg(feature = "core")]
1122 async fn issue_claim_grant(
1123 &self,
1124 _args: IssueClaimGrantArgs,
1125 ) -> Result<IssueClaimGrantOutcome, EngineError> {
1126 Err(EngineError::Unavailable {
1127 op: "issue_claim_grant",
1128 })
1129 }
1130
1131 /// Move an execution from a lane's eligible ZSET into its
1132 /// blocked_route ZSET (v0.12 PR-5).
1133 ///
1134 /// Lifted from the SDK-side `ff_block_execution_for_admission`
1135 /// inline helper on `FlowFabricWorker::claim_next`. Called after
1136 /// a [`Self::issue_claim_grant`] `CapabilityMismatch` reject —
1137 /// without a block step, the inline scanner would re-pick the
1138 /// same top-of-ZSET every tick (parity with
1139 /// `ff-scheduler::Scheduler::block_candidate`).
1140 ///
1141 /// The engine's unblock scanner periodically promotes
1142 /// blocked_route back to eligible once a worker with matching
1143 /// caps registers.
1144 ///
1145 /// # Backend coverage
1146 ///
1147 /// * **Valkey** — one `ff_block_execution_for_admission` FCALL.
1148 /// * **Postgres / SQLite** — `Err(Unavailable)` default; the
1149 /// scheduler-routed [`Self::claim_for_worker`] path handles
1150 /// admission rejects server-side.
1151 #[cfg(feature = "core")]
1152 async fn block_route(
1153 &self,
1154 _args: BlockRouteArgs,
1155 ) -> Result<BlockRouteOutcome, EngineError> {
1156 Err(EngineError::Unavailable { op: "block_route" })
1157 }
1158
1159 /// Consume a scheduler-issued claim grant to mint a fresh attempt.
1160 ///
1161 /// The SDK's grant-consumer path — paired with `FlowFabricWorker::claim_from_grant`
1162 /// in `ff-sdk` — routes through this method. The scheduler has
1163 /// already validated budget / quota / capabilities and written a
1164 /// grant (Valkey `claim_grant` hash); this call atomically
1165 /// consumes that grant and creates the attempt row, mints
1166 /// `lease_id` + `lease_epoch`, and returns a
1167 /// [`ClaimExecutionResult::Claimed`] carrying the minted lease
1168 /// triple.
1169 ///
1170 /// Distinct from [`Self::claim`] (the scheduler-bypass scanner
1171 /// used by the `direct-valkey-claim` feature) — this method
1172 /// assumes the grant already exists and skips capability / ZSET
1173 /// scanning. The Valkey impl fires exactly one `ff_claim_execution`
1174 /// FCALL.
1175 ///
1176 /// Typed failures surface via `ScriptError` → `EngineError`:
1177 /// `UseClaimResumedExecution` when the attempt is actually
1178 /// `attempt_interrupted` (caller should retry via
1179 /// [`Self::claim_resumed_execution`] — see `ContentionKind` at
1180 /// `ff_core::engine_error`), `InvalidClaimGrant` when the grant is
1181 /// missing / consumed / worker-mismatched, `CapabilityMismatch`
1182 /// when the execution's `required_capabilities` drifted after
1183 /// grant issuance.
1184 ///
1185 /// # Backend coverage
1186 ///
1187 /// * **Valkey** — implemented in `ff-backend-valkey` (one
1188 /// `ff_claim_execution` FCALL).
1189 /// * **Postgres / SQLite** — use the `Err(Unavailable)` default in
1190 /// this PR. Grants on PG / SQLite today flow through
1191 /// `PostgresScheduler::claim_for_worker` (a sibling struct, not
1192 /// an `EngineBackend` method); wiring the default-over-trait
1193 /// behaviour into a PG / SQLite `claim_execution` impl lands
1194 /// with a future RFC-024 grant-consumer extension.
1195 ///
1196 /// The default impl returns [`EngineError::Unavailable`] so the
1197 /// trait addition is non-breaking for out-of-tree backends. Same
1198 /// precedent as [`Self::read_current_attempt_index`] landing in
1199 /// v0.12 PR-3.
1200 #[cfg(feature = "core")]
1201 async fn claim_execution(
1202 &self,
1203 _args: ClaimExecutionArgs,
1204 ) -> Result<ClaimExecutionResult, EngineError> {
1205 Err(EngineError::Unavailable {
1206 op: "claim_execution",
1207 })
1208 }
1209
/// Operator-initiated cancellation of a flow and (optionally) its
/// member executions. See RFC-012 §3.1.1 for the policy / wait
/// matrix.
///
/// Required method: the trait supplies no default body, so every
/// backend must provide its own implementation.
///
/// * `id` — the flow to cancel.
/// * `policy` / `wait` — selectors into the RFC matrix referenced
///   above.
async fn cancel_flow(
    &self,
    id: &FlowId,
    policy: CancelFlowPolicy,
    wait: CancelFlowWait,
) -> Result<CancelFlowResult, EngineError>;
1219
1220 /// RFC-016 Stage A: set the inbound-edge-group policy for a
1221 /// downstream execution. Must be called before the first
1222 /// `add_dependency(... -> downstream_execution_id)` — the backend
1223 /// rejects with [`EngineError::Conflict`] if edges have already
1224 /// been staged for this group.
1225 ///
1226 /// Stage A honours only
1227 /// [`EdgeDependencyPolicy::AllOf`](crate::contracts::EdgeDependencyPolicy::AllOf);
1228 /// the `AnyOf` / `Quorum` variants return
1229 /// [`EngineError::Validation`] with
1230 /// `detail = "stage A supports AllOf only; AnyOf/Quorum land in stage B"`
1231 /// until Stage B's resolver lands.
1232 #[cfg(feature = "core")]
1233 async fn set_edge_group_policy(
1234 &self,
1235 _flow_id: &FlowId,
1236 _downstream_execution_id: &ExecutionId,
1237 _policy: crate::contracts::EdgeDependencyPolicy,
1238 ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
1239 Err(EngineError::Unavailable {
1240 op: "set_edge_group_policy",
1241 })
1242 }
1243
1244 // ── HMAC secret rotation (v0.7 migration-master Q4) ──
1245
1246 /// Rotate the waitpoint HMAC signing kid **cluster-wide**.
1247 ///
1248 /// **v0.7 migration-master Q4 (adjudicated 2026-04-24).**
1249 /// Additive trait surface so Valkey and Postgres backends can
1250 /// both expose the "rotate everywhere" semantic under one name.
1251 ///
1252 /// * Valkey impl fans out an `ff_rotate_waitpoint_hmac_secret`
1253 /// FCALL per execution partition. `entries.len() == num_flow_partitions`
1254 /// and per-partition failures are surfaced as inner `Err`
1255 /// entries — the call as a whole does not fail when one
1256 /// partition's FCALL fails, matching
1257 /// [`ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions`]'s
1258 /// partial-success contract.
1259 /// * Postgres impl (Wave 4) writes one row to
1260 /// `ff_waitpoint_hmac(kid, secret, rotated_at)` and returns a
1261 /// single-entry vec with `partition = 0`.
1262 ///
1263 /// The default impl returns
1264 /// [`EngineError::Unavailable`] with
1265 /// `op = "rotate_waitpoint_hmac_secret_all"` so backends that
1266 /// haven't implemented the method surface the miss loudly rather
1267 /// than silently no-op'ing. Both concrete backends override.
1268 async fn rotate_waitpoint_hmac_secret_all(
1269 &self,
1270 _args: RotateWaitpointHmacSecretAllArgs,
1271 ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
1272 Err(EngineError::Unavailable {
1273 op: "rotate_waitpoint_hmac_secret_all",
1274 })
1275 }
1276
1277 /// Seed the initial waitpoint HMAC secret for a fresh deployment
1278 /// (issue #280).
1279 ///
1280 /// **Idempotent.** If a `current_kid` (Valkey per-partition) or
1281 /// an active kid row (Postgres) already exists with the given
1282 /// `kid`, the method returns
1283 /// [`SeedOutcome::AlreadySeeded`] without overwriting, reporting
1284 /// whether the stored secret matches the caller-supplied one via
1285 /// `same_secret`. Callers (cairn boot, operator tooling) invoke
1286 /// this on every boot and let the backend decide whether to
1287 /// install — removing the client-side "check then HSET" race that
1288 /// cairn's raw-HSET boot path silently tolerated.
1289 ///
1290 /// For rotation of an already-seeded secret, use
1291 /// [`Self::rotate_waitpoint_hmac_secret_all`] instead; seed is
1292 /// install-only.
1293 ///
1294 /// The default impl returns [`EngineError::Unavailable`] with
1295 /// `op = "seed_waitpoint_hmac_secret"` so backends that haven't
1296 /// implemented the method surface the miss loudly.
1297 async fn seed_waitpoint_hmac_secret(
1298 &self,
1299 _args: SeedWaitpointHmacSecretArgs,
1300 ) -> Result<SeedOutcome, EngineError> {
1301 Err(EngineError::Unavailable {
1302 op: "seed_waitpoint_hmac_secret",
1303 })
1304 }
1305
1306 // ── Budget ──
1307
/// Report usage against a budget and check limits. Returns the
/// typed [`ReportUsageResult`] variant; backends enforce
/// idempotency via the caller-supplied
/// [`UsageDimensions::dedup_key`] (RFC-012 §R7.2.3 — replaces
/// the pre-Round-7 `AdmissionDecision` return).
///
/// Required method: the trait supplies no default body, so every
/// backend must provide its own implementation.
async fn report_usage(
    &self,
    handle: &Handle,
    budget: &BudgetId,
    dimensions: crate::backend::UsageDimensions,
) -> Result<ReportUsageResult, EngineError>;
1319
1320 // ── Stream reads (RFC-012 Stage 1c tranche-4; issue #87) ──
1321
1322 /// Read frames from a completed or in-flight attempt's stream.
1323 ///
1324 /// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start`
1325 /// / `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
1326 /// `StreamCursor::At("<id>")` reads from a concrete entry id.
1327 ///
1328 /// Input validation (count_limit bounds, cursor shape) is the
1329 /// caller's responsibility — SDK-side wrappers in
1330 /// [`ff-sdk`](https://docs.rs/ff-sdk) enforce bounds before
1331 /// forwarding. Backends MAY additionally reject out-of-range
1332 /// input via [`EngineError::Validation`].
1333 ///
1334 /// Gated on the `streaming` feature — stream reads are part of
1335 /// the stream-subset surface a backend without XREAD-like
1336 /// primitives may omit.
1337 #[cfg(feature = "streaming")]
1338 async fn read_stream(
1339 &self,
1340 _execution_id: &ExecutionId,
1341 _attempt_index: AttemptIndex,
1342 _from: StreamCursor,
1343 _to: StreamCursor,
1344 _count_limit: u64,
1345 ) -> Result<StreamFrames, EngineError> {
1346 Err(EngineError::Unavailable { op: "read_stream" })
1347 }
1348
1349 /// Tail a live attempt's stream.
1350 ///
1351 /// `after` is an exclusive [`StreamCursor`] — entries with id
1352 /// strictly greater than `after` are returned. `StreamCursor::Start`
1353 /// / `StreamCursor::End` are NOT accepted here; callers MUST pass
1354 /// a concrete id (or `StreamCursor::from_beginning()`). The SDK
1355 /// wrapper rejects the open markers before reaching the backend.
1356 ///
1357 /// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up
1358 /// to that many ms for a new entry.
1359 ///
1360 /// `visibility` (RFC-015 §6.1) filters the returned entries by
1361 /// their stored [`StreamMode`](crate::backend::StreamMode)
1362 /// `mode` field. Default
1363 /// [`TailVisibility::All`](crate::backend::TailVisibility::All)
1364 /// preserves v1 behaviour.
1365 ///
1366 /// Gated on the `streaming` feature — see [`read_stream`](Self::read_stream).
1367 #[cfg(feature = "streaming")]
1368 async fn tail_stream(
1369 &self,
1370 _execution_id: &ExecutionId,
1371 _attempt_index: AttemptIndex,
1372 _after: StreamCursor,
1373 _block_ms: u64,
1374 _count_limit: u64,
1375 _visibility: TailVisibility,
1376 ) -> Result<StreamFrames, EngineError> {
1377 Err(EngineError::Unavailable { op: "tail_stream" })
1378 }
1379
1380 /// Read the rolling summary document for an attempt (RFC-015 §6.3).
1381 ///
1382 /// Returns `Ok(None)` when no [`StreamMode::DurableSummary`](crate::backend::StreamMode::DurableSummary)
1383 /// frame has ever been appended for the attempt. Non-blocking Hash
1384 /// read; safe to call from any consumer without holding the lease.
1385 ///
1386 /// Gated on the `streaming` feature — summary reads are part of
1387 /// the stream-subset surface.
1388 #[cfg(feature = "streaming")]
1389 async fn read_summary(
1390 &self,
1391 _execution_id: &ExecutionId,
1392 _attempt_index: AttemptIndex,
1393 ) -> Result<Option<SummaryDocument>, EngineError> {
1394 Err(EngineError::Unavailable {
1395 op: "read_summary",
1396 })
1397 }
1398
1399 // ── RFC-017 Stage A — Ingress (5) ──────────────────────────
1400 //
1401 // Every method in this block has a default impl returning
1402 // `EngineError::Unavailable { op }` per RFC-017 §5.3. Concrete
1403 // backends override each method with a real body. A missing
1404 // override surfaces as a loud typed error at the call site rather
1405 // than a silent no-op.
1406
1407 /// Create an execution. Ingress row 6 (RFC-017 §4). Wraps
1408 /// `ff_create_execution` on Valkey; `INSERT INTO ff_execution ...`
1409 /// on Postgres. The `idempotency_key` + backend-side default
1410 /// `dedup_ttl_ms = 86400000` make duplicate submissions idempotent.
#[cfg(feature = "core")]
async fn create_execution(
    &self,
    _args: CreateExecutionArgs,
) -> Result<CreateExecutionResult, EngineError> {
    // Default body: `_args` is dropped unused; a backend without an
    // override surfaces a typed error rather than a silent no-op.
    let op = "create_execution";
    Err(EngineError::Unavailable { op })
}
1420
1421 /// Create a flow header. Ingress row 5.
#[cfg(feature = "core")]
async fn create_flow(
    &self,
    _args: CreateFlowArgs,
) -> Result<CreateFlowResult, EngineError> {
    // Default body: `_args` is dropped unused; a backend without an
    // override surfaces a typed error rather than a silent no-op.
    let op = "create_flow";
    Err(EngineError::Unavailable { op })
}
1429
1430 /// Atomically add an execution to a flow (single-FCALL co-located
1431 /// commit on Valkey; single-transaction UPSERT on Postgres).
#[cfg(feature = "core")]
async fn add_execution_to_flow(
    &self,
    _args: AddExecutionToFlowArgs,
) -> Result<AddExecutionToFlowResult, EngineError> {
    // Default body: `_args` is dropped unused; a backend without an
    // override surfaces a typed error rather than a silent no-op.
    let op = "add_execution_to_flow";
    Err(EngineError::Unavailable { op })
}
1441
1442 /// Stage a dependency edge between flow members. CAS-guarded on
1443 /// `graph_revision` — stale rev returns `Contention(StaleGraphRevision)`.
#[cfg(feature = "core")]
async fn stage_dependency_edge(
    &self,
    _args: StageDependencyEdgeArgs,
) -> Result<StageDependencyEdgeResult, EngineError> {
    // Default body: `_args` is dropped unused; a backend without an
    // override surfaces a typed error rather than a silent no-op.
    let op = "stage_dependency_edge";
    Err(EngineError::Unavailable { op })
}
1453
1454 /// Apply a staged dependency edge to its downstream child.
#[cfg(feature = "core")]
async fn apply_dependency_to_child(
    &self,
    _args: ApplyDependencyToChildArgs,
) -> Result<ApplyDependencyToChildResult, EngineError> {
    // Default body: `_args` is dropped unused; a backend without an
    // override surfaces a typed error rather than a silent no-op.
    let op = "apply_dependency_to_child";
    Err(EngineError::Unavailable { op })
}
1464
1465 /// Resolve one dependency edge after its upstream reached a
1466 /// terminal outcome — satisfy on "success", mark impossible
1467 /// otherwise. Idempotent (`AlreadyResolved` on replay).
1468 ///
1469 /// PR-7b Step 0 overlap-resolver: lifted here so cluster 2
1470 /// (`scanner/dependency_reconciler`) could trait-route through
1471 /// `Arc<dyn EngineBackend>` without a merge conflict with cluster
1472 /// 4. Cluster 4 (`completion_listener::spawn_dispatch_loop`)
1473 /// ultimately routed through the coarser
1474 /// [`Self::cascade_completion`] (per-payload) instead of looping
1475 /// over this per-edge method, because the Postgres cascade is
1476 /// outbox-driven rather than per-edge — see `cascade_completion`
1477 /// rustdoc "Timing semantics" for details.
1478 ///
1479 /// # Backend status
1480 ///
1481 /// - **Valkey:** wraps `ff_resolve_dependency` (RFC-016 Stage C
1482 /// signature). Atomic single-slot FCALL.
1483 /// - **Postgres:** `Unavailable`. PG's post-completion cascade is
1484 /// not per-edge; it runs via
1485 /// `ff_backend_postgres::dispatch::dispatch_completion(event_id)`
1486 /// keyed on the `ff_completion_event` outbox row. The Valkey-
1487 /// shaped per-edge resolve does not map cleanly to that model;
1488 /// PG's `dependency_reconciler` already calls `dispatch_completion`
1489 /// directly. The engine's PR-7b/final integration test expects
1490 /// `Unsupported` logs from Valkey-shaped scanners on a PG
1491 /// deployment — this surface honours that contract.
1492 /// - **SQLite:** `Unavailable` for the same reason (mirrors PG).
1493 ///
1494 /// The default impl returns [`EngineError::Unavailable`] so a
1495 /// backend that has not been migrated surfaces a typed
1496 /// `Unsupported`-grade error rather than a panic.
#[cfg(feature = "core")]
async fn resolve_dependency(
    &self,
    _args: crate::contracts::ResolveDependencyArgs,
) -> Result<crate::contracts::ResolveDependencyOutcome, EngineError> {
    // Default body: no per-edge resolve is attempted; the caller gets a
    // typed `Unavailable` error instead of a panic.
    let op = "resolve_dependency";
    Err(EngineError::Unavailable { op })
}
1506
1507 /// Cascade a terminal-execution completion into its downstream
1508 /// edges. Consumed by
1509 /// `ff-engine::completion_listener::spawn_dispatch_loop` (PR-7b
1510 /// Cluster 4) to trait-route the post-completion DAG-promotion
1511 /// path through `Arc<dyn EngineBackend>`.
1512 ///
1513 /// Distinct from [`Self::resolve_dependency`]: that method is
1514 /// per-edge (one `ff_resolve_dependency` FCALL); this method is
1515 /// per-completion and orchestrates the full outgoing-edge walk
1516 /// plus `child_skipped` recursion (Valkey) or outbox-event
1517 /// dispatch (Postgres).
1518 ///
1519 /// # Timing semantics
1520 ///
1521 /// Backends diverge on *when* the caller observes cascade work.
1522 /// See [`CascadeOutcome`] for the full contract; the short form:
1523 ///
1524 /// - **Valkey:** synchronous. FCALL-driven walk completes inline;
1525 /// `child_skipped` descendants are recursively cascaded up to the
1526 /// internal `MAX_CASCADE_DEPTH` cap before return.
1527 /// - **Postgres:** asynchronous via the `ff_completion_event`
1528 /// outbox. The call resolves `payload` to its `event_id`, runs
1529 /// `ff_backend_postgres::dispatch::dispatch_completion`, and
1530 /// returns when the outbox row has been claimed + its direct
1531 /// hops advanced. Further-descendant cascades ride their own
1532 /// outbox events (emitted by the per-hop tx) — NOT this call.
1533 ///
1534 /// Consumers that depend on synchronous cascade must either target
1535 /// Valkey explicitly or observe PG's `dispatched_at_ms` clearance
1536 /// via the `dependency_reconciler` partial index to verify drain.
1537 ///
1538 /// The default impl returns [`EngineError::Unavailable`] so
1539 /// backends that have not been migrated surface a typed error
1540 /// rather than a panic.
1541 ///
1542 /// [`CascadeOutcome`]: crate::contracts::CascadeOutcome
#[cfg(feature = "core")]
async fn cascade_completion(
    &self,
    _payload: &crate::backend::CompletionPayload,
) -> Result<crate::contracts::CascadeOutcome, EngineError> {
    // Default body: `_payload` is not inspected; the caller gets a
    // typed `Unavailable` error instead of a panic.
    let op = "cascade_completion";
    Err(EngineError::Unavailable { op })
}
1552
1553 // ── RFC-017 Stage A — Operator control (4) ─────────────────
1554
1555 /// Operator-initiated execution cancel (row 2).
#[cfg(feature = "core")]
async fn cancel_execution(
    &self,
    _args: CancelExecutionArgs,
) -> Result<CancelExecutionResult, EngineError> {
    // Default body: `_args` is dropped unused and the op is reported
    // as unavailable until a backend overrides this method.
    let op = "cancel_execution";
    Err(EngineError::Unavailable { op })
}
1565
1566 /// Re-score an execution's eligibility priority (row 17).
#[cfg(feature = "core")]
async fn change_priority(
    &self,
    _args: ChangePriorityArgs,
) -> Result<ChangePriorityResult, EngineError> {
    // Default body: `_args` is dropped unused and the op is reported
    // as unavailable until a backend overrides this method.
    let op = "change_priority";
    Err(EngineError::Unavailable { op })
}
1576
1577 /// Replay a terminal execution (row 22). Variadic KEYS handling
1578 /// (inbound-edge pre-read) is hidden inside the Valkey impl per
1579 /// RFC-017 §4 row 3.
#[cfg(feature = "core")]
async fn replay_execution(
    &self,
    _args: ReplayExecutionArgs,
) -> Result<ReplayExecutionResult, EngineError> {
    // Default body: `_args` is dropped unused and the op is reported
    // as unavailable until a backend overrides this method.
    let op = "replay_execution";
    Err(EngineError::Unavailable { op })
}
1589
1590 /// Operator-initiated lease revoke (row 19).
#[cfg(feature = "core")]
async fn revoke_lease(
    &self,
    _args: RevokeLeaseArgs,
) -> Result<RevokeLeaseResult, EngineError> {
    // Default body: `_args` is dropped unused and the op is reported
    // as unavailable until a backend overrides this method.
    let op = "revoke_lease";
    Err(EngineError::Unavailable { op })
}
1598
1599 // ── cairn #389 — service-layer typed FCALL surface ────────────
1600 //
1601 // These methods mirror `complete`/`fail`/`renew` (which take a
1602 // worker [`Handle`]) but dispatch against `(execution_id, fence)`
1603 // tuples supplied directly. Service-layer callers (cairn's
1604 // `valkey_control_plane_impl.rs`, future consumers that hold a
1605 // run/lease descriptor without a `Handle`) use these to avoid
1606 // going through the raw `ferriskey::Value` FCALL escape hatch.
1607 //
1608 // Same shape / same precedent as `suspend_by_triple` (cairn #322).
1609 //
1610 // Default impl returns [`EngineError::Unavailable`] so the trait
1611 // addition is non-breaking for out-of-tree backends. The in-tree
1612 // Valkey backend overrides; Postgres + SQLite keep the default
1613 // until follow-up parity work lands (consistent with
1614 // `issue_reclaim_grant` / `reclaim_execution` precedent).
1615
1616 /// Service-layer `complete_execution` — peer of [`Self::complete`]
1617 /// that takes a fence triple instead of a worker [`Handle`]. See
1618 /// the group preamble above for cairn-migration context.
#[cfg(feature = "core")]
async fn complete_execution(
    &self,
    _args: CompleteExecutionArgs,
) -> Result<CompleteExecutionResult, EngineError> {
    // Default body keeps the trait addition non-breaking: backends
    // without an override report the op as unavailable.
    let op = "complete_execution";
    Err(EngineError::Unavailable { op })
}
1628
1629 /// Service-layer `fail_execution` — peer of [`Self::fail`] that
1630 /// takes a fence triple instead of a worker [`Handle`].
#[cfg(feature = "core")]
async fn fail_execution(
    &self,
    _args: FailExecutionArgs,
) -> Result<FailExecutionResult, EngineError> {
    // Default body keeps the trait addition non-breaking: backends
    // without an override report the op as unavailable.
    let op = "fail_execution";
    Err(EngineError::Unavailable { op })
}
1640
1641 /// Service-layer `renew_lease` — peer of [`Self::renew`] that
1642 /// takes a fence triple instead of a worker [`Handle`].
#[cfg(feature = "core")]
async fn renew_lease(
    &self,
    _args: RenewLeaseArgs,
) -> Result<RenewLeaseResult, EngineError> {
    // Default body keeps the trait addition non-breaking: backends
    // without an override report the op as unavailable.
    let op = "renew_lease";
    Err(EngineError::Unavailable { op })
}
1650
1651 /// Service-layer `resume_execution` — transitions a suspended
1652 /// execution back to runnable. Distinct from
1653 /// [`Self::claim_from_resume_grant`] (which mints a worker handle
1654 /// against an already-eligible resumed execution): this method is
1655 /// the lifecycle transition primitive the control plane calls
1656 /// when an operator / auto-resume policy moves a suspended
1657 /// execution forward.
1658 ///
1659 /// The Valkey impl pre-reads `current_waitpoint_id` + `lane_id`
1660 /// from `exec_core` so callers only need the execution id + the
1661 /// trigger type — same ergonomics as `revoke_lease` reading
1662 /// `current_worker_instance_id` when callers omit it.
#[cfg(feature = "core")]
async fn resume_execution(
    &self,
    _args: ResumeExecutionArgs,
) -> Result<ResumeExecutionResult, EngineError> {
    // Default body: no lifecycle transition is attempted; backends
    // without an override report the op as unavailable.
    let op = "resume_execution";
    Err(EngineError::Unavailable { op })
}
1672
1673 /// Service-layer `check_admission_and_record` — atomic admission
1674 /// check against a quota policy. Callers supply the policy id +
1675 /// dimension (quota keys live on their own `{q:<policy>}`
1676 /// partition that cannot be derived from `execution_id`, so these
1677 /// travel outside [`CheckAdmissionArgs`]). `dimension` defaults
1678 /// to `"default"` inside the Valkey body when the caller passes
1679 /// an empty string — matches cairn's pre-migration default.
#[cfg(feature = "core")]
async fn check_admission(
    &self,
    _quota_policy_id: &crate::types::QuotaPolicyId,
    _dimension: &str,
    _args: CheckAdmissionArgs,
) -> Result<CheckAdmissionResult, EngineError> {
    // Default body: policy id, dimension and args are all ignored;
    // backends without an override report the op as unavailable.
    let op = "check_admission";
    Err(EngineError::Unavailable { op })
}
1691
1692 /// Service-layer `evaluate_flow_eligibility` — read-only check
1693 /// that returns the execution's current eligibility state
1694 /// (`eligible`, `blocked_by_dependencies`, or a backend-specific
1695 /// status string). Called by cairn's dependency-resolution path
1696 /// to decide whether a downstream execution can proceed.
#[cfg(feature = "core")]
async fn evaluate_flow_eligibility(
    &self,
    _args: EvaluateFlowEligibilityArgs,
) -> Result<EvaluateFlowEligibilityResult, EngineError> {
    // Default body: no eligibility read is attempted; backends
    // without an override report the op as unavailable.
    let op = "evaluate_flow_eligibility";
    Err(EngineError::Unavailable { op })
}
1706
1707 // ── #454 — cairn typed-FCALL additions (4) ─────────────────
1708
1709 /// Per-execution budget spend with tenant-open dimensions.
1710 ///
1711 /// Cairn #454 Q1/Q2: takes an open-set `BTreeMap<String, u64>` of
1712 /// deltas (distinct from the fixed-shape
1713 /// [`Self::report_usage`]/[`Self::report_usage_admin`] which use
1714 /// [`UsageDimensions`]). Return shape reuses [`ReportUsageResult`]
1715 /// — same `Ok` / `SoftBreach` / `HardBreach` / `AlreadyApplied`
1716 /// variants cairn's UI already branches on.
1717 ///
1718 /// The default impl returns [`EngineError::Unavailable`] so the
1719 /// trait remains additive for backends that have not landed the
1720 /// #454 body yet.
#[cfg(feature = "core")]
async fn record_spend(
    &self,
    _args: RecordSpendArgs,
) -> Result<ReportUsageResult, EngineError> {
    // Default body: no spend is recorded; backends without an
    // override report the op as unavailable.
    let op = "record_spend";
    Err(EngineError::Unavailable { op })
}
1730
1731 /// Per-execution budget attribution release.
1732 ///
1733 /// Called on execution termination to reverse this execution's
1734 /// contribution to a budget counter. Per cairn #454 clarification
1735 /// this is **per-execution**, not a whole-budget flush — the
1736 /// budget persists across executions.
1737 ///
1738 /// The default impl returns [`EngineError::Unavailable`].
#[cfg(feature = "core")]
async fn release_budget(
    &self,
    _args: ReleaseBudgetArgs,
) -> Result<(), EngineError> {
    // Default body: nothing is released; backends without an
    // override report the op as unavailable.
    let op = "release_budget";
    Err(EngineError::Unavailable { op })
}
1748
1749 /// Operator-driven approval-signal delivery.
1750 ///
1751 /// Pre-shaped variant of [`Self::deliver_signal`] where the caller
1752 /// **does not carry the waitpoint token**. The backend reads the
1753 /// token from `ff_waitpoint_pending`, HMAC-verifies server-side,
1754 /// and dispatches. Operator API never sees the token bytes.
1755 ///
1756 /// Cairn #454 Q3 — `signal_name` is a flat string (conventional
1757 /// values `"approved"` / `"rejected"`); audit metadata lives in
1758 /// cairn's audit log, not on the FF surface.
1759 ///
1760 /// The default impl returns [`EngineError::Unavailable`].
#[cfg(feature = "core")]
async fn deliver_approval_signal(
    &self,
    _args: DeliverApprovalSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
    // Default body: no signal is delivered; backends without an
    // override report the op as unavailable.
    let op = "deliver_approval_signal";
    Err(EngineError::Unavailable { op })
}
1770
1771 /// Backend-atomic `issue_claim_grant` + `claim_execution`.
1772 ///
1773 /// Cairn #454 Q4 — composing the two primitives caller-side risks
1774 /// leaking a grant if `claim_execution` fails after
1775 /// `issue_claim_grant` succeeded. This method's contract is that
1776 /// the composition is backend-atomic: Valkey fuses them in one
1777 /// FCALL; PG/SQLite fuse them in one tx.
1778 ///
1779 /// The default impl **must not** be a chained call — it returns
1780 /// [`EngineError::Unavailable`] so consumers cannot accidentally
1781 /// use a non-atomic fallback.
#[cfg(feature = "core")]
async fn issue_grant_and_claim(
    &self,
    _args: IssueGrantAndClaimArgs,
) -> Result<ClaimGrantOutcome, EngineError> {
    // Deliberately NOT a chained issue-then-claim fallback: the default
    // only reports the op as unavailable, so no non-atomic path exists.
    let op = "issue_grant_and_claim";
    Err(EngineError::Unavailable { op })
}
1791
1792 // ── RFC-017 Stage A — Budget + quota admin (5) ─────────────
1793
1794 /// Create a budget definition (row 6).
#[cfg(feature = "core")]
async fn create_budget(
    &self,
    _args: CreateBudgetArgs,
) -> Result<CreateBudgetResult, EngineError> {
    // Default body: `_args` is dropped unused and the op is reported
    // as unavailable until a backend overrides this method.
    let op = "create_budget";
    Err(EngineError::Unavailable { op })
}
1804
1805 /// Reset a budget's usage counters (row 10).
#[cfg(feature = "core")]
async fn reset_budget(
    &self,
    _args: ResetBudgetArgs,
) -> Result<ResetBudgetResult, EngineError> {
    // Default body: `_args` is dropped unused and the op is reported
    // as unavailable until a backend overrides this method.
    let op = "reset_budget";
    Err(EngineError::Unavailable { op })
}
1813
1814 /// Create a quota policy (row 7).
#[cfg(feature = "core")]
async fn create_quota_policy(
    &self,
    _args: CreateQuotaPolicyArgs,
) -> Result<CreateQuotaPolicyResult, EngineError> {
    // Default body: `_args` is dropped unused and the op is reported
    // as unavailable until a backend overrides this method.
    let op = "create_quota_policy";
    Err(EngineError::Unavailable { op })
}
1824
1825 /// Read-only budget status for operator visibility (row 8).
#[cfg(feature = "core")]
async fn get_budget_status(
    &self,
    _id: &BudgetId,
) -> Result<BudgetStatus, EngineError> {
    // Default body: `_id` is not looked up; the op is reported as
    // unavailable until a backend overrides this method.
    let op = "get_budget_status";
    Err(EngineError::Unavailable { op })
}
1835
1836 /// Admin-path `report_usage` (row 9 + RFC-017 §5 round-1 F4).
1837 /// Distinct from the existing [`Self::report_usage`] which takes
1838 /// a worker handle — the admin path has no lease context.
#[cfg(feature = "core")]
async fn report_usage_admin(
    &self,
    _budget: &BudgetId,
    _args: ReportUsageAdminArgs,
) -> Result<ReportUsageResult, EngineError> {
    // Default body: neither the budget id nor the args are used; the
    // op is reported as unavailable until a backend overrides it.
    let op = "report_usage_admin";
    Err(EngineError::Unavailable { op })
}
1849
1850 // ── RFC-017 Stage A — Read + diagnostics (3) ───────────────
1851
1852 /// Fetch the stored result payload for a completed execution
1853 /// (row 4). Returns `Ok(None)` when the execution is missing, not
1854 /// yet complete, or its payload was trimmed by retention policy.
1855 async fn get_execution_result(
1856 &self,
1857 _id: &ExecutionId,
1858 ) -> Result<Option<Vec<u8>>, EngineError> {
1859 Err(EngineError::Unavailable {
1860 op: "get_execution_result",
1861 })
1862 }
1863
1864 /// List the pending-or-active waitpoints for an execution, cursor
1865 /// paginated (row 5 / §8). Stage A preserves the existing
1866 /// `PendingWaitpointInfo` shape; Stage D ships the §8 HMAC
1867 /// sanitisation + `(token_kid, token_fingerprint)` schema.
#[cfg(feature = "core")]
async fn list_pending_waitpoints(
    &self,
    _args: ListPendingWaitpointsArgs,
) -> Result<ListPendingWaitpointsResult, EngineError> {
    // Default body: no listing is attempted; the op is reported as
    // unavailable until a backend overrides this method.
    let op = "list_pending_waitpoints";
    Err(EngineError::Unavailable { op })
}
1877
1878 /// Backend-level reachability probe (row 1). Valkey: `PING`;
1879 /// Postgres: `SELECT 1`.
1880 async fn ping(&self) -> Result<(), EngineError> {
1881 Err(EngineError::Unavailable { op: "ping" })
1882 }
1883
1884 // ── RFC-025 worker registry ────────────────────────────────
1885 //
1886 // Four core primitives + Phase-6 `list_workers` readback. See
1887 // `rfcs/RFC-025-worker-registry.md` for the full design. Default
1888 // impls return `Unavailable` so out-of-tree backends keep
1889 // compiling; every in-tree backend overrides.
1890
1891 /// Register (or idempotently refresh) a worker instance. See RFC-025 §4.
#[cfg(feature = "core")]
async fn register_worker(
    &self,
    _args: RegisterWorkerArgs,
) -> Result<RegisterWorkerOutcome, EngineError> {
    // Default body exists so out-of-tree backends keep compiling;
    // without an override the op is reported as unavailable.
    let op = "register_worker";
    Err(EngineError::Unavailable { op })
}
1901
1902 /// Refresh the worker-instance liveness TTL.
#[cfg(feature = "core")]
async fn heartbeat_worker(
    &self,
    _args: HeartbeatWorkerArgs,
) -> Result<HeartbeatWorkerOutcome, EngineError> {
    // Default body exists so out-of-tree backends keep compiling;
    // without an override the op is reported as unavailable.
    let op = "heartbeat_worker";
    Err(EngineError::Unavailable { op })
}
1912
1913 /// Operator-driven worker death (distinct from passive TTL expiry).
#[cfg(feature = "core")]
async fn mark_worker_dead(
    &self,
    _args: MarkWorkerDeadArgs,
) -> Result<MarkWorkerDeadOutcome, EngineError> {
    // Default body exists so out-of-tree backends keep compiling;
    // without an override the op is reported as unavailable.
    let op = "mark_worker_dead";
    Err(EngineError::Unavailable { op })
}
1923
/// Enumerate expired leases for reclaim-decision tooling.
///
/// Part of the RFC-025 worker-registry group. Gated on the `core`
/// feature like its sibling registry methods (`register_worker`,
/// `heartbeat_worker`, `mark_worker_dead`, `list_workers`) so that a
/// backend overriding the whole group compiles under one feature set.
///
/// The default impl returns [`EngineError::Unavailable`] so
/// out-of-tree backends keep compiling.
#[cfg(feature = "core")]
async fn list_expired_leases(
    &self,
    _args: ListExpiredLeasesArgs,
) -> Result<ListExpiredLeasesResult, EngineError> {
    // `_args` is dropped unused; no lease scan is attempted here.
    Err(EngineError::Unavailable {
        op: "list_expired_leases",
    })
}
1934
1935 /// Enumerate live workers (RFC-025 Phase 6, §9.4).
#[cfg(feature = "core")]
async fn list_workers(
    &self,
    _args: ListWorkersArgs,
) -> Result<ListWorkersResult, EngineError> {
    // Default body exists so out-of-tree backends keep compiling;
    // without an override the op is reported as unavailable.
    let op = "list_workers";
    Err(EngineError::Unavailable { op })
}
1945
1946 // ── RFC-017 Stage A — Scheduling (1) ───────────────────────
1947
1948 /// Scheduler-routed claim entrypoint (row 18, RFC-017 §7). Valkey
1949 /// forwards to its `ff_scheduler::Scheduler` cursor; Postgres
1950 /// forwards to `PostgresScheduler`'s `FOR UPDATE SKIP LOCKED`
1951 /// path.
1952 ///
1953 /// Backends that carry an embedded scheduler (e.g. `ValkeyBackend`
1954 /// constructed via `with_embedded_scheduler`, or `PostgresBackend`
1955 /// with its `with_scanners` sibling) route the claim through it.
1956 /// Backends without a wired scheduler return
1957 /// [`EngineError::Unavailable`]. HTTP consumers use
1958 /// `FlowFabricWorker::claim_via_server` instead.
#[cfg(feature = "core")]
async fn claim_for_worker(
    &self,
    _args: ClaimForWorkerArgs,
) -> Result<ClaimForWorkerOutcome, EngineError> {
    // Default body: no scheduler is consulted; the op is reported as
    // unavailable until a backend overrides this method.
    let op = "claim_for_worker";
    Err(EngineError::Unavailable { op })
}
1968
1969 // ── Cross-cutting (RFC-017 Stage B trait-lift) ──────────────
1970
1971 /// Static observability label identifying the backend family in
1972 /// logs + metrics (RFC-017 §5.4 + §9 Stage B). Default impl
1973 /// returns `"unknown"` so legacy `impl EngineBackend` blocks that
1974 /// have not upgraded keep compiling; every in-tree backend
1975 /// overrides — `ValkeyBackend` → `"valkey"`, `PostgresBackend` →
1976 /// `"postgres"`.
1977 fn backend_label(&self) -> &'static str {
1978 "unknown"
1979 }
1980
1981 /// Backend downcast escape hatch (v0.12 PR-7a transitional).
1982 ///
1983 /// Scanner supervisors in `ff-engine` still dispatch through a
1984 /// concrete `ferriskey::Client`; to keep the engine's public
1985 /// boundary backend-agnostic (`Arc<dyn EngineBackend>`) while the
1986 /// scanner internals remain Valkey-shaped, the engine downcasts
1987 /// via this method and reaches in for the embedded client. Every
1988 /// backend that wants to be consumed by `Engine::start_with_completions`
1989 /// overrides this to return `self` as `&dyn Any`; the default
1990 /// returns a placeholder so a stray `downcast_ref` fails cleanly
1991 /// rather than risking unsound behaviour.
1992 ///
1993 /// v0.13 (PR-7b) will trait-ify individual scanners onto
1994 /// `EngineBackend` and retire `ff-engine`'s dependence on this
1995 /// downcast path. The method itself will remain on the trait
1996 /// (likely deprecated) rather than be removed — removing a
1997 /// public trait method is a breaking change for external
1998 /// `impl EngineBackend` blocks.
1999 fn as_any(&self) -> &(dyn std::any::Any + 'static) {
2000 // Placeholder so the default does not expose `Self` for
2001 // downcast. Backends override to return `self`.
2002 &()
2003 }
2004
2005 /// RFC-018 Stage A: snapshot of this backend's identity + the
2006 /// flat `Supports` surface it can actually service. Consumers use
2007 /// this at startup to gate UI features / choose between alternative
2008 /// code paths before dispatching. See
2009 /// `rfcs/RFC-018-backend-capability-discovery.md` for the full
2010 /// discovery contract and the four owner-adjudicated open
2011 /// questions (granularity: coarse; version: struct; sync; no
2012 /// event stream).
2013 ///
2014 /// Default: returns a value tagged `family = "unknown"` with every
2015 /// `supports.*` bool `false`, so pre-RFC-018 out-of-tree backends
2016 /// keep compiling and consumers treat "all false" as "dispatch
2017 /// and catch [`EngineError::Unavailable`]" (pre-RFC-018 behaviour).
2018 /// Concrete in-tree backends (`ValkeyBackend`, `PostgresBackend`)
2019 /// override to populate a real value.
2020 ///
2021 /// Sync (no `.await`): backend-static info should not require a
2022 /// probe on every query. Dynamic probes happen once at
2023 /// `connect*` time and cache the result.
2024 fn capabilities(&self) -> crate::capability::Capabilities {
2025 crate::capability::Capabilities::new(
2026 crate::capability::BackendIdentity::new(
2027 "unknown",
2028 crate::capability::Version::new(0, 0, 0),
2029 "unknown",
2030 ),
2031 crate::capability::Supports::none(),
2032 )
2033 }
2034
2035 /// Issue #281: run one-time backend-specific boot preparation.
2036 ///
2037 /// Intended to run ONCE per deployment startup — NOT per request.
2038 /// Idempotent and safe for consumers to call on every application
2039 /// boot; backends that have nothing to do return
2040 /// [`PrepareOutcome::NoOp`] without side effects.
2041 ///
2042 /// Per-backend behaviour:
2043 ///
2044 /// * **Valkey** — issues `FUNCTION LOAD REPLACE` for the
2045 /// `flowfabric` Lua library (with bounded retry on transient
2046 /// transport faults; permanent compile errors surface as
2047 /// [`EngineError::Transport`] without retry). Returns
2048 /// [`PrepareOutcome::Applied`] carrying
2049 /// `"FUNCTION LOAD (flowfabric lib v<N>)"`.
2050 /// * **Postgres** — returns [`PrepareOutcome::NoOp`]. Schema
2051 /// migrations are applied out-of-band per
2052 /// `rfcs/drafts/v0.7-migration-master.md §Q12`; the backend
2053 /// runs a schema-version check at connect time and refuses to
2054 /// start on mismatch, so no boot-side prepare work remains.
2055 /// * **Default impl** — returns [`PrepareOutcome::NoOp`] so
2056 /// out-of-tree backends without preparation work compile
2057 /// without boilerplate.
2058 ///
2059 /// # Relationship to the in-tree boot path
2060 ///
2061 /// `ValkeyBackend::initialize_deployment` (called from
2062 /// `Server::start_with_metrics`) already invokes
2063 /// [`ensure_library`](ff_script::loader::ensure_library) inline as
2064 /// its step 4; that path is unchanged. `prepare()` exists as a
2065 /// **trait-surface entry point** so consumers that construct an
2066 /// `Arc<dyn EngineBackend>` outside of `Server` (e.g.
2067 /// cairn-fabric's boot path at `cairn-fabric/src/boot.rs`) can
2068 /// run the same preparation without reaching into
2069 /// backend-specific modules. The overlap is intentional: calling
2070 /// both `prepare()` and `initialize_deployment` is safe because
2071 /// `FUNCTION LOAD REPLACE` is idempotent under the version
2072 /// check.
2073 ///
2074 /// # Layer forwarding
2075 ///
2076 /// Layer impls (`HookedBackend`, ff-sdk layers) do NOT forward
2077 /// `prepare` today — consistent with `backend_label` / `ping` /
2078 /// `shutdown_prepare`. Consumers that wrap a backend in layers
2079 /// MUST call `prepare()` on the raw backend before wrapping, or
2080 /// accept the default [`PrepareOutcome::NoOp`].
2081 async fn prepare(&self) -> Result<PrepareOutcome, EngineError> {
2082 Ok(PrepareOutcome::NoOp)
2083 }
2084
2085 /// Drain-before-shutdown hook (RFC-017 §5.4). The server calls
2086 /// this before draining its own background tasks so backend-
2087 /// scoped primitives (Valkey stream semaphore, Postgres sqlx
2088 /// pool, …) can close their gates and await in-flight work up to
2089 /// `grace`.
2090 ///
2091 /// Default impl returns `Ok(())` — a no-op backend has nothing
2092 /// backend-scoped to drain. Concrete backends whose data plane
2093 /// owns resources (connection pools, semaphores, listeners)
2094 /// override with a real body.
async fn shutdown_prepare(&self, _grace: Duration) -> Result<(), EngineError> {
    // Default: `_grace` is unused and success is returned immediately;
    // nothing is drained or awaited here.
    Ok(())
}
2098
2099 // ── RFC-017 Stage E2 — `Server::client` removal (header + reads) ───
2100
2101 /// RFC-017 Stage E2: the "header" portion of `cancel_flow` — run the
2102 /// atomic flow-state flip (Valkey: `ff_cancel_flow` FCALL; Postgres:
2103 /// `cancel_flow_once` tx), decode policy + membership, and surface
2104 /// the `flow_already_terminal` idempotency branch as a first-class
2105 /// [`CancelFlowHeader::AlreadyTerminal`] so the Server can build
2106 /// the wire [`CancelFlowResult`] without reaching for a raw
2107 /// `Client`. Separate from the existing
2108 /// [`EngineBackend::cancel_flow`] entry point (which takes the
2109 /// enum-typed `(policy, wait)` split and returns the wait-collapsed
2110 /// `CancelFlowResult`) because the Server owns its own
2111 /// wait-dispatch + member-cancel machinery via
2112 /// [`EngineBackend::cancel_execution`] + backlog ack.
2113 ///
2114 /// Default impl returns [`EngineError::Unavailable`] so un-migrated
2115 /// backends surface the miss loudly.
#[cfg(feature = "core")]
async fn cancel_flow_header(
    &self,
    _args: CancelFlowArgs,
) -> Result<crate::contracts::CancelFlowHeader, EngineError> {
    // Default body: no flow-state flip is attempted; un-migrated
    // backends surface the miss as a typed error.
    let op = "cancel_flow_header";
    Err(EngineError::Unavailable { op })
}
2125
2126 /// RFC-017 Stage E2: best-effort acknowledgement that one member of
2127 /// a `cancel_all` flow has completed its per-member cancel. Drains
2128 /// the member from the flow's `pending_cancels` set and, if empty,
2129 /// removes the flow from the partition-level `cancel_backlog`
2130 /// (Valkey: `ff_ack_cancel_member` FCALL; Postgres: table write —
2131 /// default `Unavailable` until Wave 9).
2132 ///
2133 /// Failures are swallowed by the caller — the cancel-backlog
2134 /// reconciler is the authoritative drain — but a typed error here
2135 /// lets the caller log a backend-scoped context string.
#[cfg(feature = "core")]
async fn ack_cancel_member(
    &self,
    _flow_id: &FlowId,
    _execution_id: &ExecutionId,
) -> Result<(), EngineError> {
    // Default body: neither id is used and nothing is acknowledged;
    // the op is reported as unavailable.
    let op = "ack_cancel_member";
    Err(EngineError::Unavailable { op })
}
2146
2147 /// RFC-017 Stage E2: full-shape execution read used by the
2148 /// `GET /v1/executions/{id}` HTTP route. Returns the legacy
2149 /// [`ExecutionInfo`] wire shape (not the decoupled
2150 /// [`ExecutionSnapshot`]) so the existing HTTP response bytes stay
2151 /// identical across the migration.
2152 ///
2153 /// `Ok(None)` ⇒ no such execution. Default `Unavailable` because
2154 /// the Valkey HGETALL-and-parse is backend-specific.
#[cfg(feature = "core")]
async fn read_execution_info(
    &self,
    _id: &ExecutionId,
) -> Result<Option<ExecutionInfo>, EngineError> {
    // Default body: `_id` is not looked up. This is an `Err`, not
    // `Ok(None)` — "unavailable" is distinct from "no such execution".
    let op = "read_execution_info";
    Err(EngineError::Unavailable { op })
}
2164
2165 /// RFC-017 Stage E2: narrow `public_state` read used by the
2166 /// `GET /v1/executions/{id}/state` HTTP route. Returns `Ok(None)`
2167 /// when the execution is missing. Default `Unavailable`.
#[cfg(feature = "core")]
async fn read_execution_state(
    &self,
    _id: &ExecutionId,
) -> Result<Option<PublicState>, EngineError> {
    // Default body: `_id` is not looked up. This is an `Err`, not
    // `Ok(None)` — "unavailable" is distinct from "no such execution".
    let op = "read_execution_state";
    Err(EngineError::Unavailable { op })
}
2177
2178 // ── RFC-019 Stage A/B/C — Stream-cursor subscriptions ─────────
2179 //
2180 // Four owner-adjudicated families (RFC-019 §Open Questions #5):
2181 // `lease_history`, `completion`, `signal_delivery`,
2182 // `instance_tags`. Stage C (this crate) promotes each family to
2183 // a typed event enum; consumers `match` on variants instead of
2184 // parsing a backend-shaped byte blob.
2185 //
2186 // Each method returns a family-specific subscription alias (see
2187 // [`crate::stream_events`]). All defaults return
2188 // `EngineError::Unavailable` per RFC-017 trait-growth conventions.
2189
2190 /// Subscribe to lease lifecycle events (acquired / renewed /
2191 /// expired / reclaimed / revoked) for the partition this backend
2192 /// is configured with.
2193 ///
2194 /// Cross-partition fan-out is consumer-side merge: subscribe
2195 /// per-partition backend instance and interleave on the read
2196 /// side. Yields
2197 /// `Err(EngineError::StreamDisconnected { cursor })` on backend
2198 /// disconnect; resume by calling this method again with the
2199 /// returned cursor.
2200 ///
2201 /// `filter` (#282): when `filter.instance_tag` is `Some((k, v))`,
2202 /// only events whose execution carries tag `k = v` are yielded
2203 /// (matching the [`crate::backend::ScannerFilter`] surface from
2204 /// #122). Pass `&ScannerFilter::default()` for unfiltered
2205 /// behaviour. Filtering happens inside the backend stream; the
2206 /// [`crate::stream_events::LeaseHistorySubscription`] return type
2207 /// is unchanged.
2208 async fn subscribe_lease_history(
2209 &self,
2210 _cursor: crate::stream_subscribe::StreamCursor,
2211 _filter: &crate::backend::ScannerFilter,
2212 ) -> Result<crate::stream_events::LeaseHistorySubscription, EngineError> {
2213 Err(EngineError::Unavailable {
2214 op: "subscribe_lease_history",
2215 })
2216 }
2217
2218 /// Subscribe to completion events (terminal state transitions).
2219 ///
2220 /// - **Postgres**: wraps the `ff_completion_event` outbox +
2221 /// LISTEN/NOTIFY machinery. Durable via event-id cursor.
2222 /// - **Valkey**: wraps the RESP3 `ff:dag:completions` pubsub
2223 /// subscriber. Pubsub is at-most-once over the live
2224 /// subscription window; the cursor is always the empty
2225 /// sentinel. If you need at-least-once replay with durable
2226 /// cursor resume, use the Postgres backend (see
2227 /// `docs/POSTGRES_PARITY_MATRIX.md` row `subscribe_completion`).
2228 ///
2229 /// `filter` (#282): see [`Self::subscribe_lease_history`]. Valkey
2230 /// reuses the `subscribe_completions_filtered` per-event HGET
2231 /// gate; Postgres filters inline against the outbox's denormalised
2232 /// `instance_tag` column.
2233 async fn subscribe_completion(
2234 &self,
2235 _cursor: crate::stream_subscribe::StreamCursor,
2236 _filter: &crate::backend::ScannerFilter,
2237 ) -> Result<crate::stream_events::CompletionSubscription, EngineError> {
2238 Err(EngineError::Unavailable {
2239 op: "subscribe_completion",
2240 })
2241 }
2242
2243 /// Subscribe to signal-delivery events (satisfied / buffered /
2244 /// deduped).
2245 ///
2246 /// `filter` (#282): see [`Self::subscribe_lease_history`].
2247 async fn subscribe_signal_delivery(
2248 &self,
2249 _cursor: crate::stream_subscribe::StreamCursor,
2250 _filter: &crate::backend::ScannerFilter,
2251 ) -> Result<crate::stream_events::SignalDeliverySubscription, EngineError> {
2252 Err(EngineError::Unavailable {
2253 op: "subscribe_signal_delivery",
2254 })
2255 }
2256
2257 /// Subscribe to instance-tag events (tag attached / cleared).
2258 ///
2259 /// Producer wiring is deferred per #311 audit ("no concrete
2260 /// demand"); the trait method exists for API uniformity across
2261 /// the four families. Backends currently return
2262 /// `EngineError::Unavailable`.
2263 async fn subscribe_instance_tags(
2264 &self,
2265 _cursor: crate::stream_subscribe::StreamCursor,
2266 ) -> Result<crate::stream_events::InstanceTagSubscription, EngineError> {
2267 Err(EngineError::Unavailable {
2268 op: "subscribe_instance_tags",
2269 })
2270 }
2271
2272 // ── PR-7b Cluster 1 — Foundation scanner operations ─────────
2273 //
2274 // Per-execution write hooks invoked by the engine scanner loop.
2275 // Scanner bodies discover candidate executions via partition
2276 // indices (Valkey ZSETs / PG index scans) and, for each candidate,
2277 // call through to one of the methods below to perform the atomic
2278 // state-flip. Defaults return `EngineError::Unavailable { op }`;
2279 // Valkey impls wrap the corresponding `ff_*` FCALL; Postgres impls
2280 // run a single-row tx mirroring the Lua semantic.
2281
2282 /// Lease-expiry scanner hook — mark an expired lease as reclaimable
2283 /// so another worker can redeem the execution. Atomic per call.
2284 ///
2285 /// Valkey: `FCALL ff_mark_lease_expired_if_due`.
2286 /// Postgres: single-row tx on `ff_attempt` + `ff_exec_core` (see
2287 /// `ff_backend_postgres::reconcilers::lease_expiry`).
2288 async fn mark_lease_expired_if_due(
2289 &self,
2290 _partition: crate::partition::Partition,
2291 _execution_id: &ExecutionId,
2292 ) -> Result<(), EngineError> {
2293 Err(EngineError::Unavailable {
2294 op: "mark_lease_expired_if_due",
2295 })
2296 }
2297
2298 /// Delayed-promoter scanner hook — promote a delayed execution to
2299 /// `eligible_now` once its `delay_until` has passed.
2300 ///
2301 /// Valkey: `FCALL ff_promote_delayed`.
2302 /// Postgres: Wave 9 schema scope (no current impl).
2303 async fn promote_delayed(
2304 &self,
2305 _partition: crate::partition::Partition,
2306 _lane: &LaneId,
2307 _execution_id: &ExecutionId,
2308 _now_ms: TimestampMs,
2309 ) -> Result<(), EngineError> {
2310 Err(EngineError::Unavailable {
2311 op: "promote_delayed",
2312 })
2313 }
2314
2315 /// Pending-waitpoint-expiry scanner hook — close a pending
2316 /// waitpoint whose deadline has passed (wake the suspended
2317 /// execution with a timeout signal).
2318 ///
2319 /// Valkey: `FCALL ff_close_waitpoint`.
2320 /// Postgres: Wave 9 schema scope (no current impl).
2321 async fn close_waitpoint(
2322 &self,
2323 _partition: crate::partition::Partition,
2324 _execution_id: &ExecutionId,
2325 _waitpoint_id: &str,
2326 _now_ms: TimestampMs,
2327 ) -> Result<(), EngineError> {
2328 Err(EngineError::Unavailable {
2329 op: "close_waitpoint",
2330 })
2331 }
2332
2333 /// Shared hook for the attempt-timeout and execution-deadline
2334 /// scanners — terminate or retry an execution whose wall-clock
2335 /// budget has elapsed. `phase` discriminates which of the two
2336 /// scanner paths is calling so the backend can preserve diagnostic
2337 /// breadcrumbs without forking the surface.
2338 ///
2339 /// Valkey: `FCALL ff_expire_execution` (with `phase` passed through
2340 /// as an ARGV discriminator).
2341 /// Postgres: single-row tx mirror of the Lua semantic for
2342 /// `AttemptTimeout`; `ExecutionDeadline` is Wave 9 schema scope.
2343 async fn expire_execution(
2344 &self,
2345 _partition: crate::partition::Partition,
2346 _execution_id: &ExecutionId,
2347 _phase: ExpirePhase,
2348 _now_ms: TimestampMs,
2349 ) -> Result<(), EngineError> {
2350 Err(EngineError::Unavailable {
2351 op: "expire_execution",
2352 })
2353 }
2354
2355 /// Suspension-timeout scanner hook — expire a suspended execution
2356 /// whose suspension deadline has passed (wake with timeout).
2357 ///
2358 /// Valkey: `FCALL ff_expire_suspension`.
2359 /// Postgres: single-row tx on `ff_suspend` + `ff_exec_core`.
2360 async fn expire_suspension(
2361 &self,
2362 _partition: crate::partition::Partition,
2363 _execution_id: &ExecutionId,
2364 _now_ms: TimestampMs,
2365 ) -> Result<(), EngineError> {
2366 Err(EngineError::Unavailable {
2367 op: "expire_suspension",
2368 })
2369 }
2370
2371 // ── PR-7b Cluster 2 — Reconciler scanner operations ─────────
2372 //
2373 // Per-execution write hooks invoked by the `unblock` scanner. The
2374 // other two cluster-2 scanners (`budget_reset`, `dependency_reconciler`)
2375 // route through `reset_budget` + `resolve_dependency`, which are
2376 // already part of the RFC-017 service-layer surface and live above.
2377
2378 /// Unblock-scanner hook — move an execution from a blocked ZSET back
2379 /// to `eligible_now` once its blocking condition has cleared (budget
2380 /// under limit, quota window drained, or a capable worker has come
2381 /// online). `expected_blocking_reason` discriminates which of the
2382 /// `blocked:{budget,quota,route}` sets the execution is leaving and
2383 /// also fences against a stale unblock (Lua rejects if the core's
2384 /// `blocking_reason` no longer matches).
2385 ///
2386 /// # Backend status
2387 ///
2388 /// - **Valkey:** wraps `ff_unblock_execution` (RFC-010 §6). Atomic
2389 /// single-slot FCALL on `{p:N}`.
2390 /// - **Postgres:** `Unavailable` (structural). PG does not persist a
2391 /// per-reason `blocked:{budget,quota,route}` index — scheduler
2392 /// eligibility is re-evaluated live via SQL predicates on
2393 /// `ff_exec_core` + budget / quota tables (see
2394 /// `ff_backend_postgres::scheduler`). Nothing to reconcile, so the
2395 /// engine's PG scanner loop does not run this path; callers on
2396 /// a PG deployment receive the typed `Unavailable` per PR-7b/final
2397 /// contract.
2398 /// - **SQLite:** `Unavailable` for the same reason as Postgres.
2399 async fn unblock_execution(
2400 &self,
2401 _partition: crate::partition::Partition,
2402 _lane_id: &LaneId,
2403 _execution_id: &ExecutionId,
2404 _expected_blocking_reason: &str,
2405 _now_ms: TimestampMs,
2406 ) -> Result<(), EngineError> {
2407 Err(EngineError::Unavailable {
2408 op: "unblock_execution",
2409 })
2410 }
2411
/// Sibling-cancel group drain (RFC-016 Stage C).
///
/// Called after `edge_cancel_dispatcher` has issued
/// [`EngineBackend::cancel_execution`] against every listed sibling
/// of a `(flow_id, downstream_eid)` group. The method atomically
/// removes the tuple from the partition-local pending-cancel-groups
/// index and clears the per-group flag + members state.
///
/// - Valkey: `FCALL ff_drain_sibling_cancel_group` (SREM + HDEL in
///   one Lua unit).
/// - Postgres: `Unavailable` — PG's Wave-6b
///   `reconcilers::edge_cancel_dispatcher::dispatcher_tick` owns the
///   equivalent row drain inside its own per-group transaction; the
///   Valkey-shaped per-tuple call is not used on the PG scanner
///   path.
/// - SQLite: `Unavailable` (mirrors PG; RFC-023 scope).
///
/// # Errors
///
/// The default body returns [`EngineError::Unavailable`] with
/// `op = "drain_sibling_cancel_group"`.
#[cfg(feature = "core")]
async fn drain_sibling_cancel_group(
    &self,
    _flow_partition: crate::partition::Partition,
    _flow_id: &FlowId,
    _downstream_eid: &ExecutionId,
) -> Result<(), EngineError> {
    let op = "drain_sibling_cancel_group";
    Err(EngineError::Unavailable { op })
}
2440
/// Sibling-cancel group reconcile (RFC-016 Stage D) — the
/// Invariant Q6 safety net.
///
/// Re-examines a `(flow_id, downstream_eid)` tuple in the
/// partition-local pending-cancel-groups index and reports one of
/// three atomic dispositions (see [`SiblingCancelReconcileAction`]).
/// This lets the crash-recovery reconciler drain stale or completed
/// tuples without fighting the Stage-C dispatcher.
///
/// - Valkey: `FCALL ff_reconcile_sibling_cancel_group`.
/// - Postgres: `Unavailable` — PG's
///   `reconcilers::edge_cancel_reconciler::reconciler_tick` owns the
///   row-level reconcile inside its own batched tx.
/// - SQLite: `Unavailable` (mirrors PG).
///
/// # Errors
///
/// The default body returns [`EngineError::Unavailable`] with
/// `op = "reconcile_sibling_cancel_group"`.
#[cfg(feature = "core")]
async fn reconcile_sibling_cancel_group(
    &self,
    _flow_partition: crate::partition::Partition,
    _flow_id: &FlowId,
    _downstream_eid: &ExecutionId,
) -> Result<SiblingCancelReconcileAction, EngineError> {
    let op = "reconcile_sibling_cancel_group";
    Err(EngineError::Unavailable { op })
}
2467
2468 // ── PR-7b Cluster 2b-A — Tally-recompute reconciler scanners ─────
2469 //
2470 // Coarse trait methods: each runs a full scan-and-fix pass over
2471 // one partition. Unlike cluster-2's per-candidate write hooks,
2472 // these scanners do multi-round-trip discovery (SSCAN / HMGET /
2473 // HGETALL / ZSCORE / ZREMRANGEBYSCORE) interleaved with conditional
2474 // writes. Moving the whole pass into the trait impl achieves the
2475 // PR-7b/final contract ("scanner trait leaks no `ferriskey::Client`")
2476 // without atomising operations that are inherently not atomic.
2477 //
2478 // Backend status (all three methods):
2479 //
2480 // - **Valkey:** real scan-loop in the trait impl; identical command
2481 // shape to the pre-routing scanner path.
2482 // - **Postgres:** default `Unavailable`. PG mutates budget / quota
2483 // counters and scheduling state inside SERIALIZABLE transactions,
2484 // and the scheduler evaluates eligibility via live SQL predicates
2485 // on `ff_exec_core` + budget / quota tables — no persistent index
2486 // or counter hash that can drift, hence nothing to reconcile. The
2487 // engine's PG scanner supervisor does not run these paths.
2488 // - **SQLite:** same as Postgres (RFC-023: SQLite runs its own
2489 // scanner supervisor; these FF-owned tally-recompute scanners
2490 // are not registered).
2491
2492 /// Index-reconciler pass — walks `ff:idx:{p:N}:all_executions`
2493 /// and verifies each execution appears in the correct scheduling
2494 /// sorted set (`eligible` / `delayed` / `blocked:*` / `active` /
2495 /// `suspended` / `terminal`) for its current `(lifecycle_phase,
2496 /// eligibility_state, ownership_state)` triple. Phase 1 is
2497 /// log-only; auto-fix is deferred to a later phase (RFC-010 §6.14).
2498 async fn reconcile_execution_index(
2499 &self,
2500 _partition: crate::partition::Partition,
2501 _lanes: &[LaneId],
2502 _filter: &crate::backend::ScannerFilter,
2503 ) -> Result<ReconcileCounts, EngineError> {
2504 Err(EngineError::Unavailable {
2505 op: "reconcile_execution_index",
2506 })
2507 }
2508
2509 /// Budget-reconciler pass — walks `ff:budget:{b:M}:policies_idx`,
2510 /// reads each budget's definition / usage / limits, and reconciles
2511 /// the `breached_at` marker against hard limits. Resetting budgets
2512 /// (non-zero `reset_interval_ms`) are skipped — they are handled
2513 /// by the `budget_reset` scanner (cluster 2). Drops index entries
2514 /// for budgets whose definition hash has been deleted (retention
2515 /// purge / manual). RFC-008 §Budget Reconciliation, RFC-010 §6.5.
2516 async fn reconcile_budget_counters(
2517 &self,
2518 _partition: crate::partition::Partition,
2519 _now_ms: TimestampMs,
2520 ) -> Result<ReconcileCounts, EngineError> {
2521 Err(EngineError::Unavailable {
2522 op: "reconcile_budget_counters",
2523 })
2524 }
2525
2526 /// Quota-reconciler pass — walks `ff:quota:{q:M}:policies_idx`,
2527 /// trims expired entries from rate-limit sliding windows, and
2528 /// recomputes each policy's concurrency counter by walking its
2529 /// `admitted_set` and pruning entries whose admission guard key
2530 /// has TTLed out. RFC-008 §Quota Reconciliation, RFC-010 §6.6.
2531 async fn reconcile_quota_counters(
2532 &self,
2533 _partition: crate::partition::Partition,
2534 _now_ms: TimestampMs,
2535 ) -> Result<ReconcileCounts, EngineError> {
2536 Err(EngineError::Unavailable {
2537 op: "reconcile_quota_counters",
2538 })
2539 }
2540
2541 // ── PR-7b Cluster 2b-B — Projection + retention scanners ──────
2542 //
2543 // Two additional non-FCALL scanners routed through the trait so
2544 // engine-side scanners stop touching `ferriskey::Client` directly.
2545 // Both ride on aggregate reads + derived writes; neither is a thin
2546 // single-FCALL passthrough.
2547
/// Hook for the flow-projector scanner: sample flow members, derive
/// an aggregate `public_flow_state` plus per-state counts, and write
/// them to the flow summary projection.
///
/// Returns `Ok(true)` when the summary was updated and `Ok(false)`
/// when the flow had no members or the index entry was defensively
/// pruned (core missing).
///
/// The `public_flow_state` derived here is not the same field as
/// `ff_flow_core.public_flow_state`: the former is a rollup
/// dashboard field, the latter is the authoritative mutation-guard
/// state owned by `create_flow` / `cancel_flow`. The
/// `ff_engine::scanner::flow_projector` module doc describes the
/// two-sources contract.
///
/// # Backend status
///
/// - **Valkey:** lifts the pre-PR-7b Rust-composed
///   SCARD + SRANDMEMBER + per-member HGET + HSET summary pattern.
///   No new Lua function; the aggregation is inherently
///   multi-round-trip (cross-partition member reads) and atomicity
///   is neither required nor achievable against 256 partitions.
/// - **Postgres:** SELECT aggregates from `ff_exec_core` grouped by
///   `public_state` for the flow's members, INSERT ... ON CONFLICT
///   DO UPDATE into `ff_flow_summary` (migration 0019). One query
///   per flow; partition-local aggregation.
/// - **SQLite:** `Unavailable` per RFC-023 Phase 3.5 (flow summary
///   projection is a shared-deployment dashboard feature; local
///   single-tenant SQLite deployments don't need it).
///
/// # Errors
///
/// The default body returns [`EngineError::Unavailable`] with
/// `op = "project_flow_summary"`.
#[cfg(feature = "core")]
async fn project_flow_summary(
    &self,
    _partition: crate::partition::Partition,
    _flow_id: &FlowId,
    _now_ms: TimestampMs,
) -> Result<bool, EngineError> {
    let op = "project_flow_summary";
    Err(EngineError::Unavailable { op })
}
2586
/// Hook for the retention-trimmer scanner: delete terminal
/// executions, together with all their subordinate keys/rows, once
/// they are older than the configured retention window.
///
/// Returns the number of executions actually purged by this call,
/// which lets the scanner loop again when it hits `batch_size`.
///
/// Retention trimming is inherently a scan-and-delete loop over
/// time. The trait surface exists to remove engine-side Valkey
/// coupling, **not** to atomise the operation into a single
/// round-trip. Implementations may issue multiple round-trips (for
/// example a per-execution cascade across sibling tables); the
/// contract is "make progress, bounded by batch_size".
///
/// # Backend status
///
/// - **Valkey:** lifts the pre-PR-7b ZRANGEBYSCORE + per-exec
///   cascade-delete loop verbatim. Cluster-safe (all keys for one
///   execution live on the same `{p:N}` slot).
/// - **Postgres:** `DELETE FROM ff_exec_core` for terminal rows past
///   the cutoff plus explicit cascade DELETEs on every
///   execution-scoped sibling table (no FK CASCADE in the schema).
///   Single transaction per batch.
/// - **SQLite:** `Unavailable` per RFC-023 Phase 3.5. Single-tenant
///   local deployments typically manage their own DB lifecycle and
///   do not need a retention scanner.
///
/// # Errors
///
/// The default body returns [`EngineError::Unavailable`] with
/// `op = "trim_retention"`.
#[cfg(feature = "core")]
async fn trim_retention(
    &self,
    _partition: crate::partition::Partition,
    _lane_id: &LaneId,
    _retention_ms: u64,
    _now_ms: TimestampMs,
    _batch_size: u32,
    _filter: &crate::backend::ScannerFilter,
) -> Result<u32, EngineError> {
    let op = "trim_retention";
    Err(EngineError::Unavailable { op })
}
2626
2627 // ── PR-7b Wave 0a: exec_core field read primitive ──
2628
/// Point-read a set of fields from the `exec_core` hash of one
/// execution.
///
/// The result maps each field name to an `Option<String>`; `None`
/// stands for a field that is absent or stored as NULL. Scanner call
/// sites that used to issue raw `HGET`/`HMGET` on
/// `ExecKeyContext::core()` route through this method (cairn #436 /
/// PR-7b Wave 0a).
///
/// Values are coerced to `String` at the trait boundary for wire
/// compatibility with the Valkey HGET shape. Consumers parse the
/// specific fields they need (`lane_id`, `current_attempt_index`,
/// etc.) out of the returned strings.
///
/// - **Valkey:** `HMGET exec_core_key f1 f2 ...`, zipping names to
///   values.
/// - **Postgres:** `SELECT` against `ff_exec_core` with dynamic
///   column extraction; fields in `raw_fields` JSONB are extracted
///   via `->>`.
/// - **SQLite:** equivalent with
///   `json_extract(raw_fields, '$.field')`.
///
/// # Errors
///
/// The default body returns [`EngineError::Unavailable`] with
/// `op = "read_exec_core_fields"`, so non-v0.13 backends remain
/// compile-compatible.
#[cfg(feature = "core")]
async fn read_exec_core_fields(
    &self,
    _partition: crate::partition::Partition,
    _execution_id: &crate::types::ExecutionId,
    _fields: &[&str],
) -> Result<std::collections::HashMap<String, Option<String>>, EngineError> {
    let op = "read_exec_core_fields";
    Err(EngineError::Unavailable { op })
}
2658
2659 // ── PR-7b Wave 0a: backend clock primitive ──
2660
/// Current wall-clock time of the backend, in epoch milliseconds.
///
/// Used by 15 scanners to compute "due" thresholds before they issue
/// per-partition due-scans. This replaces a Valkey-only helper in
/// `ff-engine`'s `scanner::lease_expiry` that issued `TIME`, and is
/// the backend-agnostic form (cairn #436 / PR-7b Wave 0a).
///
/// Every in-tree backend overrides this method. The default reads
/// `SystemTime::now()` so that out-of-tree `EngineBackend` impls
/// (e.g. cairn mocks, test doubles) stay source-compatible across
/// v0.12 → v0.13.
///
/// - **Valkey:** `TIME` command.
/// - **Postgres:** `SELECT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint`
///   (not `now()` — `now()` is the transaction start timestamp,
///   which is stale under any long-running tx; scanners need the
///   true wall-clock read).
/// - **SQLite:** `SELECT CAST((julianday('now') - 2440587.5) * 86400000 AS INTEGER)`.
///
/// # Errors
///
/// The default body returns [`EngineError::Transport`] tagged
/// `backend = "system-clock"` when the system clock reads earlier
/// than the Unix epoch.
#[cfg(feature = "core")]
async fn server_time_ms(&self) -> Result<u64, EngineError> {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` yields `u128`; the narrowing cast is lossless
        // for any realistic wall-clock reading.
        Ok(since_epoch) => Ok(since_epoch.as_millis() as u64),
        Err(e) => Err(EngineError::Transport {
            backend: "system-clock",
            source: format!("server_time_ms default: {e}").into(),
        }),
    }
}
2691}
2692
/// Disposition reported by a single
/// [`EngineBackend::reconcile_sibling_cancel_group`] call (RFC-016
/// Stage D).
///
/// The set is deliberately capped at three variants so that the
/// `action` label of `ff_sibling_cancel_reconcile_total{action}`
/// stays closed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SiblingCancelReconcileAction {
    /// The pending tuple was stale (flag cleared or edgegroup
    /// absent) and was SREM'd / DELETE'd without touching the group.
    SremmedStale,
    /// The flag was still set but every listed sibling was already
    /// terminal — an interrupted drain. Flag cleared and tuple
    /// drained.
    CompletedDrain,
    /// The flag is set and at least one sibling is non-terminal; the
    /// dispatcher owns this tuple, so it was left untouched.
    NoOp,
}

impl SiblingCancelReconcileAction {
    /// Observability label for this action. Matches the Lua + PG
    /// action strings exactly.
    pub fn as_str(&self) -> &'static str {
        match *self {
            SiblingCancelReconcileAction::SremmedStale => "sremmed_stale",
            SiblingCancelReconcileAction::CompletedDrain => "completed_drain",
            SiblingCancelReconcileAction::NoOp => "no_op",
        }
    }
}
2723
/// Outcome of one reconciler scan pass over a single partition.
///
/// `processed` is the number of candidates for which the reconciler
/// detected a correctable condition (drift, breach flip, stale
/// guard); `errors` is the number of transport / parse failures.
/// Scanners sum both into the engine's per-pass metrics exactly as
/// they did before PR-7b/2b-A.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ReconcileCounts {
    /// Number of items the scanner corrected (drift fix, breach
    /// flip, guard eviction). Zero on a healthy partition.
    pub processed: u32,
    /// Number of items the scanner could not process because of
    /// transport or parse errors. A non-zero value surfaces through
    /// scanner metrics and should be investigated.
    pub errors: u32,
}
2740
/// Identifies the scanner that called
/// [`EngineBackend::expire_execution`].
///
/// The attempt-timeout and execution-deadline scanners share one
/// trait method because their state flip is identical (terminate or
/// retry per retry policy). The phase is purely diagnostic — it
/// records which deadline elapsed — and is carried through to the
/// backend so the same Lua / SQL path can log or tag accordingly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExpirePhase {
    /// Called by the `attempt_timeout` scanner: the per-attempt
    /// wall-clock budget elapsed.
    AttemptTimeout,
    /// Called by the `execution_deadline` scanner: the
    /// whole-execution wall-clock deadline elapsed.
    ExecutionDeadline,
}

impl ExpirePhase {
    /// Short string tag for this phase, suitable for Lua ARGV or
    /// Postgres breadcrumbs.
    pub fn as_str(&self) -> &'static str {
        match *self {
            ExpirePhase::AttemptTimeout => "attempt_timeout",
            ExpirePhase::ExecutionDeadline => "execution_deadline",
        }
    }
}
2767
2768/// Object-safety assertion: `dyn EngineBackend` compiles iff every
2769/// method is dyn-compatible. Kept as a compile-time guard so a future
2770/// trait change that accidentally breaks dyn-safety fails the build
2771/// at this site rather than at every downstream `Arc<dyn
2772/// EngineBackend>` use.
2773#[allow(dead_code)]
2774fn _assert_dyn_compatible(_: &dyn EngineBackend) {}
2775
/// Interval between polls in [`wait_for_flow_cancellation`].
///
/// Short enough that a local single-node cancel cascade is seen as
/// `cancelled` within one or two polls, yet long enough that a
/// `WaitIndefinite` caller does not hammer `describe_flow` on a live
/// cluster.
const CANCEL_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(100);
2781
/// Defensive upper bound applied to [`CancelFlowWait::WaitIndefinite`].
///
/// If the reconciler cascade has not converged within five minutes
/// something is wedged, and returning `Timeout` is strictly more
/// useful than blocking forever. RFC-012 §3.1.1 expects real-world
/// cascades to finish within `reconciler_interval + grace`, which is
/// orders of magnitude below this ceiling.
const CANCEL_WAIT_INDEFINITE_CEILING: Duration = Duration::from_secs(5 * 60);
2789
2790/// Poll `backend.describe_flow(flow_id)` until `public_flow_state` is
2791/// `"cancelled"` or `deadline` elapses.
2792///
2793/// Shared by every backend's `cancel_flow` trait impl that honours
2794/// [`CancelFlowWait::WaitTimeout`] / [`CancelFlowWait::WaitIndefinite`].
2795/// The underlying `cancel_flow` FCALL / SQL transaction flips the
2796/// flow-level state synchronously; member cancellations dispatch
2797/// asynchronously via the reconciler, which also flips
2798/// `public_flow_state` to `cancelled` once the cascade completes. This
2799/// helper waits for that terminal flip.
2800///
2801/// Returns:
2802/// * `Ok(())` once `public_flow_state = "cancelled"` is observed.
2803/// * `Err(EngineError::Timeout { op: "cancel_flow", elapsed })` when
2804/// `deadline` elapses first. `elapsed` is the wait budget (the
2805/// requested timeout), not wall-clock precision.
2806/// * `Err(e)` if `describe_flow` itself errors (propagated).
2807pub async fn wait_for_flow_cancellation<B: EngineBackend + ?Sized>(
2808 backend: &B,
2809 flow_id: &crate::types::FlowId,
2810 deadline: Duration,
2811) -> Result<(), EngineError> {
2812 let start = std::time::Instant::now();
2813 loop {
2814 match backend.describe_flow(flow_id).await? {
2815 Some(snap) if snap.public_flow_state == "cancelled" => return Ok(()),
2816 // `None` (flow removed) is also terminal from the caller's
2817 // perspective — nothing left to wait on.
2818 None => return Ok(()),
2819 Some(_) => {}
2820 }
2821 if start.elapsed() >= deadline {
2822 return Err(EngineError::Timeout {
2823 op: "cancel_flow",
2824 elapsed: deadline,
2825 });
2826 }
2827 tokio::time::sleep(CANCEL_WAIT_POLL_INTERVAL).await;
2828 }
2829}
2830
2831/// Convert a [`CancelFlowWait`] into the deadline passed to
2832/// [`wait_for_flow_cancellation`]. `NoWait` returns `None` — the caller
2833/// must skip the wait entirely.
2834pub fn cancel_flow_wait_deadline(wait: CancelFlowWait) -> Option<Duration> {
2835 // `CancelFlowWait` is `#[non_exhaustive]`; this match lives in the
2836 // defining crate so the exhaustiveness check keeps the compiler
2837 // honest. Future variants must be wired here explicitly.
2838 match wait {
2839 CancelFlowWait::NoWait => None,
2840 CancelFlowWait::WaitTimeout(d) => Some(d),
2841 CancelFlowWait::WaitIndefinite => Some(CANCEL_WAIT_INDEFINITE_CEILING),
2842 }
2843}
2844
2845/// Validate a caller-namespaced tag key against the regex
2846/// `^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$`.
2847///
2848/// The Rust trait-side check is **stricter than the Valkey Lua
2849/// contracts** (`ff_set_execution_tags` / `ff_set_flow_tags`), which
2850/// only check `^[a-z][a-z0-9_]*%.[^.]` — namespace prefix + first
2851/// suffix char, with the rest of the suffix unvalidated. Every
2852/// backend-side impl (`ff-backend-{valkey,postgres,sqlite}`) calls
2853/// this helper **before** the wire hop so the effective parity
2854/// contract is this full-key regex; Valkey's Lua is an additional,
2855/// more permissive server-side guard, not the parity-of-record.
2856///
2857/// A key passes iff:
2858///
2859/// * it begins with an ASCII lowercase letter;
2860/// * all characters up to the first `.` are lowercase alnum or `_`;
2861/// * the first `.` is followed by at least one non-`.` character
2862/// (so `cairn.` and `cairn..x` fail — the `<field>` must be non-empty).
2863///
2864/// Shared entry point for [`EngineBackend::set_execution_tag`] /
2865/// [`EngineBackend::set_flow_tag`] / [`EngineBackend::get_execution_tag`] /
2866/// [`EngineBackend::get_flow_tag`] so every backend rejects the same
2867/// keyspace. On rejection, returns
2868/// [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
2869/// with the offending key in `detail`.
2870///
2871/// `#[allow(clippy::result_large_err)]` — `EngineError` is the uniform
2872/// error across the whole `EngineBackend` trait surface (see trait method
2873/// signatures above); boxing it here alone would introduce a
2874/// gratuitous signature deviation. Clippy 1.95 flags free functions but
2875/// not trait methods; this function mirrors the trait-method convention.
2876#[allow(clippy::result_large_err)]
2877pub fn validate_tag_key(key: &str) -> Result<(), EngineError> {
2878 use crate::engine_error::ValidationKind;
2879
2880 let bad = || EngineError::Validation {
2881 kind: ValidationKind::InvalidInput,
2882 detail: format!(
2883 "invalid tag key: {key:?} (must match ^[a-z][a-z0-9_]*\\.[a-z0-9_][a-z0-9_.]*$)"
2884 ),
2885 };
2886
2887 let mut chars = key.chars();
2888 let first = chars.next().ok_or_else(bad)?;
2889 if !first.is_ascii_lowercase() {
2890 return Err(bad());
2891 }
2892 // Phase 1: consume the namespace prefix up to (but not including) the first `.`.
2893 // Chars must be `[a-z0-9_]`.
2894 let mut saw_dot = false;
2895 for c in chars.by_ref() {
2896 if c == '.' {
2897 saw_dot = true;
2898 break;
2899 }
2900 if !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') {
2901 return Err(bad());
2902 }
2903 }
2904 if !saw_dot {
2905 return Err(bad());
2906 }
2907 // Phase 2: the first char after the first `.` must exist and be a
2908 // non-dot member char `[a-z0-9_]` (matches Valkey's Lua regex
2909 // `^[a-z][a-z0-9_]*%.[^.]` — a second consecutive dot is rejected).
2910 let second = chars.next().ok_or_else(bad)?;
2911 if !(second.is_ascii_lowercase() || second.is_ascii_digit() || second == '_') {
2912 return Err(bad());
2913 }
2914 // Phase 3 (Finding 2 tightening): every remaining char MUST be
2915 // `[a-z0-9_.]`. Before this, the suffix was unvalidated beyond
2916 // its first char — so `cairn.foo bar`, `cairn.Foo`,
2917 // `cairn.foo"bar`, `cairn.foo-bar` all passed trait-side validation
2918 // even though they would break SQLite JSON-path quoting, the Lua
2919 // HSET field-name conventions, or consumer grep patterns. Valkey's
2920 // Lua prefix regex is identically permissive on the suffix; this
2921 // tightens both layers from the Rust side. Dots in the suffix
2922 // remain legal (`app.sub.field` is valid) to preserve the existing
2923 // accepted shape.
2924 for c in chars {
2925 if !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_' || c == '.') {
2926 return Err(bad());
2927 }
2928 }
2929 Ok(())
2930}
2931
2932#[cfg(test)]
2933mod tests {
2934 use super::*;
2935
    /// A zero-state backend stub used to exercise the trait's
    /// default-provided methods — `capabilities()` and the
    /// `Unavailable`-returning defaults covered by the tests in this
    /// module — without pulling in a real transport.
    ///
    /// Only the methods the stub is required to implement appear
    /// below. Most bodies are `unreachable!()`: the tests in this
    /// module only call default-provided methods, so reaching one of
    /// these stubs indicates a test bug. The three `read_*` methods are
    /// the exception — they return trivially-constructed `Ok` values
    /// (an empty `ExecutionContext`, `AttemptIndex::new(0)`) instead of
    /// panicking.
    // NOTE(review): the reason the `read_*` stubs return values rather
    // than `unreachable!()` is not visible from this module — confirm
    // whether a default impl delegates to them.
    struct DefaultBackend;

    #[async_trait]
    impl EngineBackend for DefaultBackend {
        async fn claim(
            &self,
            _lane: &LaneId,
            _capabilities: &CapabilitySet,
            _policy: ClaimPolicy,
        ) -> Result<Option<Handle>, EngineError> {
            unreachable!()
        }
        async fn renew(&self, _handle: &Handle) -> Result<LeaseRenewal, EngineError> {
            unreachable!()
        }
        async fn progress(
            &self,
            _handle: &Handle,
            _percent: Option<u8>,
            _message: Option<String>,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn append_frame(
            &self,
            _handle: &Handle,
            _frame: Frame,
        ) -> Result<AppendFrameOutcome, EngineError> {
            unreachable!()
        }
        async fn complete(
            &self,
            _handle: &Handle,
            _payload: Option<Vec<u8>>,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn fail(
            &self,
            _handle: &Handle,
            _reason: FailureReason,
            _classification: FailureClass,
        ) -> Result<FailOutcome, EngineError> {
            unreachable!()
        }
        async fn cancel(&self, _handle: &Handle, _reason: &str) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn suspend(
            &self,
            _handle: &Handle,
            _args: SuspendArgs,
        ) -> Result<SuspendOutcome, EngineError> {
            unreachable!()
        }
        async fn create_waitpoint(
            &self,
            _handle: &Handle,
            _waitpoint_key: &str,
            _expires_in: Duration,
        ) -> Result<PendingWaitpoint, EngineError> {
            unreachable!()
        }
        async fn observe_signals(
            &self,
            _handle: &Handle,
        ) -> Result<Vec<ResumeSignal>, EngineError> {
            unreachable!()
        }
        async fn claim_from_resume_grant(
            &self,
            _token: ResumeToken,
        ) -> Result<Option<Handle>, EngineError> {
            unreachable!()
        }
        async fn delay(
            &self,
            _handle: &Handle,
            _delay_until: TimestampMs,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn wait_children(&self, _handle: &Handle) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn describe_execution(
            &self,
            _id: &ExecutionId,
        ) -> Result<Option<ExecutionSnapshot>, EngineError> {
            unreachable!()
        }
        // Returns a context built from empty/default parts rather than
        // panicking.
        async fn read_execution_context(
            &self,
            _execution_id: &ExecutionId,
        ) -> Result<ExecutionContext, EngineError> {
            Ok(ExecutionContext::new(
                Vec::new(),
                String::new(),
                std::collections::HashMap::new(),
            ))
        }
        // Always reports attempt index 0.
        async fn read_current_attempt_index(
            &self,
            _execution_id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError> {
            Ok(AttemptIndex::new(0))
        }
        // Always reports a total of 0 attempts.
        async fn read_total_attempt_count(
            &self,
            _execution_id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError> {
            Ok(AttemptIndex::new(0))
        }
        async fn describe_flow(
            &self,
            _id: &FlowId,
        ) -> Result<Option<FlowSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_edges(
            &self,
            _flow_id: &FlowId,
            _direction: EdgeDirection,
        ) -> Result<Vec<EdgeSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn describe_edge(
            &self,
            _flow_id: &FlowId,
            _edge_id: &EdgeId,
        ) -> Result<Option<EdgeSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn resolve_execution_flow_id(
            &self,
            _eid: &ExecutionId,
        ) -> Result<Option<FlowId>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_flows(
            &self,
            _partition: PartitionKey,
            _cursor: Option<FlowId>,
            _limit: usize,
        ) -> Result<ListFlowsPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_lanes(
            &self,
            _cursor: Option<LaneId>,
            _limit: usize,
        ) -> Result<ListLanesPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_suspended(
            &self,
            _partition: PartitionKey,
            _cursor: Option<ExecutionId>,
            _limit: usize,
        ) -> Result<ListSuspendedPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_executions(
            &self,
            _partition: PartitionKey,
            _cursor: Option<ExecutionId>,
            _limit: usize,
        ) -> Result<ListExecutionsPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn deliver_signal(
            &self,
            _args: DeliverSignalArgs,
        ) -> Result<DeliverSignalResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn claim_resumed_execution(
            &self,
            _args: ClaimResumedExecutionArgs,
        ) -> Result<ClaimResumedExecutionResult, EngineError> {
            unreachable!()
        }
        async fn cancel_flow(
            &self,
            _id: &FlowId,
            _policy: CancelFlowPolicy,
            _wait: CancelFlowWait,
        ) -> Result<CancelFlowResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn set_edge_group_policy(
            &self,
            _flow_id: &FlowId,
            _downstream_execution_id: &ExecutionId,
            _policy: crate::contracts::EdgeDependencyPolicy,
        ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
            unreachable!()
        }
        async fn report_usage(
            &self,
            _handle: &Handle,
            _budget: &BudgetId,
            _dimensions: crate::backend::UsageDimensions,
        ) -> Result<ReportUsageResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn read_stream(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
            _from: StreamCursor,
            _to: StreamCursor,
            _count_limit: u64,
        ) -> Result<StreamFrames, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn tail_stream(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
            _after: StreamCursor,
            _block_ms: u64,
            _count_limit: u64,
            _visibility: TailVisibility,
        ) -> Result<StreamFrames, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn read_summary(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
        ) -> Result<Option<SummaryDocument>, EngineError> {
            unreachable!()
        }
    }
3188
3189 /// The default `capabilities()` impl returns a value tagged
3190 /// `family = "unknown"` with every `supports.*` bool false, so
3191 /// pre-RFC-018 out-of-tree backends keep compiling and consumers
3192 /// can distinguish "backend predates RFC-018" from "backend
3193 /// reports concrete bools." Every concrete in-tree backend
3194 /// overrides.
3195 #[test]
3196 fn default_capabilities_is_unknown_family_all_false() {
3197 let b = DefaultBackend;
3198 let caps = b.capabilities();
3199 assert_eq!(caps.identity.family, "unknown");
3200 assert_eq!(
3201 caps.identity.version,
3202 crate::capability::Version::new(0, 0, 0)
3203 );
3204 assert_eq!(caps.identity.rfc017_stage, "unknown");
3205 // Every field false on the default (matches `Supports::none()`).
3206 assert_eq!(caps.supports, crate::capability::Supports::none());
3207 }
3208
3209 // ── resolve_dependency (PR-7b Step 0) ──
3210
3211 /// Default impl returns `Unavailable { op: "resolve_dependency" }`
3212 /// so pre-migration backends surface a typed Unsupported-grade
3213 /// error rather than panic. Both cluster 2's dependency_reconciler
3214 /// and cluster 4's completion_listener route through this method;
3215 /// the default is the safety net for non-Valkey deployments.
3216 #[cfg(feature = "core")]
3217 #[tokio::test]
3218 async fn default_resolve_dependency_is_unavailable() {
3219 use crate::contracts::ResolveDependencyArgs;
3220 use crate::partition::{Partition, PartitionFamily};
3221 use crate::types::{AttemptIndex, EdgeId, FlowId, LaneId};
3222
3223 let b = DefaultBackend;
3224 let partition = Partition {
3225 family: PartitionFamily::Flow,
3226 index: 0,
3227 };
3228 let args = ResolveDependencyArgs::new(
3229 partition,
3230 FlowId::parse("11111111-1111-1111-1111-111111111111").unwrap(),
3231 ExecutionId::parse("{fp:0}:22222222-2222-2222-2222-222222222222").unwrap(),
3232 ExecutionId::parse("{fp:0}:33333333-3333-3333-3333-333333333333").unwrap(),
3233 EdgeId::parse("44444444-4444-4444-4444-444444444444").unwrap(),
3234 LaneId::new("default"),
3235 AttemptIndex::new(0),
3236 "success".to_owned(),
3237 TimestampMs::now(),
3238 );
3239 match b.resolve_dependency(args).await {
3240 Err(EngineError::Unavailable { op }) => {
3241 assert_eq!(op, "resolve_dependency");
3242 }
3243 other => panic!("expected Unavailable, got {other:?}"),
3244 }
3245 }
3246
3247 // ── cascade_completion (PR-7b Cluster 4) ──
3248
3249 /// Default impl returns `Unavailable { op: "cascade_completion" }`
3250 /// so pre-migration / non-core builds surface a typed error rather
3251 /// than panic. The push-based completion listener routes through
3252 /// this method; the default is the safety net for deployments
3253 /// whose backend doesn't cascade (e.g. SQLite in the current
3254 /// Wave 9 scope).
3255 #[cfg(feature = "core")]
3256 #[tokio::test]
3257 async fn default_cascade_completion_is_unavailable() {
3258 use crate::backend::CompletionPayload;
3259
3260 let b = DefaultBackend;
3261 let eid = ExecutionId::parse(
3262 "{fp:0}:66666666-6666-6666-6666-666666666666",
3263 )
3264 .unwrap();
3265 let payload = CompletionPayload::new(eid, "success", None, TimestampMs::now());
3266 match b.cascade_completion(&payload).await {
3267 Err(EngineError::Unavailable { op }) => {
3268 assert_eq!(op, "cascade_completion");
3269 }
3270 other => panic!("expected Unavailable, got {other:?}"),
3271 }
3272 }
3273
3274 // ── unblock_execution (PR-7b Cluster 2) ──
3275
3276 /// Default impl returns `Unavailable { op: "unblock_execution" }`
3277 /// so non-Valkey deployments get a typed `Unavailable` rather than
3278 /// a panic. The engine scanner loop on PG/SQLite skips this path
3279 /// entirely (scheduler eligibility is re-evaluated live via SQL
3280 /// predicates), but a stray direct call must still fail gracefully.
3281 #[cfg(feature = "core")]
3282 #[tokio::test]
3283 async fn default_unblock_execution_is_unavailable() {
3284 use crate::partition::{Partition, PartitionFamily};
3285
3286 let b = DefaultBackend;
3287 let partition = Partition {
3288 family: PartitionFamily::Execution,
3289 index: 0,
3290 };
3291 let eid = ExecutionId::parse(
3292 "{fp:0}:55555555-5555-5555-5555-555555555555",
3293 )
3294 .unwrap();
3295 let lane = LaneId::new("default");
3296 match b
3297 .unblock_execution(
3298 partition,
3299 &lane,
3300 &eid,
3301 "waiting_for_budget",
3302 TimestampMs::from_millis(0),
3303 )
3304 .await
3305 {
3306 Err(EngineError::Unavailable { op }) => {
3307 assert_eq!(op, "unblock_execution");
3308 }
3309 other => panic!("expected Unavailable, got {other:?}"),
3310 }
3311 }
3312
3313 // ── project_flow_summary (PR-7b Cluster 2b-B) ──
3314
3315 /// Default impl returns `Unavailable { op: "project_flow_summary" }`
3316 /// so non-Valkey deployments get a typed `Unavailable` rather than
3317 /// a panic. SQLite rides the default per RFC-023 Phase 3.5.
3318 #[cfg(feature = "core")]
3319 #[tokio::test]
3320 async fn default_project_flow_summary_is_unavailable() {
3321 use crate::partition::{Partition, PartitionFamily};
3322 use crate::types::FlowId;
3323
3324 let b = DefaultBackend;
3325 let partition = Partition {
3326 family: PartitionFamily::Flow,
3327 index: 0,
3328 };
3329 let fid = FlowId::parse("11111111-1111-1111-1111-111111111111").unwrap();
3330 match b
3331 .project_flow_summary(partition, &fid, TimestampMs::from_millis(0))
3332 .await
3333 {
3334 Err(EngineError::Unavailable { op }) => {
3335 assert_eq!(op, "project_flow_summary");
3336 }
3337 other => panic!("expected Unavailable, got {other:?}"),
3338 }
3339 }
3340
3341 // ── trim_retention (PR-7b Cluster 2b-B) ──
3342
3343 /// Default impl returns `Unavailable { op: "trim_retention" }` so
3344 /// non-Valkey deployments get a typed `Unavailable` rather than a
3345 /// panic. SQLite rides the default per RFC-023 Phase 3.5.
3346 #[cfg(feature = "core")]
3347 #[tokio::test]
3348 async fn default_trim_retention_is_unavailable() {
3349 use crate::partition::{Partition, PartitionFamily};
3350
3351 let b = DefaultBackend;
3352 let partition = Partition {
3353 family: PartitionFamily::Execution,
3354 index: 0,
3355 };
3356 let lane = LaneId::new("default");
3357 match b
3358 .trim_retention(
3359 partition,
3360 &lane,
3361 60_000,
3362 TimestampMs::from_millis(0),
3363 20,
3364 &crate::backend::ScannerFilter::NOOP,
3365 )
3366 .await
3367 {
3368 Err(EngineError::Unavailable { op }) => {
3369 assert_eq!(op, "trim_retention");
3370 }
3371 other => panic!("expected Unavailable, got {other:?}"),
3372 }
3373 }
3374
3375 // ── read_exec_core_fields (PR-7b Wave 0a) ──
3376
3377 /// Default impl returns `Unavailable { op: "read_exec_core_fields" }`
3378 /// so out-of-tree backends that pre-date v0.13 keep compiling while
3379 /// surfacing a typed error on call. Empty-field input short-circuits
3380 /// to `Ok(empty map)` in all in-tree impls; the default is triggered
3381 /// only when the trait method itself is un-overridden.
3382 #[cfg(feature = "core")]
3383 #[tokio::test]
3384 async fn default_read_exec_core_fields_is_unavailable() {
3385 use crate::partition::{Partition, PartitionFamily};
3386
3387 let b = DefaultBackend;
3388 let partition = Partition {
3389 family: PartitionFamily::Execution,
3390 index: 0,
3391 };
3392 let eid = ExecutionId::parse(
3393 "{fp:0}:66666666-6666-6666-6666-666666666666",
3394 )
3395 .unwrap();
3396 match b
3397 .read_exec_core_fields(partition, &eid, &["lane_id"])
3398 .await
3399 {
3400 Err(EngineError::Unavailable { op }) => {
3401 assert_eq!(op, "read_exec_core_fields");
3402 }
3403 other => panic!("expected Unavailable, got {other:?}"),
3404 }
3405 }
3406
3407 // ── validate_tag_key (issue #433) ──
3408
3409 #[test]
3410 fn validate_tag_key_accepts_valid() {
3411 for k in [
3412 "cairn.session_id",
3413 "cairn.project",
3414 "a.b",
3415 "a1_2.x",
3416 "app.sub.field",
3417 "x.y_z",
3418 ] {
3419 validate_tag_key(k).unwrap_or_else(|e| panic!("{k:?} should pass: {e:?}"));
3420 }
3421 }
3422
3423 #[test]
3424 fn validate_tag_key_rejects_invalid() {
3425 for k in [
3426 "", // empty
3427 "Cairn.x", // uppercase first
3428 "1cairn.x", // leading digit
3429 "cairn", // no dot
3430 "cairn.", // empty suffix
3431 "cairn..x", // dot immediately after first dot
3432 ".cairn", // leading dot
3433 "cair n.x", // space before dot
3434 "ca-irn.x", // hyphen in prefix
3435 // Finding 2 tightening — suffix now fully validated.
3436 "cairn.Foo", // uppercase in suffix
3437 "cairn.foo bar", // space in suffix
3438 "cairn.foo\"bar", // double-quote in suffix (would break SQLite JSON-path quoting)
3439 "cairn.foo-bar", // hyphen in suffix
3440 "cairn.foo\\bar", // backslash in suffix
3441 ] {
3442 let err = validate_tag_key(k)
3443 .err()
3444 .unwrap_or_else(|| panic!("{k:?} should fail"));
3445 match err {
3446 EngineError::Validation {
3447 kind: crate::engine_error::ValidationKind::InvalidInput,
3448 ..
3449 } => {}
3450 other => panic!("{k:?}: unexpected err {other:?}"),
3451 }
3452 }
3453 }
3454
3455 // ── cairn #389: service-layer FCALL trait-method defaults ──
3456 //
3457 // Each method MUST return `EngineError::Unavailable { op: "<name>" }`
3458 // on backends that haven't overridden it, so out-of-tree backends
3459 // keep compiling and consumers get a terminal-classified error
3460 // rather than a panic. Mirrors the precedent used by
3461 // `issue_reclaim_grant` / `reclaim_execution` / `suspend_by_triple`.
3462 //
3463 // Feature-gated on `core` because the methods under test only
3464 // exist on the trait under that feature — matches the precedent
3465 // of `default_resolve_dependency_is_unavailable` above.
3466
3467 #[cfg(feature = "core")]
3468 #[tokio::test]
3469 async fn default_complete_execution_is_unavailable() {
3470 use crate::contracts::CompleteExecutionArgs;
3471 use crate::types::{ExecutionId, FlowId};
3472 let b = DefaultBackend;
3473 let config = crate::partition::PartitionConfig::default();
3474 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3475 let args = CompleteExecutionArgs {
3476 execution_id: eid,
3477 fence: None,
3478 attempt_index: AttemptIndex::new(0),
3479 result_payload: None,
3480 result_encoding: None,
3481 source: crate::types::CancelSource::default(),
3482 now: TimestampMs::from_millis(0),
3483 };
3484 match b.complete_execution(args).await.unwrap_err() {
3485 EngineError::Unavailable { op } => assert_eq!(op, "complete_execution"),
3486 other => panic!("expected Unavailable, got {other:?}"),
3487 }
3488 }
3489
3490 #[cfg(feature = "core")]
3491 #[tokio::test]
3492 async fn default_fail_execution_is_unavailable() {
3493 use crate::contracts::FailExecutionArgs;
3494 use crate::types::{ExecutionId, FlowId};
3495 let b = DefaultBackend;
3496 let config = crate::partition::PartitionConfig::default();
3497 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3498 let args = FailExecutionArgs {
3499 execution_id: eid,
3500 fence: None,
3501 attempt_index: AttemptIndex::new(0),
3502 failure_reason: String::new(),
3503 failure_category: String::new(),
3504 retry_policy_json: String::new(),
3505 next_attempt_policy_json: String::new(),
3506 source: crate::types::CancelSource::default(),
3507 };
3508 match b.fail_execution(args).await.unwrap_err() {
3509 EngineError::Unavailable { op } => assert_eq!(op, "fail_execution"),
3510 other => panic!("expected Unavailable, got {other:?}"),
3511 }
3512 }
3513
3514 #[cfg(feature = "core")]
3515 #[tokio::test]
3516 async fn default_renew_lease_is_unavailable() {
3517 use crate::contracts::RenewLeaseArgs;
3518 use crate::types::{ExecutionId, FlowId};
3519 let b = DefaultBackend;
3520 let config = crate::partition::PartitionConfig::default();
3521 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3522 let args = RenewLeaseArgs {
3523 execution_id: eid,
3524 attempt_index: AttemptIndex::new(0),
3525 fence: None,
3526 lease_ttl_ms: 1_000,
3527 lease_history_grace_ms: 60_000,
3528 };
3529 match b.renew_lease(args).await.unwrap_err() {
3530 EngineError::Unavailable { op } => assert_eq!(op, "renew_lease"),
3531 other => panic!("expected Unavailable, got {other:?}"),
3532 }
3533 }
3534
3535 #[cfg(feature = "core")]
3536 #[tokio::test]
3537 async fn default_resume_execution_is_unavailable() {
3538 use crate::contracts::ResumeExecutionArgs;
3539 use crate::types::{ExecutionId, FlowId};
3540 let b = DefaultBackend;
3541 let config = crate::partition::PartitionConfig::default();
3542 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3543 let args = ResumeExecutionArgs {
3544 execution_id: eid,
3545 trigger_type: "signal".to_owned(),
3546 resume_delay_ms: 0,
3547 };
3548 match b.resume_execution(args).await.unwrap_err() {
3549 EngineError::Unavailable { op } => assert_eq!(op, "resume_execution"),
3550 other => panic!("expected Unavailable, got {other:?}"),
3551 }
3552 }
3553
3554 #[cfg(feature = "core")]
3555 #[tokio::test]
3556 async fn default_check_admission_is_unavailable() {
3557 use crate::contracts::CheckAdmissionArgs;
3558 use crate::types::{ExecutionId, FlowId, QuotaPolicyId};
3559 let b = DefaultBackend;
3560 let config = crate::partition::PartitionConfig::default();
3561 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3562 let args = CheckAdmissionArgs {
3563 execution_id: eid,
3564 now: TimestampMs::from_millis(0),
3565 window_seconds: 60,
3566 rate_limit: 10,
3567 concurrency_cap: 1,
3568 jitter_ms: None,
3569 };
3570 let qid = QuotaPolicyId::new();
3571 match b
3572 .check_admission(&qid, "default", args)
3573 .await
3574 .unwrap_err()
3575 {
3576 EngineError::Unavailable { op } => assert_eq!(op, "check_admission"),
3577 other => panic!("expected Unavailable, got {other:?}"),
3578 }
3579 }
3580
3581 #[cfg(feature = "core")]
3582 #[tokio::test]
3583 async fn default_evaluate_flow_eligibility_is_unavailable() {
3584 use crate::contracts::EvaluateFlowEligibilityArgs;
3585 use crate::types::{ExecutionId, FlowId};
3586 let b = DefaultBackend;
3587 let config = crate::partition::PartitionConfig::default();
3588 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3589 let args = EvaluateFlowEligibilityArgs { execution_id: eid };
3590 match b.evaluate_flow_eligibility(args).await.unwrap_err() {
3591 EngineError::Unavailable { op } => assert_eq!(op, "evaluate_flow_eligibility"),
3592 other => panic!("expected Unavailable, got {other:?}"),
3593 }
3594 }
3595
3596 // ── Cluster 2b-A tally-recompute reconcilers ──
3597
3598 #[cfg(feature = "core")]
3599 #[tokio::test]
3600 async fn default_reconcile_execution_index_is_unavailable() {
3601 use crate::backend::ScannerFilter;
3602 use crate::partition::{Partition, PartitionFamily};
3603 let b = DefaultBackend;
3604 let partition = Partition { family: PartitionFamily::Execution, index: 0 };
3605 let lanes = [LaneId::new("default")];
3606 let filter = ScannerFilter::default();
3607 match b
3608 .reconcile_execution_index(partition, &lanes, &filter)
3609 .await
3610 .unwrap_err()
3611 {
3612 EngineError::Unavailable { op } => assert_eq!(op, "reconcile_execution_index"),
3613 other => panic!("expected Unavailable, got {other:?}"),
3614 }
3615 }
3616
3617 #[cfg(feature = "core")]
3618 #[tokio::test]
3619 async fn default_reconcile_budget_counters_is_unavailable() {
3620 use crate::partition::{Partition, PartitionFamily};
3621 let b = DefaultBackend;
3622 let partition = Partition { family: PartitionFamily::Budget, index: 0 };
3623 match b
3624 .reconcile_budget_counters(partition, TimestampMs::from_millis(0))
3625 .await
3626 .unwrap_err()
3627 {
3628 EngineError::Unavailable { op } => assert_eq!(op, "reconcile_budget_counters"),
3629 other => panic!("expected Unavailable, got {other:?}"),
3630 }
3631 }
3632
3633 #[cfg(feature = "core")]
3634 #[tokio::test]
3635 async fn default_reconcile_quota_counters_is_unavailable() {
3636 use crate::partition::{Partition, PartitionFamily};
3637 let b = DefaultBackend;
3638 let partition = Partition { family: PartitionFamily::Quota, index: 0 };
3639 match b
3640 .reconcile_quota_counters(partition, TimestampMs::from_millis(0))
3641 .await
3642 .unwrap_err()
3643 {
3644 EngineError::Unavailable { op } => assert_eq!(op, "reconcile_quota_counters"),
3645 other => panic!("expected Unavailable, got {other:?}"),
3646 }
3647 }
3648
3649 // ── #454 trait-additions default-impl tests (4) ────────────
3650
3651 #[cfg(feature = "core")]
3652 #[tokio::test]
3653 async fn default_record_spend_is_unavailable() {
3654 use crate::contracts::RecordSpendArgs;
3655 use crate::types::{BudgetId, ExecutionId, FlowId};
3656 let b = DefaultBackend;
3657 let config = crate::partition::PartitionConfig::default();
3658 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3659 let args = RecordSpendArgs::new(
3660 BudgetId::new(),
3661 eid,
3662 std::collections::BTreeMap::new(),
3663 "k",
3664 );
3665 match b.record_spend(args).await.unwrap_err() {
3666 EngineError::Unavailable { op } => assert_eq!(op, "record_spend"),
3667 other => panic!("expected Unavailable, got {other:?}"),
3668 }
3669 }
3670
3671 #[cfg(feature = "core")]
3672 #[tokio::test]
3673 async fn default_release_budget_is_unavailable() {
3674 use crate::contracts::ReleaseBudgetArgs;
3675 use crate::types::{BudgetId, ExecutionId, FlowId};
3676 let b = DefaultBackend;
3677 let config = crate::partition::PartitionConfig::default();
3678 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3679 let args = ReleaseBudgetArgs::new(BudgetId::new(), eid);
3680 match b.release_budget(args).await.unwrap_err() {
3681 EngineError::Unavailable { op } => assert_eq!(op, "release_budget"),
3682 other => panic!("expected Unavailable, got {other:?}"),
3683 }
3684 }
3685
3686 #[cfg(feature = "core")]
3687 #[tokio::test]
3688 async fn default_deliver_approval_signal_is_unavailable() {
3689 use crate::contracts::DeliverApprovalSignalArgs;
3690 use crate::types::{ExecutionId, FlowId, LaneId, WaitpointId};
3691 let b = DefaultBackend;
3692 let config = crate::partition::PartitionConfig::default();
3693 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3694 let args = DeliverApprovalSignalArgs::new(
3695 eid,
3696 LaneId::new("default"),
3697 WaitpointId::new(),
3698 "approved",
3699 "sfx",
3700 1_000,
3701 Some(100),
3702 Some(10),
3703 );
3704 match b.deliver_approval_signal(args).await.unwrap_err() {
3705 EngineError::Unavailable { op } => assert_eq!(op, "deliver_approval_signal"),
3706 other => panic!("expected Unavailable, got {other:?}"),
3707 }
3708 }
3709
3710 #[cfg(feature = "core")]
3711 #[tokio::test]
3712 async fn default_issue_grant_and_claim_is_unavailable() {
3713 use crate::contracts::IssueGrantAndClaimArgs;
3714 use crate::types::{ExecutionId, FlowId, LaneId};
3715 let b = DefaultBackend;
3716 let config = crate::partition::PartitionConfig::default();
3717 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3718 let args = IssueGrantAndClaimArgs::new(eid, LaneId::new("default"), 30_000);
3719 match b.issue_grant_and_claim(args).await.unwrap_err() {
3720 EngineError::Unavailable { op } => assert_eq!(op, "issue_grant_and_claim"),
3721 other => panic!("expected Unavailable, got {other:?}"),
3722 }
3723 }
3724}