ff_core/engine_backend.rs
1//! The `EngineBackend` trait — abstracting FlowFabric's write surface.
2//!
3//! **RFC-012 Stage 1a:** this is the trait landing. The
4//! Valkey-backed impl lives in `ff-backend-valkey`; future backends
5//! (Postgres) add a sibling crate with their own impl. ff-sdk's
6//! `FlowFabricWorker` gains `connect_with(backend)` /
7//! `backend(&self)` accessors so consumers that want to bring their
8//! own backend (tests, future non-Valkey deployments) can hand one
9//! in. The hot-path migration of `ClaimedTask` / `FlowFabricWorker`
10//! to forward through the trait lands across Stages 1b-1d.
11//!
12//! # Object safety
13//!
14//! `EngineBackend` is object-safe: all methods are `async fn` behind
15//! `#[async_trait]` and take `&self`. Consumers can hold
16//! `Arc<dyn EngineBackend>` for heterogeneous-backend deployments.
17//! The trait is `Send + Sync + 'static` per RFC-012 §4.1; every impl
18//! must honour that bound.
19//!
20//! # Error surface
21//!
22//! Every method returns [`Result<_, EngineError>`]. `EngineError`'s
23//! `Transport` variant carries a boxed `dyn Error + Send + Sync`;
24//! Valkey-backed transport faults box a
25//! `ff_script::error::ScriptError` (downcast via
26//! `ff_script::engine_error_ext::transport_script_ref`). Other
27//! backends box their native error type and set the `backend` tag
28//! accordingly.
29//!
30//! # Atomicity contract
31//!
32//! Per-op state transitions MUST be atomic (RFC-012 §3.4). On Valkey
33//! this is the single-FCALL-per-op property; on Postgres it is the
34//! per-transaction property. A backend that cannot honour atomicity
35//! for a given op either MUST NOT implement `EngineBackend` or MUST
36//! return `EngineError::Unavailable { op }` for the affected method.
37//!
38//! # Replay semantics
39//!
40//! `complete`, `fail`, `cancel`, `suspend`, `delay`, `wait_children`
41//! are idempotent under replay — calling twice with the same handle
42//! and args returns the same outcome (success on first call, typed
43//! `State` / `Contention` on subsequent calls where the fence triple
44//! no longer matches a live lease).
45
46use std::time::Duration;
47
48use async_trait::async_trait;
49
50use crate::backend::{
51 AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy,
52 FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal, PendingWaitpoint,
53 PrepareOutcome, ResumeSignal, ResumeToken,
54};
55// `SummaryDocument` and `TailVisibility` are referenced only inside
56// `#[cfg(feature = "streaming")]` trait methods below, so the imports
57// must be gated to avoid an unused-imports warning on the non-streaming
58// build.
59#[cfg(feature = "streaming")]
60use crate::backend::{SummaryDocument, TailVisibility};
61use crate::contracts::{
62 CancelFlowResult, ExecutionContext, ExecutionSnapshot, FlowSnapshot, IssueReclaimGrantArgs,
63 IssueReclaimGrantOutcome, ReclaimExecutionArgs, ReclaimExecutionOutcome, ReportUsageResult,
64 RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
65 SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
66};
67#[cfg(feature = "core")]
68use crate::contracts::{
69 AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
70 ApplyDependencyToChildResult, BudgetStatus, CancelExecutionArgs, CancelExecutionResult,
71 CancelFlowArgs, ChangePriorityArgs, ChangePriorityResult, ClaimExecutionArgs,
72 ClaimExecutionResult, ClaimForWorkerArgs, ClaimForWorkerOutcome, ClaimResumedExecutionArgs,
73 ClaimResumedExecutionResult,
74 BlockRouteArgs, BlockRouteOutcome, CheckAdmissionArgs, CheckAdmissionResult,
75 ClaimGrantOutcome,
76 CompleteExecutionArgs, CompleteExecutionResult, CreateBudgetArgs, CreateBudgetResult,
77 CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
78 CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
79 DeliverApprovalSignalArgs, DeliverSignalArgs, DeliverSignalResult, EdgeDirection, EdgeSnapshot,
80 EvaluateFlowEligibilityArgs, EvaluateFlowEligibilityResult, ExecutionInfo,
81 FailExecutionArgs, FailExecutionResult,
82 IssueClaimGrantArgs, IssueClaimGrantOutcome, IssueGrantAndClaimArgs,
83 RecordSpendArgs, ReleaseBudgetArgs, ScanEligibleArgs,
84 ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
85 ListPendingWaitpointsResult, ListSuspendedPage, RenewLeaseArgs, RenewLeaseResult,
86 ReplayExecutionArgs, ReplayExecutionResult,
87 ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult, ResumeExecutionArgs,
88 ResumeExecutionResult, RevokeLeaseArgs, RevokeLeaseResult,
89 StageDependencyEdgeArgs, StageDependencyEdgeResult,
90};
91#[cfg(feature = "core")]
92use crate::state::PublicState;
93#[cfg(feature = "core")]
94use crate::partition::PartitionKey;
95#[cfg(feature = "streaming")]
96use crate::contracts::{StreamCursor, StreamFrames};
97use crate::engine_error::EngineError;
98#[cfg(feature = "core")]
99use crate::types::EdgeId;
100#[cfg(feature = "core")]
101use crate::types::WaitpointId;
102use crate::types::{AttemptIndex, BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
103
104/// The engine write surface — a single trait a backend implementation
105/// honours to serve a `FlowFabricWorker`.
106///
107/// See RFC-012 §3.1 for the inventory rationale and §3.3 for the
108/// type-level shape. The Round-7 inventory was 16 methods (Round-7 added
109/// `create_waitpoint`; `append_frame` return widened; `report_usage` return
110/// replaced — RFC-012 §R7); the trait has grown since. Issue #150 added the
111/// two trigger-surface methods (`deliver_signal` / `claim_resumed_execution`).
112///
113/// # Note on `complete` payload shape
114///
115/// The RFC §3.3 sketch uses `Option<Bytes>`; the Stage 1a trait uses
116/// `Option<Vec<u8>>` to match the existing
117/// `ff_sdk::ClaimedTask::complete` signature and avoid adding a
118/// `bytes` public-type dep for zero consumer benefit. Round-4 §7.17
119/// resolved the payload container debate to `Box<[u8]>` in the
120/// public type (see `HandleOpaque`); `Option<Vec<u8>>` is the
121/// zero-churn choice consistent with today's code. Consumers that
122/// need `&[u8]` can borrow via `.as_deref()` on the Option.
123#[async_trait]
124pub trait EngineBackend: Send + Sync + 'static {
125 // ── Claim + lifecycle ──
126
127 /// Fresh-work claim. Returns `Ok(None)` when no work is currently
128 /// available; `Err` only on transport or input-validation faults.
///
/// `lane` and `capabilities` are borrowed; `policy` is taken by value.
/// NOTE(review): how `capabilities` / `policy` constrain which execution
/// gets selected is not specified on the trait — confirm against RFC-012.
129 async fn claim(
130 &self,
131 lane: &LaneId,
132 capabilities: &CapabilitySet,
133 policy: ClaimPolicy,
134 ) -> Result<Option<Handle>, EngineError>;
135
136 /// Renew a held lease. Returns the updated expiry + epoch on
137 /// success; typed `State::StaleLease` / `State::LeaseExpired`
138 /// when the lease has been stolen or timed out.
///
/// Borrows `handle`, so the caller keeps it regardless of the outcome.
/// NOTE(review): `renew` is not in the module-level list of ops that are
/// idempotent under replay — confirm retry behaviour per backend.
139 async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError>;
140
141 /// Numeric-progress heartbeat.
142 ///
143 /// Writes scalar `progress_percent` / `progress_message` fields on
144 /// `exec_core`; each call overwrites the previous value. This does
145 /// NOT append to the output stream — stream-frame producers must use
146 /// [`append_frame`](Self::append_frame) instead.
///
/// NOTE(review): whether passing `None` for `percent` / `message` leaves
/// the stored field untouched or clears it is not specified on the
/// trait — confirm per backend.
147 async fn progress(
148 &self,
149 handle: &Handle,
150 percent: Option<u8>,
151 message: Option<String>,
152 ) -> Result<(), EngineError>;
153
154 /// Append one stream frame. Distinct from [`progress`](Self::progress)
155 /// per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry
156 /// id and post-append frame count (RFC-012 §R7.2.1).
157 ///
158 /// Stream-frame producers (arbitrary `frame_type` + payload, consumed
159 /// via the read/tail surfaces) MUST use this method rather than
160 /// [`progress`](Self::progress); the latter updates scalar fields on
161 /// `exec_core` and is invisible to stream consumers.
///
/// `handle` is borrowed; `frame` is moved into the call.
162 async fn append_frame(
163 &self,
164 handle: &Handle,
165 frame: Frame,
166 ) -> Result<AppendFrameOutcome, EngineError>;
167
168 /// Terminal success. Borrows `handle` (round-4 M-D2) so callers
169 /// can retry under `EngineError::Transport` without losing the
170 /// cookie. Payload is `Option<Vec<u8>>` per the note above.
///
/// Listed in the module-level "Replay semantics" section as idempotent
/// under replay: success on the first call, typed `State` / `Contention`
/// on later calls once the fence triple no longer matches a live lease.
171 async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError>;
172
173 /// Terminal failure with classification. Returns [`FailOutcome`]
174 /// so the caller learns whether a retry was scheduled.
///
/// `reason` and `classification` are moved into the call; `handle` is
/// borrowed. Listed in the module-level "Replay semantics" section as
/// idempotent under replay (typed `State` / `Contention` on later calls
/// once the fence triple no longer matches a live lease).
175 async fn fail(
176 &self,
177 handle: &Handle,
178 reason: FailureReason,
179 classification: FailureClass,
180 ) -> Result<FailOutcome, EngineError>;
181
182 /// Cooperative cancel by the worker holding the lease.
///
/// Listed in the module-level "Replay semantics" section as idempotent
/// under replay (typed `State` / `Contention` on later calls once the
/// fence triple no longer matches a live lease).
/// NOTE(review): how `reason` is stored or surfaced is not specified on
/// the trait — confirm per backend.
183 async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError>;
184
185 /// Suspend the execution awaiting a typed resume condition
186 /// (RFC-013 Stage 1d).
187 ///
188 /// Borrows `handle` (round-4 M-D2). Terminal-looking behaviour is
189 /// expressed through [`SuspendOutcome`]:
190 ///
191 /// * [`SuspendOutcome::Suspended`] — the pre-suspend handle is
192 /// logically invalidated; the fresh `HandleKind::Suspended`
193 /// handle inside the variant supersedes it. Runtime enforcement
194 /// via the fence triple: subsequent ops against the stale handle
195 /// surface as `Contention(LeaseConflict)`.
196 /// * [`SuspendOutcome::AlreadySatisfied`] — buffered signals on a
197 /// pending waitpoint already matched the resume condition at
198 /// suspension time. The lease is NOT released; the caller's
199 /// pre-suspend handle remains valid.
200 ///
201 /// See RFC-013 §2 for the type shapes, §3 for the replay /
202 /// idempotency contract, §4 for the error taxonomy.
///
/// `args` is moved into the call. Also named in the module-level
/// "Replay semantics" section as idempotent under replay.
203 async fn suspend(
204 &self,
205 handle: &Handle,
206 args: SuspendArgs,
207 ) -> Result<SuspendOutcome, EngineError>;
208
209 /// Suspend by execution id + lease fence triple, for service-layer
210 /// callers that hold a run record / lease-claim descriptor but no
211 /// worker [`Handle`] (cairn issue #322).
212 ///
213 /// Semantics mirror [`Self::suspend`] exactly — the same
214 /// [`SuspendArgs`] validation, the same [`SuspendOutcome`]
215 /// lifecycle, the same RFC-013 §3 dedup / replay contract. The
216 /// only difference is the fencing source: instead of the
217 /// `(lease_id, lease_epoch, attempt_id)` fields embedded in a
218 /// `Handle`, the backend fences against the triple passed directly.
219 /// Attempt-index, lane, and worker-instance metadata that
220 /// [`Self::suspend`] reads from the handle payload are recovered
221 /// from the backend's authoritative execution record (Valkey:
222 /// `exec_core` HGETs; Postgres: `ff_attempt` row lookup).
223 ///
224 /// The default impl returns [`EngineError::Unavailable`] so
225 /// existing backend impls remain non-breaking. Production backends
226 /// (Valkey, Postgres) override.
227 async fn suspend_by_triple(
228 &self,
229 exec_id: ExecutionId,
230 triple: LeaseFence,
231 args: SuspendArgs,
232 ) -> Result<SuspendOutcome, EngineError> {
233 let _ = (exec_id, triple, args);
234 Err(EngineError::Unavailable {
235 op: "suspend_by_triple",
236 })
237 }
238
239 /// Issue a pending waitpoint for future signal delivery.
240 ///
241 /// Waitpoints have two states in the Valkey wire contract:
242 /// **pending** (token issued, not yet backing a suspension) and
243 /// **active** (bound to a suspension). This method creates a
244 /// waitpoint in the **pending** state. A later `suspend` call
245 /// transitions a pending waitpoint to active (see Lua
246 /// `use_pending_waitpoint` ARGV flag at
247 /// `flowfabric.lua:3603,3641,3690`) — or, if buffered signals
248 /// already satisfy its condition, the suspend call returns
249 /// `SuspendOutcome::AlreadySatisfied` and the waitpoint activates
250 /// without ever releasing the lease.
251 ///
252 /// Pending-waitpoint expiry is a first-class terminal error on
253 /// the wire (`PendingWaitpointExpired` at
254 /// `ff-script/src/error.rs:170,403-408`). The attempt retains its
255 /// lease while the waitpoint is pending; signals delivered to
256 /// this waitpoint are buffered server-side (RFC-012 §R7.2.2).
///
/// `expires_in` is a relative `std::time::Duration`.
/// NOTE(review): presumably measured from the moment the backend
/// processes the call — confirm against the backend impls.
257 async fn create_waitpoint(
258 &self,
259 handle: &Handle,
260 waitpoint_key: &str,
261 expires_in: Duration,
262 ) -> Result<PendingWaitpoint, EngineError>;
263
264 /// Read the HMAC token stored on a waitpoint record, keyed by
265 /// `(partition, waitpoint_id)`.
266 ///
267 /// Returns `Ok(Some(token))` when the waitpoint exists and carries
268 /// a token, `Ok(None)` when the waitpoint does not exist or no
269 /// token field is present. A missing waitpoint is not an error —
270 /// signals can legitimately arrive after a waitpoint has been
271 /// consumed or expired, and the signal-bridge authenticates on the
272 /// presence of a matching token, not on the waitpoint's liveness.
273 ///
274 /// # Use case
275 ///
276 /// Control-plane signal delivery (cairn signal-bridge): at
277 /// signal-resume time the bridge reads the token off the
278 /// waitpoint hash / row to authenticate the resume request it
279 /// subsequently issues. Previously implemented as direct
280 /// `ferriskey::Client::hget(waitpoint_key, "waitpoint_token")` —
281 /// Valkey-only. This method routes the read through the trait so
282 /// the pattern works on Postgres + SQLite as well.
283 ///
284 /// # Per-backend shape
285 ///
286 /// * **Valkey** — `HGET ff:wp:<tag>:<waitpoint_id> waitpoint_token`
287 /// on the waitpoint's partition. Empty string / missing field
288 /// maps to `None`.
289 /// * **Postgres** — `SELECT token FROM ff_waitpoint_pending
290 /// WHERE partition_key = $1 AND waitpoint_id = $2 LIMIT 1`.
291 /// Row-absent → `None`; empty `token` → `None`.
292 /// * **SQLite** — same shape as Postgres.
293 ///
294 /// The `partition` argument is the opaque [`PartitionKey`]
295 /// produced by FlowFabric (typically extracted from the
296 /// `Handle` / `ResumeToken` the waitpoint was minted against).
297 ///
298 /// # Default impl
299 ///
300 /// Returns [`EngineError::Unavailable`] with
301 /// `op = "read_waitpoint_token"` so out-of-tree backends and
302 /// in-tree backends not yet overriding this method continue to
303 /// compile. Mirrors the precedent used by
304 /// [`Self::issue_reclaim_grant`] / [`Self::reclaim_execution`].
#[cfg(feature = "core")]
async fn read_waitpoint_token(
    &self,
    _partition: PartitionKey,
    _waitpoint_id: &WaitpointId,
) -> Result<Option<String>, EngineError> {
    // Fallback body for implementors that do not override this method:
    // both arguments are ignored and the read is reported as unsupported.
    let unsupported = EngineError::Unavailable {
        op: "read_waitpoint_token",
    };
    Err(unsupported)
}
315
316 /// Non-mutating observation of signals that satisfied the handle's
317 /// resume condition.
///
/// Returns an owned `Vec` of signals; `handle` is only borrowed.
/// NOTE(review): whether "no matching signal yet" is reported as an
/// empty vector or as an error is not specified on the trait — confirm.
318 async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError>;
319
320 /// Consume a resume grant (via [`ResumeToken`]) to mint a
321 /// resumed-kind handle. Routes to `ff_claim_resumed_execution` on
322 /// Valkey / the epoch-bump reconciler on PG/SQLite. Returns
323 /// `Ok(None)` when the grant's target execution is no longer
324 /// resumable (already reclaimed, terminal, etc.).
325 ///
326 /// **Renamed from `claim_from_reclaim` (RFC-024 PR-B+C).** The
327 /// pre-rename name advertised "reclaim" but the semantic has
328 /// always been resume-after-suspend. The new lease-reclaim path
329 /// lives on [`Self::reclaim_execution`].
///
/// `token` is moved into the call.
330 async fn claim_from_resume_grant(
331 &self,
332 token: ResumeToken,
333 ) -> Result<Option<Handle>, EngineError>;
334
335 /// Issue a lease-reclaim grant (RFC-024 §3.2). Admits executions
336 /// in `lease_expired_reclaimable` or `lease_revoked` state to the
337 /// reclaim path; the returned [`IssueReclaimGrantOutcome::Granted`]
338 /// carries a [`crate::contracts::ReclaimGrant`] which is then fed
339 /// to [`Self::reclaim_execution`] to mint a fresh attempt.
340 ///
341 /// Default impl returns [`EngineError::Unavailable`] — PR-D (PG),
342 /// PR-E (SQLite), and PR-F (Valkey) override with real bodies.
343 async fn issue_reclaim_grant(
344 &self,
345 _args: IssueReclaimGrantArgs,
346 ) -> Result<IssueReclaimGrantOutcome, EngineError> {
347 Err(EngineError::Unavailable {
348 op: "issue_reclaim_grant",
349 })
350 }
351
352 /// Consume a [`crate::contracts::ReclaimGrant`] to mint a fresh
353 /// attempt for a previously lease-expired / lease-revoked
354 /// execution (RFC-024 §3.2). Creates a new attempt row, bumps the
355 /// execution's `lease_reclaim_count`, and mints a
356 /// [`crate::backend::HandleKind::Reclaimed`] handle.
357 ///
358 /// Default impl returns [`EngineError::Unavailable`] — PR-D (PG),
359 /// PR-E (SQLite), and PR-F (Valkey) override with real bodies.
360 async fn reclaim_execution(
361 &self,
362 _args: ReclaimExecutionArgs,
363 ) -> Result<ReclaimExecutionOutcome, EngineError> {
364 Err(EngineError::Unavailable {
365 op: "reclaim_execution",
366 })
367 }
368
369 // Round-5 amendment: lease-releasing peers of `suspend`.
370
371 /// Park the execution until `delay_until`, releasing the lease.
///
/// Listed in the module-level "Replay semantics" section as idempotent
/// under replay (typed `State` / `Contention` on later calls once the
/// fence triple no longer matches a live lease).
/// NOTE(review): `delay_until` is presumably an absolute timestamp in
/// milliseconds, going by the `TimestampMs` type name — confirm.
372 async fn delay(&self, handle: &Handle, delay_until: TimestampMs) -> Result<(), EngineError>;
373
374 /// Mark the execution as waiting for its child flow to complete,
375 /// releasing the lease.
///
/// Listed in the module-level "Replay semantics" section as idempotent
/// under replay (typed `State` / `Contention` on later calls once the
/// fence triple no longer matches a live lease).
376 async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError>;
377
378 // ── Read / admin ──
379
380 /// Snapshot an execution by id. `Ok(None)` ⇒ no such execution.
///
/// Absence is signalled through `Ok(None)` rather than an error, in
/// contrast to [`Self::read_execution_context`], whose docs describe a
/// missing execution as a validation error.
381 async fn describe_execution(
382 &self,
383 id: &ExecutionId,
384 ) -> Result<Option<ExecutionSnapshot>, EngineError>;
385
386 /// Point-read of the execution-scoped `(input_payload,
387 /// execution_kind, tags)` bundle used by the SDK worker when
388 /// assembling a `ClaimedTask` (see `ff_sdk::ClaimedTask`) after a
389 /// successful claim.
390 ///
391 /// No default impl — every `EngineBackend` must answer this
392 /// explicitly. Distinct from [`Self::describe_execution`]
393 /// (read-model projection) because the SDK needs the raw payload
394 /// bytes + kind + tags immediately post-claim, and the snapshot
395 /// projection deliberately omits the payload bytes.
396 ///
397 /// Per-backend shape:
398 ///
399 /// * **Valkey** — pipelined `GET :payload` + `HGETALL :core`
400 /// + `HGETALL :tags` on the execution's partition (same pattern
401 /// as [`Self::describe_execution`]).
402 /// * **Postgres** — single `SELECT payload, raw_fields` on
403 /// `ff_exec_core` keyed by `(partition_key, execution_id)`;
404 /// `execution_kind` + `tags` live in `raw_fields` JSONB.
405 /// * **SQLite** — identical shape to Postgres.
406 ///
407 /// Returns [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
408 /// when the execution does not exist — the SDK worker only calls
409 /// this after a successful claim, so a missing row is a loud
410 /// storage-tier invariant violation rather than a routine `Ok(None)`.
///
/// The return type carries no `Option`: unlike
/// [`Self::describe_execution`], absence can only be reported via `Err`.
411 async fn read_execution_context(
412 &self,
413 execution_id: &ExecutionId,
414 ) -> Result<ExecutionContext, EngineError>;
415
416 /// Point-read of the execution's current attempt-index **pointer**
417 /// — the index of the currently-leased attempt row.
418 ///
419 /// Distinct from [`Self::read_total_attempt_count`]: this method
420 /// names the attempt that *already exists* (pointer), whereas
421 /// `read_total_attempt_count` is the monotonic claim counter used
422 /// to compute the next fresh attempt index. See the sibling's
423 /// rustdoc for the retry-path scenario that motivates the split.
424 ///
425 /// Used on the SDK worker's `claim_from_resume_grant` path —
426 /// specifically the private `claim_resumed_execution` helper —
427 /// immediately before dispatching [`Self::claim_resumed_execution`].
428 /// The returned index is fed into
429 /// [`ClaimResumedExecutionArgs::current_attempt_index`](crate::contracts::ClaimResumedExecutionArgs)
430 /// so the backend's script / transaction targets the correct
431 /// existing attempt row (KEYS[6] on Valkey; `ff_attempt` PK tuple
432 /// on PG/SQLite).
433 ///
434 /// Per-backend shape:
435 ///
436 /// * **Valkey** — `HGET {exec}:core current_attempt_index` on the
437 /// execution's partition. Single command. Both the
438 /// **missing-field** case (`exec_core` present but
439 /// `current_attempt_index` absent or empty-string, i.e. pre-claim
440 /// state) **and** the **missing-row** case (no `exec_core` hash
441 /// at all) read back as `AttemptIndex(0)`. This preserves the
442 /// pre-PR-3 inline-`HGET` semantic and is safe because Valkey's
443 /// happy path requires `exec_core` to exist before this method
444 /// is reached — the SDK only calls `read_current_attempt_index`
445 /// post-grant, and grant issuance is gated on `exec_core`
446 /// presence. A genuinely absent row would surface as the proper
447 /// business-logic error (`NotAResumedExecution` /
448 /// `ExecutionNotLeaseable`) on the downstream FCALL.
449 /// * **Postgres** — `SELECT attempt_index FROM ff_exec_core
450 /// WHERE partition_key = $1 AND execution_id = $2`. The column
451 /// is `NOT NULL DEFAULT 0` so a pre-claim row reads back as `0`
452 /// (matching Valkey's missing-field case). **Missing row**
453 /// surfaces as [`EngineError::Validation { kind:
454 /// ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
455 /// — diverges from Valkey's missing-row `→ 0` mapping.
456 /// * **SQLite** — `SELECT attempt_index FROM ff_exec_core
457 /// WHERE partition_key = ? AND execution_id = ?`; identical
458 /// semantics to Postgres (missing-row → `InvalidInput`).
459 ///
460 /// **Cross-backend asymmetry on missing row is intentional.** The
461 /// SDK happy path never observes it (grant issuance on Valkey
462 /// requires `exec_core`, and PG/SQLite currently return
463 /// `Unavailable` from `claim_from_grant` per
464 /// `project_claim_from_grant_pg_sqlite_gap.md`). Consumers writing
465 /// backend-agnostic tooling against this method directly must
466 /// treat the missing-row case as backend-dependent — match on
467 /// `InvalidInput` for PG/SQLite, and treat an unexpected `0` as
468 /// the Valkey equivalent signal.
469 ///
470 /// The default impl returns [`EngineError::Unavailable`] so the
471 /// trait addition is non-breaking for out-of-tree backends (same
472 /// precedent as [`Self::read_execution_context`] landing in v0.12
473 /// PR-1).
474 async fn read_current_attempt_index(
475 &self,
476 _execution_id: &ExecutionId,
477 ) -> Result<AttemptIndex, EngineError> {
478 Err(EngineError::Unavailable {
479 op: "read_current_attempt_index",
480 })
481 }
482
483 /// Point-read of the execution's **total attempt counter** — the
484 /// monotonic count of claims that have ever fired against this
485 /// execution (including the in-flight one once claimed).
486 ///
487 /// Used on the SDK worker's `claim_from_grant` / `claim_execution`
488 /// path — the next attempt-index for a fresh claim is this
489 /// counter's current value (so `1` on the second retry after the
490 /// first attempt failed terminally). This is semantically distinct
491 /// from [`Self::read_current_attempt_index`], which is a *pointer*
492 /// at the currently-leased attempt row and is only meaningful on
493 /// the `claim_from_resume_grant` path (where a live attempt already
494 /// exists and we want to re-seat its lease rather than mint a new
495 /// attempt row).
496 ///
497 /// Reading the pointer on the `claim_from_grant` path was a live
498 /// bug: on the retry-of-a-retry scenario the pointer still named
499 /// the *previous* terminal-failed attempt, so the newly-minted
500 /// attempt collided with it (Valkey KEYS[6]) or mis-targeted the
501 /// PG/SQLite `ff_attempt` PK tuple. This method fixes that by
502 /// reading the counter that Lua 5920 / PG `ff_claim_execution` /
503 /// SQLite `claim_impl` all already consult when computing the
504 /// next attempt index.
505 ///
506 /// Per-backend shape:
507 ///
508 /// * **Valkey** — `HGET {exec}:core total_attempt_count` on the
509 /// execution's partition. Single command; pre-claim read (field
510 /// absent or empty) maps to `0`.
511 /// * **Postgres** — `SELECT raw_fields->>'total_attempt_count'
512 /// FROM ff_exec_core WHERE (partition_key, execution_id) = ...`.
513 /// The field lives in the JSONB `raw_fields` bag rather than a
514 /// dedicated column (mirrors how `create_execution_impl` seeds
515 /// it on row creation). Missing row → `InvalidInput`; missing
516 /// field → `0`.
517 /// * **SQLite** — `SELECT CAST(json_extract(raw_fields,
518 /// '$.total_attempt_count') AS INTEGER) FROM ff_exec_core
519 /// WHERE ...`. Same JSON-in-`raw_fields` shape as PG; uses the
520 /// same `json_extract` idiom already employed in
521 /// `ff-backend-sqlite/src/queries/operator.rs` for replay_count.
522 ///
523 /// The default impl returns [`EngineError::Unavailable`] so the
524 /// trait addition is non-breaking for out-of-tree backends (same
525 /// precedent as [`Self::read_current_attempt_index`] landing in
526 /// v0.12 PR-3).
527 async fn read_total_attempt_count(
528 &self,
529 _execution_id: &ExecutionId,
530 ) -> Result<AttemptIndex, EngineError> {
531 Err(EngineError::Unavailable {
532 op: "read_total_attempt_count",
533 })
534 }
535
536 /// Snapshot a flow by id. `Ok(None)` ⇒ no such flow.
///
/// NOTE: the [`Self::set_flow_tag`] docs caution that on Valkey this
/// snapshot does not currently merge the flow's `:tags` sub-hash, so tags
/// written through `set_flow_tag` may be missing from it on that backend.
537 async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError>;
538
539 // ── Namespaced tag point-writes / reads (issue #433) ──
540
541 /// Set a single namespaced tag on an execution. Tag `key` MUST match
542 /// the reserved caller-namespace pattern `^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$` —
543 /// i.e. `<caller>.<field>` — or the call returns
544 /// [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
545 /// with the offending key in `detail`. `value` is arbitrary UTF-8.
546 ///
547 /// The namespace prefix is carried inline in `key` (e.g.
548 /// `"cairn.session_id"`) — there is no separate `namespace` arg.
549 /// This matches the existing `ff_set_execution_tags` wire shape and
550 /// the flow-tag projection in [`ExecutionSnapshot::tags`].
551 ///
552 /// Validation is performed by each overriding backend impl via
553 /// [`validate_tag_key`] **before** the wire hop so PG / SQLite /
554 /// Valkey reject the same set of keys. The default trait impl
555 /// returns [`EngineError::Unavailable`] without running validation
556 /// — there is no meaningful storage to validate against on an
557 /// unsupported backend, and surfacing `Unavailable` before
558 /// `Validation` matches the precedence used elsewhere on the trait.
559 /// Backends MAY additionally validate on the storage tier (Valkey's
560 /// Lua path does, with a more permissive prefix-only check).
561 ///
562 /// Per-backend shape:
563 ///
564 /// * **Valkey** — `ff_set_execution_tags` FCALL with a single
565 /// `{key → value}` pair. Routes through the existing Lua
566 /// contract (no new wire format).
567 /// * **Postgres** — `UPDATE ff_exec_core SET raw_fields = jsonb_set(
568 /// coalesce(raw_fields, '{}'::jsonb), '{tags,<key>}', to_jsonb($value))
569 /// WHERE (partition_key, execution_id) = ...`. Same storage shape
570 /// read by [`Self::describe_execution`] / [`Self::read_execution_context`].
571 /// * **SQLite** — `UPDATE ff_exec_core SET raw_fields = json_set(
572 /// coalesce(raw_fields, '{}'), '$.tags."<key>"', $value) WHERE ...`.
573 /// The key is quoted in the JSON path so dots inside the
574 /// namespaced key (e.g. `cairn.session_id`) are treated as a
575 /// single literal member name rather than JSON-path separators —
576 /// yielding the same flat `raw_fields.tags` shape as PG.
577 ///
578 /// Missing execution surfaces as
579 /// [`EngineError::NotFound { entity: "execution" }`](crate::engine_error::EngineError::NotFound)
580 /// — matches the Valkey FCALL's `execution_not_found` mapping and
581 /// the existing `ScriptError::ExecutionNotFound` → `EngineError`
582 /// conversion (`ff_script::engine_error_ext`).
583 ///
584 /// The default impl returns [`EngineError::Unavailable`] so the
585 /// trait addition is non-breaking for out-of-tree backends.
586 async fn set_execution_tag(
587 &self,
588 _execution_id: &ExecutionId,
589 _key: &str,
590 _value: &str,
591 ) -> Result<(), EngineError> {
592 Err(EngineError::Unavailable {
593 op: "set_execution_tag",
594 })
595 }
596
597 /// Set a single namespaced tag on a flow. Same namespace rule as
598 /// [`Self::set_execution_tag`]: `key` MUST match
599 /// `^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$`.
600 ///
601 /// Per-backend shape:
602 ///
603 /// * **Valkey** — `ff_set_flow_tags` FCALL with a single pair.
604 /// Tags land on the dedicated `ff:flow:{fp:N}:<flow_id>:tags`
605 /// hash, not on the `flow_core` hash (diverges from the
606 /// execution shape — execution tags live on `ff:exec:...:tags`
607 /// by the same split). **Lazy migration on first write**: the
608 /// Lua (`ff_script::flowfabric.lua`, `ff_set_flow_tags`) scans
609 /// `flow_core` once per flow for pre-58.4 inline namespaced
610 /// fields (anything matching `^[a-z][a-z0-9_]*\.`), HSETs them
611 /// onto `:tags`, HDELs them from `flow_core`, and stamps
612 /// `tags_migrated=1` on `flow_core` so subsequent calls
613 /// short-circuit to O(1). This heals flows created before
614 /// RFC-058.4 landed; well-formed flows pay the migration cost
615 /// only on their very first tag write. Callers MUST read tags
616 /// via [`Self::get_flow_tag`] (`HGET :tags <key>`) — direct
617 /// `HGETALL` against `flow_core` will not see post-migration
618 /// values.
619 ///
620 /// **Cross-backend parity caveat on `describe_flow`**: the
621 /// pre-existing `ValkeyBackend::describe_flow` /
622 /// `FlowSnapshot::tags` read path snapshots `flow_core` fields
623 /// only and does NOT today merge the `:tags` sub-hash, whereas
624 /// Postgres `describe_flow` DOES surface flow tags via
625 /// `ff_backend_postgres::flow::extract_tags` (which reads them
626 /// off `raw_fields` — the same store `set_flow_tag` writes on
627 /// PG). Trait consumers MUST NOT assume a tag written here
628 /// will be visible via `describe_flow` on every backend: on
629 /// Valkey, callers that need the full tag set should
630 /// complement the snapshot with per-key [`Self::get_flow_tag`]
631 /// reads. Extending Valkey `describe_flow` to merge `:tags`
632 /// is additive and out of scope for this trait addition.
633 /// * **Postgres** — `UPDATE ff_flow_core SET raw_fields =
634 /// jsonb_set(..., '{<key>}', ...)` — flow tags are stored as
635 /// top-level `raw_fields` keys (matches
636 /// `ff_backend_postgres::flow::extract_tags`). No `tags` nesting
637 /// on flows, which diverges from the execution shape.
638 /// * **SQLite** — mirrors PG: `UPDATE ff_flow_core SET raw_fields =
639 /// json_set(..., '$."<key>"', $value) WHERE ...`. The key is
640 /// quoted so the dotted namespaced key lands as a single flat
641 /// top-level member of `raw_fields`.
642 ///
643 /// Missing flow surfaces as
644 /// [`EngineError::NotFound { entity: "flow" }`](crate::engine_error::EngineError::NotFound)
645 /// (matches the Valkey FCALL's `flow_not_found` mapping).
646 ///
647 /// The default impl returns [`EngineError::Unavailable`].
648 async fn set_flow_tag(
649 &self,
650 _flow_id: &FlowId,
651 _key: &str,
652 _value: &str,
653 ) -> Result<(), EngineError> {
654 Err(EngineError::Unavailable {
655 op: "set_flow_tag",
656 })
657 }
658
659 /// Read a single namespaced execution tag. Returns `Ok(None)` when
660 /// the tag is absent **or** the execution row does not exist —
661 /// the two cases are not distinguished on the read path. Callers
662 /// that need to distinguish should call [`Self::describe_execution`]
663 /// first (an `Ok(None)` from that method proves the execution is
664 /// absent). This matches Valkey's native `HGET` semantics and
665 /// keeps the read path at a single round-trip on every backend.
666 ///
667 /// `key` must pass [`validate_tag_key`] — a malformed key can
668 /// never be present in storage so the call short-circuits with
669 /// [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
670 /// rather than round-tripping.
671 ///
672 /// Per-backend shape:
673 ///
674 /// * **Valkey** — `HGET :tags <key>` on the execution's partition.
675 /// * **Postgres** — `SELECT raw_fields->'tags'->><key> FROM ff_exec_core
676 /// WHERE ...` with `fetch_optional` → missing row collapses to `None`.
677 /// * **SQLite** — `SELECT json_extract(raw_fields, '$.tags."<key>"')
678 /// FROM ff_exec_core WHERE ...` with the same collapse. The key is
679 /// quoted in the JSON path so dotted namespaced keys resolve to
680 /// the flat literal member written by `set_execution_tag`.
681 ///
682 /// The default impl returns [`EngineError::Unavailable`].
683 async fn get_execution_tag(
684 &self,
685 _execution_id: &ExecutionId,
686 _key: &str,
687 ) -> Result<Option<String>, EngineError> {
688 Err(EngineError::Unavailable {
689 op: "get_execution_tag",
690 })
691 }
692
693 /// Read an execution's `namespace` scalar. Returns `Ok(None)` when
694 /// the row is absent or the field is unset. Dedicated point-read
695 /// used by the scanner per-candidate filter (`should_skip_candidate`)
696 /// to preserve the 1-HGET cost contract documented in
697 /// `ff_engine::scanner::should_skip_candidate` — `describe_execution`
698 /// is heavier (HGETALL / full snapshot) and unnecessary when only
699 /// the namespace scalar is needed.
700 ///
701 /// Per-backend shape:
702 ///
703 /// * **Valkey** — `HGET :core namespace` on the execution's partition
704 /// (single field read on the already-hot exec_core hash).
705 /// * **Postgres** — `SELECT raw_fields->>'namespace' FROM ff_exec_core
706 /// WHERE partition_key = $1 AND execution_id = $2`.
707 /// * **SQLite** — `SELECT json_extract(raw_fields, '$.namespace')
708 /// FROM ff_exec_core WHERE ...`.
709 ///
710 /// The default impl returns [`EngineError::Unavailable`].
711 async fn get_execution_namespace(
712 &self,
713 _execution_id: &ExecutionId,
714 ) -> Result<Option<String>, EngineError> {
715 Err(EngineError::Unavailable {
716 op: "get_execution_namespace",
717 })
718 }
719
720 /// Read a single namespaced flow tag. Returns `Ok(None)` when
721 /// the tag is absent **or** the flow row does not exist (same
722 /// collapse semantics as [`Self::get_execution_tag`]). Symmetry
723 /// partner — consumers like cairn read `cairn.session_id` off
724 /// flows for archival.
725 ///
726 /// `key` must pass [`validate_tag_key`].
727 ///
728 /// Per-backend shape:
729 ///
730 /// * **Valkey** — `HGET :tags <key>` on the flow's partition.
731 /// * **Postgres** — `SELECT raw_fields->><key> FROM ff_flow_core
732 /// WHERE ...` (top-level `raw_fields` key, matches the flow-tag
733 /// storage shape).
734 /// * **SQLite** — `SELECT json_extract(raw_fields, '$."<key>"')
735 /// FROM ff_flow_core WHERE ...` (quoted key — see
736 /// `set_flow_tag`).
737 ///
738 /// The default impl returns [`EngineError::Unavailable`].
739 async fn get_flow_tag(
740 &self,
741 _flow_id: &FlowId,
742 _key: &str,
743 ) -> Result<Option<String>, EngineError> {
744 Err(EngineError::Unavailable {
745 op: "get_flow_tag",
746 })
747 }
748
749 /// List dependency edges adjacent to an execution. Read-only; the
750 /// backend resolves the subject execution's flow, reads the
751 /// direction-specific adjacency SET, and decodes each member's
752 /// flow-scoped `edge:<edge_id>` hash.
753 ///
754 /// Returns an empty `Vec` when the subject has no edges on the
755 /// requested side — including standalone executions (no owning
756 /// flow). Ordering is unspecified: the underlying adjacency SET
757 /// is an unordered SMEMBERS read. Callers that need deterministic
758 /// order should sort by [`EdgeSnapshot::edge_id`] /
759 /// [`EdgeSnapshot::created_at`] themselves.
760 ///
761 /// Parse failures on the edge hash surface as
762 /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
763 /// — unknown fields, missing required fields, endpoint mismatches
764 /// against the adjacency SET all fail loud rather than silently
765 /// returning partial results.
766 ///
767 /// Gated on the `core` feature — edge reads are part of the
768 /// minimal engine surface a Postgres-style backend must honour.
769 ///
770 /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
771 #[cfg(feature = "core")]
772 async fn list_edges(
773 &self,
774 _flow_id: &FlowId,
775 _direction: EdgeDirection,
776 ) -> Result<Vec<EdgeSnapshot>, EngineError> {
777 Err(EngineError::Unavailable { op: "list_edges" })
778 }
779
780 /// Snapshot a single dependency edge by its owning flow + edge id.
781 ///
782 /// `Ok(None)` when the edge hash is absent (never staged, or
783 /// staged under a different flow than `flow_id`). Parse failures
784 /// on a present edge hash surface as
785 /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]
786 /// — the stored `flow_id` field is cross-checked against the
787 /// caller's expected `flow_id` so a wrong-key read fails loud
788 /// rather than returning an unrelated edge.
789 ///
790 /// Gated on the `core` feature — single-edge reads are part of
791 /// the minimal snapshot surface an alternate backend must honour
792 /// alongside [`Self::describe_execution`] / [`Self::describe_flow`]
793 /// / [`Self::list_edges`].
794 ///
795 /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
796 #[cfg(feature = "core")]
797 async fn describe_edge(
798 &self,
799 _flow_id: &FlowId,
800 _edge_id: &EdgeId,
801 ) -> Result<Option<EdgeSnapshot>, EngineError> {
802 Err(EngineError::Unavailable {
803 op: "describe_edge",
804 })
805 }
806
807 /// Resolve an execution's owning flow id, if any.
808 ///
809 /// `Ok(None)` when the execution's core record is absent or has
810 /// no associated flow (standalone execution). A present-but-
811 /// malformed `flow_id` field surfaces as
812 /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`].
813 ///
814 /// Gated on the `core` feature. Used by ff-sdk's
815 /// `list_outgoing_edges` / `list_incoming_edges` to pivot from a
816 /// consumer-supplied `ExecutionId` to the `FlowId` required by
817 /// [`Self::list_edges`]. A Valkey backend serves this with a
818 /// single `HGET exec_core flow_id`; a Postgres backend serves it
819 /// with the equivalent single-column row lookup.
820 ///
821 /// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
822 #[cfg(feature = "core")]
823 async fn resolve_execution_flow_id(
824 &self,
825 _eid: &ExecutionId,
826 ) -> Result<Option<FlowId>, EngineError> {
827 Err(EngineError::Unavailable {
828 op: "resolve_execution_flow_id",
829 })
830 }
831
832 /// List flows on a partition with cursor-based pagination (issue
833 /// #185).
834 ///
835 /// Returns a [`ListFlowsPage`] of [`FlowSummary`](crate::contracts::FlowSummary)
836 /// rows ordered by `flow_id` (UUID byte-lexicographic). `cursor`
837 /// is `None` for the first page; callers forward the returned
838 /// `next_cursor` verbatim to continue iteration, and the listing
839 /// is exhausted when `next_cursor` is `None`. `limit` is the
840 /// maximum number of rows to return on this page — implementations
841 /// MAY return fewer (end of partition) but MUST NOT exceed it.
842 ///
843 /// Ordering rationale: flow ids are UUIDs, and both Valkey
844 /// (sort after-the-fact) and Postgres (`ORDER BY flow_id`) can
845 /// agree on byte-lexicographic order — the same order
846 /// `FlowId::to_string()` produces for canonical hyphenated UUIDs.
847 /// Mapping to `cursor > flow_id` keeps the contract backend-
848 /// independent.
849 ///
850 /// # Postgres implementation pattern
851 ///
852 /// A Postgres-backed implementation serves this directly with
853 ///
854 /// ```sql
855 /// SELECT flow_id, created_at_ms, public_flow_state
856 /// FROM ff_flow
857 /// WHERE partition_key = $1
858 /// AND ($2::uuid IS NULL OR flow_id > $2)
859 /// ORDER BY flow_id
860 /// LIMIT $3 + 1;
861 /// ```
862 ///
863 /// — reading one extra row to decide whether `next_cursor` should
864 /// be set to the last row's `flow_id`. The Valkey implementation
865 /// maintains the `ff:idx:{fp:N}:flow_index` SET and performs the
866 /// sort + slice client-side (SMEMBERS then sort-by-UUID-bytes),
867 /// pipelining `HGETALL flow_core` for each row on the page.
868 ///
869 /// Gated on the `core` feature — flow listing is part of the
870 /// minimal engine surface a Postgres-style backend must honour.
871 #[cfg(feature = "core")]
872 async fn list_flows(
873 &self,
874 _partition: PartitionKey,
875 _cursor: Option<FlowId>,
876 _limit: usize,
877 ) -> Result<ListFlowsPage, EngineError> {
878 Err(EngineError::Unavailable { op: "list_flows" })
879 }
880
881 /// Enumerate registered lanes with cursor-based pagination.
882 ///
883 /// Lanes are global (not partition-scoped) — the backend serves
884 /// this from its lane registry and does NOT accept a
885 /// [`crate::partition::Partition`] argument. Results are sorted
886 /// by [`LaneId`] name so the ordering is stable across calls and
887 /// cursors address a deterministic position in the sort.
888 ///
889 /// * `cursor` — exclusive lower bound. `None` starts from the
890 /// first lane. To continue a walk, pass the previous page's
891 /// [`ListLanesPage::next_cursor`].
892 /// * `limit` — hard cap on the number of lanes returned in the
893 /// page. Backends MAY round this down when the registry size
894 /// is smaller; they MUST NOT return more than `limit`.
895 ///
896 /// [`ListLanesPage::next_cursor`] is `Some(last_lane_in_page)`
897 /// iff at least one more lane exists after the returned page,
898 /// and `None` on the final page. Callers loop until `next_cursor`
899 /// is `None` to read the full registry.
900 ///
901 /// Gated on the `core` feature — lane enumeration is part of the
902 /// minimal snapshot surface an alternate backend must honour
903 /// alongside [`Self::describe_flow`] / [`Self::list_edges`].
904 #[cfg(feature = "core")]
905 async fn list_lanes(
906 &self,
907 _cursor: Option<LaneId>,
908 _limit: usize,
909 ) -> Result<ListLanesPage, EngineError> {
910 Err(EngineError::Unavailable { op: "list_lanes" })
911 }
912
913 /// List suspended executions in one partition, cursor-paginated,
914 /// with each entry's suspension `reason_code` populated (issue
915 /// #183).
916 ///
917 /// Consumer-facing "what's blocked on what?" panels (ff-board's
918 /// suspended-executions view, operator CLIs) need the reason in
919 /// the list response so the UI does not round-trip per row to
920 /// `describe_execution` for a field it knows it needs. `reason`
921 /// on [`SuspendedExecutionEntry`] carries the free-form
922 /// `suspension:current.reason_code` field — see the type rustdoc
923 /// for the String-not-enum rationale.
924 ///
925 /// `cursor` is opaque to callers; pass `None` to start a fresh
926 /// scan and feed the returned [`ListSuspendedPage::next_cursor`]
927 /// back in on subsequent pages until it comes back `None`.
928 /// `limit` bounds the `entries` count; backends MAY return fewer
929 /// when the partition is exhausted.
930 ///
931 /// Ordering is by ascending `suspended_at_ms` (the per-lane
932 /// suspended ZSET score == `timeout_at` or the no-timeout
933 /// sentinel) with execution id as a lex tiebreak, so cursor
934 /// continuation is deterministic across calls.
935 ///
936 /// Gated on the `core` feature — suspended-list enumeration is
937 /// part of the minimal engine surface a Postgres-style backend
938 /// must honour.
939 #[cfg(feature = "core")]
940 async fn list_suspended(
941 &self,
942 _partition: PartitionKey,
943 _cursor: Option<ExecutionId>,
944 _limit: usize,
945 ) -> Result<ListSuspendedPage, EngineError> {
946 Err(EngineError::Unavailable {
947 op: "list_suspended",
948 })
949 }
950
951 /// Forward-only paginated listing of the executions indexed under
952 /// one partition.
953 ///
954 /// Reads the partition-wide `ff:idx:{p:N}:all_executions` set,
955 /// sorts lexicographically on `ExecutionId`, and returns the page
956 /// of ids strictly greater than `cursor` (or starting from the
957 /// smallest id when `cursor = None`). The returned
958 /// [`ListExecutionsPage::next_cursor`] is the last id on the page
959 /// iff at least one more id exists past it; `None` signals
960 /// end-of-stream.
961 ///
962 /// `limit` is the maximum number of ids returned on this page. A
963 /// `limit` of `0` returns an empty page with `next_cursor = None`.
964 /// Backends MAY cap `limit` internally (Valkey: 1000) and return
965 /// fewer ids than requested; callers continue paginating until
966 /// `next_cursor == None`.
967 ///
968 /// Ordering is stable under concurrent inserts for already-emitted
969 /// ids (an id less-than-or-equal-to the caller's cursor is never
970 /// re-emitted in later pages) but new inserts past the cursor WILL
971 /// appear in subsequent pages — consistent with forward-only
972 /// cursor semantics.
973 ///
974 /// Gated on the `core` feature — partition-scoped listing is part
975 /// of the minimal engine surface every backend must honour.
976 #[cfg(feature = "core")]
977 async fn list_executions(
978 &self,
979 _partition: PartitionKey,
980 _cursor: Option<ExecutionId>,
981 _limit: usize,
982 ) -> Result<ListExecutionsPage, EngineError> {
983 Err(EngineError::Unavailable {
984 op: "list_executions",
985 })
986 }
987
988 // ── Trigger ops (issue #150) ──
989
990 /// Deliver an external signal to a suspended execution's waitpoint.
991 ///
992 /// The backend atomically records the signal, evaluates the resume
993 /// condition, and — when satisfied — transitions the execution
994 /// from `suspended` to `runnable` (or buffers the signal when the
995 /// waitpoint is still `pending`). Duplicate delivery — same
996 /// `idempotency_key` + waitpoint — surfaces as
997 /// [`DeliverSignalResult::Duplicate`] with the pre-existing
998 /// `signal_id` rather than mutating state twice.
999 ///
1000 /// Input validation (HMAC token presence, payload size limits,
1001 /// signal-name shape) is the backend's responsibility; callers
1002 /// pass a fully populated [`DeliverSignalArgs`] and receive typed
1003 /// outcomes or typed errors (`ScriptError::invalid_token`,
1004 /// `ScriptError::token_expired`, `ScriptError::ExecutionNotFound`
1005 /// surfaced via [`EngineError::Transport`] on the Valkey backend).
1006 ///
1007 /// Gated on the `core` feature — signal delivery is part of the
1008 /// minimal trigger surface every backend must honour so ff-server
1009 /// / REST handlers can dispatch against `Arc<dyn EngineBackend>`
1010 /// without knowing which backend is running underneath.
1011 #[cfg(feature = "core")]
1012 async fn deliver_signal(
1013 &self,
1014 _args: DeliverSignalArgs,
1015 ) -> Result<DeliverSignalResult, EngineError> {
1016 Err(EngineError::Unavailable {
1017 op: "deliver_signal",
1018 })
1019 }
1020
1021 /// Claim a resumed execution — a previously-suspended attempt that
1022 /// has cleared its resume condition (e.g. via
1023 /// [`Self::deliver_signal`]) and now needs a worker to pick up the
1024 /// same attempt index.
1025 ///
1026 /// Distinct from [`Self::claim`] (fresh work) and
1027 /// [`Self::claim_from_resume_grant`] (grant-based ownership transfer
1028 /// after a crash): the resumed-claim path re-binds an existing
1029 /// attempt rather than minting a new one. The backend issues a
1030 /// fresh `lease_id` + bumps the `lease_epoch`, preserving
1031 /// `attempt_id` / `attempt_index` so stream frames and progress
1032 /// updates continue on the same attempt.
1033 ///
1034 /// Typed failures surface via `ScriptError` → `EngineError`:
1035 /// `NotAResumedExecution` when the attempt state is not
1036 /// `attempt_interrupted`, `ExecutionNotLeaseable` when the
1037 /// lifecycle phase is not `runnable`, and `InvalidClaimGrant`
1038 /// when the grant key is missing or was already consumed.
1039 ///
1040 /// Gated on the `core` feature — resumed-claim is part of the
1041 /// minimal trigger surface every backend must honour.
1042 #[cfg(feature = "core")]
1043 async fn claim_resumed_execution(
1044 &self,
1045 _args: ClaimResumedExecutionArgs,
1046 ) -> Result<ClaimResumedExecutionResult, EngineError> {
1047 Err(EngineError::Unavailable {
1048 op: "claim_resumed_execution",
1049 })
1050 }
1051
1052 /// Scan a lane's eligible ZSET on one partition for
1053 /// highest-priority executions awaiting a worker (v0.12 PR-5).
1054 ///
1055 /// Lifted from the SDK-side `ZRANGEBYSCORE` inline on
1056 /// `FlowFabricWorker::claim_next` — the scheduler-bypass scanner
1057 /// gated behind `direct-valkey-claim`. The trait method itself is
1058 /// backend-agnostic; consumers that drive the scanner loop
1059 /// (bench harnesses, single-tenant dev) compose it with
1060 /// [`Self::issue_claim_grant`] + [`Self::claim_execution`] to
1061 /// replicate the pre-PR-5 `claim_next` body.
1062 ///
1063 /// # Backend coverage
1064 ///
1065 /// * **Valkey** — `ZRANGEBYSCORE eligible_zset -inf +inf LIMIT 0 <limit>`
1066 /// on the lane's partition-scoped eligible key. Single
1067 /// command; no script round-trip. Wire shape is byte-for-byte
1068 /// identical to the pre-PR SDK inline call so bench traces
1069 /// match pre-PR without new `#[tracing::instrument]` span names.
1070 /// * **Postgres / SQLite** — use the `Err(Unavailable)` default.
1071 /// PG/SQLite consumers drive work through the scheduler-routed
1072 /// [`Self::claim_for_worker`] path instead of the scanner
1073 /// primitives exposed here; lifting the scheduler itself onto
1074 /// the trait is RFC-024 follow-up scope. See
1075 /// `project_claim_from_grant_pg_sqlite_gap.md` for motivation.
1076 ///
1077 /// Default impl returns [`EngineError::Unavailable`] so the trait
1078 /// addition is non-breaking for out-of-tree backends. Same
1079 /// precedent as [`Self::claim_execution`] landing in v0.12 PR-4.
1080 #[cfg(feature = "core")]
1081 async fn scan_eligible_executions(
1082 &self,
1083 _args: ScanEligibleArgs,
1084 ) -> Result<Vec<ExecutionId>, EngineError> {
1085 Err(EngineError::Unavailable {
1086 op: "scan_eligible_executions",
1087 })
1088 }
1089
1090 /// Issue a claim grant — the scheduler's admission write — for a
1091 /// single execution on a single lane (v0.12 PR-5).
1092 ///
1093 /// Lifted from the SDK-side `ff_issue_claim_grant` inline helper
1094 /// on `FlowFabricWorker::claim_next`. The backend atomically
1095 /// writes the grant hash, appends to the per-worker grant index,
1096 /// and removes the execution from the lane's eligible ZSET.
1097 ///
1098 /// Typed rejects surface via [`EngineError::Validation`]:
1099 /// `CapabilityMismatch` when the worker's capabilities do not
1100 /// cover the execution's `required_capabilities`, `InvalidInput`
1101 /// for malformed args. Transport faults surface via
1102 /// [`EngineError::Transport`].
1103 ///
1104 /// # Backend coverage
1105 ///
1106 /// * **Valkey** — one `ff_issue_claim_grant` FCALL. KEYS/ARGV
1107 /// shape is byte-for-byte identical to the pre-PR SDK inline
1108 /// call; bench traces match pre-PR.
1109 /// * **Postgres / SQLite** — `Err(Unavailable)` default; use
1110 /// [`Self::claim_for_worker`] instead. See
1111 /// [`Self::scan_eligible_executions`] for the cross-link
1112 /// rationale.
1113 #[cfg(feature = "core")]
1114 async fn issue_claim_grant(
1115 &self,
1116 _args: IssueClaimGrantArgs,
1117 ) -> Result<IssueClaimGrantOutcome, EngineError> {
1118 Err(EngineError::Unavailable {
1119 op: "issue_claim_grant",
1120 })
1121 }
1122
1123 /// Move an execution from a lane's eligible ZSET into its
1124 /// blocked_route ZSET (v0.12 PR-5).
1125 ///
1126 /// Lifted from the SDK-side `ff_block_execution_for_admission`
1127 /// inline helper on `FlowFabricWorker::claim_next`. Called after
1128 /// a [`Self::issue_claim_grant`] `CapabilityMismatch` reject —
1129 /// without a block step, the inline scanner would re-pick the
1130 /// same top-of-ZSET every tick (parity with
1131 /// `ff-scheduler::Scheduler::block_candidate`).
1132 ///
1133 /// The engine's unblock scanner periodically promotes
1134 /// blocked_route back to eligible once a worker with matching
1135 /// caps registers.
1136 ///
1137 /// # Backend coverage
1138 ///
1139 /// * **Valkey** — one `ff_block_execution_for_admission` FCALL.
1140 /// * **Postgres / SQLite** — `Err(Unavailable)` default; the
1141 /// scheduler-routed [`Self::claim_for_worker`] path handles
1142 /// admission rejects server-side.
1143 #[cfg(feature = "core")]
1144 async fn block_route(
1145 &self,
1146 _args: BlockRouteArgs,
1147 ) -> Result<BlockRouteOutcome, EngineError> {
1148 Err(EngineError::Unavailable { op: "block_route" })
1149 }
1150
1151 /// Consume a scheduler-issued claim grant to mint a fresh attempt.
1152 ///
1153 /// The SDK's grant-consumer path — paired with `FlowFabricWorker::claim_from_grant`
1154 /// in `ff-sdk` — routes through this method. The scheduler has
1155 /// already validated budget / quota / capabilities and written a
1156 /// grant (Valkey `claim_grant` hash); this call atomically
1157 /// consumes that grant and creates the attempt row, mints
1158 /// `lease_id` + `lease_epoch`, and returns a
1159 /// [`ClaimExecutionResult::Claimed`] carrying the minted lease
1160 /// triple.
1161 ///
1162 /// Distinct from [`Self::claim`] (the scheduler-bypass scanner
1163 /// used by the `direct-valkey-claim` feature) — this method
1164 /// assumes the grant already exists and skips capability / ZSET
1165 /// scanning. The Valkey impl fires exactly one `ff_claim_execution`
1166 /// FCALL.
1167 ///
1168 /// Typed failures surface via `ScriptError` → `EngineError`:
1169 /// `UseClaimResumedExecution` when the attempt is actually
1170 /// `attempt_interrupted` (caller should retry via
1171 /// [`Self::claim_resumed_execution`] — see `ContentionKind` at
1172 /// `ff_core::engine_error`), `InvalidClaimGrant` when the grant is
1173 /// missing / consumed / worker-mismatched, `CapabilityMismatch`
1174 /// when the execution's `required_capabilities` drifted after
1175 /// grant issuance.
1176 ///
1177 /// # Backend coverage
1178 ///
1179 /// * **Valkey** — implemented in `ff-backend-valkey` (one
1180 /// `ff_claim_execution` FCALL).
1181 /// * **Postgres / SQLite** — use the `Err(Unavailable)` default in
1182 /// this PR. Grants on PG / SQLite today flow through
1183 /// `PostgresScheduler::claim_for_worker` (a sibling struct, not
1184 /// an `EngineBackend` method); wiring the default-over-trait
1185 /// behaviour into a PG / SQLite `claim_execution` impl lands
1186 /// with a future RFC-024 grant-consumer extension.
1187 ///
1188 /// The default impl returns [`EngineError::Unavailable`] so the
1189 /// trait addition is non-breaking for out-of-tree backends. Same
1190 /// precedent as [`Self::read_current_attempt_index`] landing in
1191 /// v0.12 PR-3.
1192 #[cfg(feature = "core")]
1193 async fn claim_execution(
1194 &self,
1195 _args: ClaimExecutionArgs,
1196 ) -> Result<ClaimExecutionResult, EngineError> {
1197 Err(EngineError::Unavailable {
1198 op: "claim_execution",
1199 })
1200 }
1201
/// Operator-initiated cancellation of a flow and (optionally) its
/// member executions. See RFC-012 §3.1.1 for the policy / wait
/// matrix.
///
/// Required method: the trait supplies no default body here, so
/// every backend must provide its own implementation.
async fn cancel_flow(
    &self,
    id: &FlowId,
    policy: CancelFlowPolicy,
    wait: CancelFlowWait,
) -> Result<CancelFlowResult, EngineError>;
1211
1212 /// RFC-016 Stage A: set the inbound-edge-group policy for a
1213 /// downstream execution. Must be called before the first
1214 /// `add_dependency(... -> downstream_execution_id)` — the backend
1215 /// rejects with [`EngineError::Conflict`] if edges have already
1216 /// been staged for this group.
1217 ///
1218 /// Stage A honours only
1219 /// [`EdgeDependencyPolicy::AllOf`](crate::contracts::EdgeDependencyPolicy::AllOf);
1220 /// the `AnyOf` / `Quorum` variants return
1221 /// [`EngineError::Validation`] with
1222 /// `detail = "stage A supports AllOf only; AnyOf/Quorum land in stage B"`
1223 /// until Stage B's resolver lands.
1224 #[cfg(feature = "core")]
1225 async fn set_edge_group_policy(
1226 &self,
1227 _flow_id: &FlowId,
1228 _downstream_execution_id: &ExecutionId,
1229 _policy: crate::contracts::EdgeDependencyPolicy,
1230 ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
1231 Err(EngineError::Unavailable {
1232 op: "set_edge_group_policy",
1233 })
1234 }
1235
1236 // ── HMAC secret rotation (v0.7 migration-master Q4) ──
1237
1238 /// Rotate the waitpoint HMAC signing kid **cluster-wide**.
1239 ///
1240 /// **v0.7 migration-master Q4 (adjudicated 2026-04-24).**
1241 /// Additive trait surface so Valkey and Postgres backends can
1242 /// both expose the "rotate everywhere" semantic under one name.
1243 ///
1244 /// * Valkey impl fans out an `ff_rotate_waitpoint_hmac_secret`
1245 /// FCALL per execution partition. `entries.len() == num_flow_partitions`
1246 /// and per-partition failures are surfaced as inner `Err`
1247 /// entries — the call as a whole does not fail when one
1248 /// partition's FCALL fails, matching
/// `ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions`'s
1250 /// partial-success contract.
1251 /// * Postgres impl (Wave 4) writes one row to
1252 /// `ff_waitpoint_hmac(kid, secret, rotated_at)` and returns a
1253 /// single-entry vec with `partition = 0`.
1254 ///
1255 /// The default impl returns
1256 /// [`EngineError::Unavailable`] with
1257 /// `op = "rotate_waitpoint_hmac_secret_all"` so backends that
1258 /// haven't implemented the method surface the miss loudly rather
1259 /// than silently no-op'ing. Both concrete backends override.
1260 async fn rotate_waitpoint_hmac_secret_all(
1261 &self,
1262 _args: RotateWaitpointHmacSecretAllArgs,
1263 ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
1264 Err(EngineError::Unavailable {
1265 op: "rotate_waitpoint_hmac_secret_all",
1266 })
1267 }
1268
1269 /// Seed the initial waitpoint HMAC secret for a fresh deployment
1270 /// (issue #280).
1271 ///
1272 /// **Idempotent.** If a `current_kid` (Valkey per-partition) or
1273 /// an active kid row (Postgres) already exists with the given
1274 /// `kid`, the method returns
1275 /// [`SeedOutcome::AlreadySeeded`] without overwriting, reporting
1276 /// whether the stored secret matches the caller-supplied one via
1277 /// `same_secret`. Callers (cairn boot, operator tooling) invoke
1278 /// this on every boot and let the backend decide whether to
1279 /// install — removing the client-side "check then HSET" race that
1280 /// cairn's raw-HSET boot path silently tolerated.
1281 ///
1282 /// For rotation of an already-seeded secret, use
1283 /// [`Self::rotate_waitpoint_hmac_secret_all`] instead; seed is
1284 /// install-only.
1285 ///
1286 /// The default impl returns [`EngineError::Unavailable`] with
1287 /// `op = "seed_waitpoint_hmac_secret"` so backends that haven't
1288 /// implemented the method surface the miss loudly.
1289 async fn seed_waitpoint_hmac_secret(
1290 &self,
1291 _args: SeedWaitpointHmacSecretArgs,
1292 ) -> Result<SeedOutcome, EngineError> {
1293 Err(EngineError::Unavailable {
1294 op: "seed_waitpoint_hmac_secret",
1295 })
1296 }
1297
1298 // ── Budget ──
1299
/// Report usage against a budget and check limits. Returns the
/// typed [`ReportUsageResult`] variant; backends enforce
/// idempotency via the caller-supplied
/// [`UsageDimensions::dedup_key`] (RFC-012 §R7.2.3 — replaces
/// the pre-Round-7 `AdmissionDecision` return).
///
/// Required method: the trait supplies no default body here, so
/// every backend must provide its own implementation.
async fn report_usage(
    &self,
    handle: &Handle,
    budget: &BudgetId,
    dimensions: crate::backend::UsageDimensions,
) -> Result<ReportUsageResult, EngineError>;
1311
1312 // ── Stream reads (RFC-012 Stage 1c tranche-4; issue #87) ──
1313
1314 /// Read frames from a completed or in-flight attempt's stream.
1315 ///
1316 /// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start`
1317 /// / `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
1318 /// `StreamCursor::At("<id>")` reads from a concrete entry id.
1319 ///
1320 /// Input validation (count_limit bounds, cursor shape) is the
1321 /// caller's responsibility — SDK-side wrappers in
1322 /// [`ff-sdk`](https://docs.rs/ff-sdk) enforce bounds before
1323 /// forwarding. Backends MAY additionally reject out-of-range
1324 /// input via [`EngineError::Validation`].
1325 ///
1326 /// Gated on the `streaming` feature — stream reads are part of
1327 /// the stream-subset surface a backend without XREAD-like
1328 /// primitives may omit.
1329 #[cfg(feature = "streaming")]
1330 async fn read_stream(
1331 &self,
1332 _execution_id: &ExecutionId,
1333 _attempt_index: AttemptIndex,
1334 _from: StreamCursor,
1335 _to: StreamCursor,
1336 _count_limit: u64,
1337 ) -> Result<StreamFrames, EngineError> {
1338 Err(EngineError::Unavailable { op: "read_stream" })
1339 }
1340
1341 /// Tail a live attempt's stream.
1342 ///
1343 /// `after` is an exclusive [`StreamCursor`] — entries with id
1344 /// strictly greater than `after` are returned. `StreamCursor::Start`
1345 /// / `StreamCursor::End` are NOT accepted here; callers MUST pass
1346 /// a concrete id (or `StreamCursor::from_beginning()`). The SDK
1347 /// wrapper rejects the open markers before reaching the backend.
1348 ///
1349 /// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up
1350 /// to that many ms for a new entry.
1351 ///
1352 /// `visibility` (RFC-015 §6.1) filters the returned entries by
1353 /// their stored [`StreamMode`](crate::backend::StreamMode)
1354 /// `mode` field. Default
1355 /// [`TailVisibility::All`](crate::backend::TailVisibility::All)
1356 /// preserves v1 behaviour.
1357 ///
1358 /// Gated on the `streaming` feature — see [`read_stream`](Self::read_stream).
1359 #[cfg(feature = "streaming")]
1360 async fn tail_stream(
1361 &self,
1362 _execution_id: &ExecutionId,
1363 _attempt_index: AttemptIndex,
1364 _after: StreamCursor,
1365 _block_ms: u64,
1366 _count_limit: u64,
1367 _visibility: TailVisibility,
1368 ) -> Result<StreamFrames, EngineError> {
1369 Err(EngineError::Unavailable { op: "tail_stream" })
1370 }
1371
1372 /// Read the rolling summary document for an attempt (RFC-015 §6.3).
1373 ///
1374 /// Returns `Ok(None)` when no [`StreamMode::DurableSummary`](crate::backend::StreamMode::DurableSummary)
1375 /// frame has ever been appended for the attempt. Non-blocking Hash
1376 /// read; safe to call from any consumer without holding the lease.
1377 ///
1378 /// Gated on the `streaming` feature — summary reads are part of
1379 /// the stream-subset surface.
1380 #[cfg(feature = "streaming")]
1381 async fn read_summary(
1382 &self,
1383 _execution_id: &ExecutionId,
1384 _attempt_index: AttemptIndex,
1385 ) -> Result<Option<SummaryDocument>, EngineError> {
1386 Err(EngineError::Unavailable {
1387 op: "read_summary",
1388 })
1389 }
1390
1391 // ── RFC-017 Stage A — Ingress (5) ──────────────────────────
1392 //
1393 // Every method in this block has a default impl returning
1394 // `EngineError::Unavailable { op }` per RFC-017 §5.3. Concrete
1395 // backends override each method with a real body. A missing
1396 // override surfaces as a loud typed error at the call site rather
1397 // than a silent no-op.
1398
1399 /// Create an execution. Ingress row 6 (RFC-017 §4). Wraps
1400 /// `ff_create_execution` on Valkey; `INSERT INTO ff_execution ...`
1401 /// on Postgres. The `idempotency_key` + backend-side default
1402 /// `dedup_ttl_ms = 86400000` make duplicate submissions idempotent.
#[cfg(feature = "core")]
async fn create_execution(
    &self,
    _args: CreateExecutionArgs,
) -> Result<CreateExecutionResult, EngineError> {
    // Trait-level default: nothing is created. The op name travels in
    // the error so a missing override is loud at the call site.
    let op = "create_execution";
    Err(EngineError::Unavailable { op })
}
1412
1413 /// Create a flow header. Ingress row 5.
#[cfg(feature = "core")]
async fn create_flow(
    &self,
    _args: CreateFlowArgs,
) -> Result<CreateFlowResult, EngineError> {
    // Trait-level default: no flow header is written; report the op
    // by name through the typed `Unavailable` error.
    let op = "create_flow";
    Err(EngineError::Unavailable { op })
}
1421
1422 /// Atomically add an execution to a flow (single-FCALL co-located
1423 /// commit on Valkey; single-transaction UPSERT on Postgres).
#[cfg(feature = "core")]
async fn add_execution_to_flow(
    &self,
    _args: AddExecutionToFlowArgs,
) -> Result<AddExecutionToFlowResult, EngineError> {
    // Trait-level default: membership is not touched; report the op
    // by name through the typed `Unavailable` error.
    let op = "add_execution_to_flow";
    Err(EngineError::Unavailable { op })
}
1433
1434 /// Stage a dependency edge between flow members. CAS-guarded on
1435 /// `graph_revision` — stale rev returns `Contention(StaleGraphRevision)`.
#[cfg(feature = "core")]
async fn stage_dependency_edge(
    &self,
    _args: StageDependencyEdgeArgs,
) -> Result<StageDependencyEdgeResult, EngineError> {
    // Trait-level default: no edge is staged; report the op by name
    // through the typed `Unavailable` error.
    let op = "stage_dependency_edge";
    Err(EngineError::Unavailable { op })
}
1445
1446 /// Apply a staged dependency edge to its downstream child.
#[cfg(feature = "core")]
async fn apply_dependency_to_child(
    &self,
    _args: ApplyDependencyToChildArgs,
) -> Result<ApplyDependencyToChildResult, EngineError> {
    // Trait-level default: the staged edge is left as-is; report the
    // op by name through the typed `Unavailable` error.
    let op = "apply_dependency_to_child";
    Err(EngineError::Unavailable { op })
}
1456
1457 /// Resolve one dependency edge after its upstream reached a
1458 /// terminal outcome — satisfy on "success", mark impossible
1459 /// otherwise. Idempotent (`AlreadyResolved` on replay).
1460 ///
1461 /// PR-7b Step 0 overlap-resolver: lifted here so cluster 2
1462 /// (`scanner/dependency_reconciler`) could trait-route through
1463 /// `Arc<dyn EngineBackend>` without a merge conflict with cluster
1464 /// 4. Cluster 4 (`completion_listener::spawn_dispatch_loop`)
1465 /// ultimately routed through the coarser
1466 /// [`Self::cascade_completion`] (per-payload) instead of looping
1467 /// over this per-edge method, because the Postgres cascade is
1468 /// outbox-driven rather than per-edge — see `cascade_completion`
1469 /// rustdoc "Timing semantics" for details.
1470 ///
1471 /// # Backend status
1472 ///
1473 /// - **Valkey:** wraps `ff_resolve_dependency` (RFC-016 Stage C
1474 /// signature). Atomic single-slot FCALL.
1475 /// - **Postgres:** `Unavailable`. PG's post-completion cascade is
1476 /// not per-edge; it runs via
1477 /// `ff_backend_postgres::dispatch::dispatch_completion(event_id)`
1478 /// keyed on the `ff_completion_event` outbox row. The Valkey-
1479 /// shaped per-edge resolve does not map cleanly to that model;
1480 /// PG's `dependency_reconciler` already calls `dispatch_completion`
1481 /// directly. The engine's PR-7b/final integration test expects
1482 /// `Unsupported` logs from Valkey-shaped scanners on a PG
1483 /// deployment — this surface honours that contract.
1484 /// - **SQLite:** `Unavailable` for the same reason (mirrors PG).
1485 ///
1486 /// The default impl returns [`EngineError::Unavailable`] so a
1487 /// backend that has not been migrated surfaces a typed
1488 /// `Unsupported`-grade error rather than a panic.
#[cfg(feature = "core")]
async fn resolve_dependency(
    &self,
    _args: crate::contracts::ResolveDependencyArgs,
) -> Result<crate::contracts::ResolveDependencyOutcome, EngineError> {
    // Trait-level default: returns a typed error (never panics) that
    // names the per-edge resolve op.
    let op = "resolve_dependency";
    Err(EngineError::Unavailable { op })
}
1498
1499 /// Cascade a terminal-execution completion into its downstream
1500 /// edges. Consumed by
1501 /// `ff-engine::completion_listener::spawn_dispatch_loop` (PR-7b
1502 /// Cluster 4) to trait-route the post-completion DAG-promotion
1503 /// path through `Arc<dyn EngineBackend>`.
1504 ///
1505 /// Distinct from [`Self::resolve_dependency`]: that method is
1506 /// per-edge (one `ff_resolve_dependency` FCALL); this method is
1507 /// per-completion and orchestrates the full outgoing-edge walk
1508 /// plus `child_skipped` recursion (Valkey) or outbox-event
1509 /// dispatch (Postgres).
1510 ///
1511 /// # Timing semantics
1512 ///
1513 /// Backends diverge on *when* the caller observes cascade work.
1514 /// See [`CascadeOutcome`] for the full contract; the short form:
1515 ///
1516 /// - **Valkey:** synchronous. FCALL-driven walk completes inline;
1517 /// `child_skipped` descendants are recursively cascaded up to the
1518 /// internal `MAX_CASCADE_DEPTH` cap before return.
1519 /// - **Postgres:** asynchronous via the `ff_completion_event`
1520 /// outbox. The call resolves `payload` to its `event_id`, runs
1521 /// `ff_backend_postgres::dispatch::dispatch_completion`, and
1522 /// returns when the outbox row has been claimed + its direct
1523 /// hops advanced. Further-descendant cascades ride their own
1524 /// outbox events (emitted by the per-hop tx) — NOT this call.
1525 ///
1526 /// Consumers that depend on synchronous cascade must either target
1527 /// Valkey explicitly or observe PG's `dispatched_at_ms` clearance
1528 /// via the `dependency_reconciler` partial index to verify drain.
1529 ///
1530 /// The default impl returns [`EngineError::Unavailable`] so
1531 /// backends that have not been migrated surface a typed error
1532 /// rather than a panic.
1533 ///
1534 /// [`CascadeOutcome`]: crate::contracts::CascadeOutcome
#[cfg(feature = "core")]
async fn cascade_completion(
    &self,
    _payload: &crate::backend::CompletionPayload,
) -> Result<crate::contracts::CascadeOutcome, EngineError> {
    // Trait-level default: no cascade work is performed; returns a
    // typed error (never panics) that names the op.
    let op = "cascade_completion";
    Err(EngineError::Unavailable { op })
}
1544
1545 // ── RFC-017 Stage A — Operator control (4) ─────────────────
1546
1547 /// Operator-initiated execution cancel (row 2).
#[cfg(feature = "core")]
async fn cancel_execution(
    &self,
    _args: CancelExecutionArgs,
) -> Result<CancelExecutionResult, EngineError> {
    // Trait-level default: nothing is cancelled; report the op by
    // name through the typed `Unavailable` error.
    let op = "cancel_execution";
    Err(EngineError::Unavailable { op })
}
1557
1558 /// Re-score an execution's eligibility priority (row 17).
#[cfg(feature = "core")]
async fn change_priority(
    &self,
    _args: ChangePriorityArgs,
) -> Result<ChangePriorityResult, EngineError> {
    // Trait-level default: priority is left unchanged; report the op
    // by name through the typed `Unavailable` error.
    let op = "change_priority";
    Err(EngineError::Unavailable { op })
}
1568
1569 /// Replay a terminal execution (row 22). Variadic KEYS handling
1570 /// (inbound-edge pre-read) is hidden inside the Valkey impl per
1571 /// RFC-017 §4 row 3.
#[cfg(feature = "core")]
async fn replay_execution(
    &self,
    _args: ReplayExecutionArgs,
) -> Result<ReplayExecutionResult, EngineError> {
    // Trait-level default: nothing is replayed; report the op by name
    // through the typed `Unavailable` error.
    let op = "replay_execution";
    Err(EngineError::Unavailable { op })
}
1581
1582 /// Operator-initiated lease revoke (row 19).
#[cfg(feature = "core")]
async fn revoke_lease(
    &self,
    _args: RevokeLeaseArgs,
) -> Result<RevokeLeaseResult, EngineError> {
    // Trait-level default: no lease is revoked; report the op by name
    // through the typed `Unavailable` error.
    let op = "revoke_lease";
    Err(EngineError::Unavailable { op })
}
1590
1591 // ── cairn #389 — service-layer typed FCALL surface ────────────
1592 //
1593 // These methods mirror `complete`/`fail`/`renew` (which take a
1594 // worker [`Handle`]) but dispatch against `(execution_id, fence)`
1595 // tuples supplied directly. Service-layer callers (cairn's
1596 // `valkey_control_plane_impl.rs`, future consumers that hold a
1597 // run/lease descriptor without a `Handle`) use these to avoid
1598 // going through the raw `ferriskey::Value` FCALL escape hatch.
1599 //
1600 // Same shape / same precedent as `suspend_by_triple` (cairn #322).
1601 //
1602 // Default impl returns [`EngineError::Unavailable`] so the trait
1603 // addition is non-breaking for out-of-tree backends. The in-tree
1604 // Valkey backend overrides; Postgres + SQLite keep the default
1605 // until follow-up parity work lands (consistent with
1606 // `issue_reclaim_grant` / `reclaim_execution` precedent).
1607
1608 /// Service-layer `complete_execution` — peer of [`Self::complete`]
1609 /// that takes a fence triple instead of a worker [`Handle`]. See
1610 /// the group preamble above for cairn-migration context.
#[cfg(feature = "core")]
async fn complete_execution(
    &self,
    _args: CompleteExecutionArgs,
) -> Result<CompleteExecutionResult, EngineError> {
    // Trait-level default keeps the trait addition non-breaking:
    // backends without an override get a typed error naming the op.
    let op = "complete_execution";
    Err(EngineError::Unavailable { op })
}
1620
1621 /// Service-layer `fail_execution` — peer of [`Self::fail`] that
1622 /// takes a fence triple instead of a worker [`Handle`].
#[cfg(feature = "core")]
async fn fail_execution(
    &self,
    _args: FailExecutionArgs,
) -> Result<FailExecutionResult, EngineError> {
    // Trait-level default keeps the trait addition non-breaking:
    // backends without an override get a typed error naming the op.
    let op = "fail_execution";
    Err(EngineError::Unavailable { op })
}
1632
1633 /// Service-layer `renew_lease` — peer of [`Self::renew`] that
1634 /// takes a fence triple instead of a worker [`Handle`].
#[cfg(feature = "core")]
async fn renew_lease(
    &self,
    _args: RenewLeaseArgs,
) -> Result<RenewLeaseResult, EngineError> {
    // Trait-level default keeps the trait addition non-breaking:
    // backends without an override get a typed error naming the op.
    let op = "renew_lease";
    Err(EngineError::Unavailable { op })
}
1642
1643 /// Service-layer `resume_execution` — transitions a suspended
1644 /// execution back to runnable. Distinct from
1645 /// [`Self::claim_from_resume_grant`] (which mints a worker handle
1646 /// against an already-eligible resumed execution): this method is
1647 /// the lifecycle transition primitive the control plane calls
1648 /// when an operator / auto-resume policy moves a suspended
1649 /// execution forward.
1650 ///
1651 /// The Valkey impl pre-reads `current_waitpoint_id` + `lane_id`
1652 /// from `exec_core` so callers only need the execution id + the
1653 /// trigger type — same ergonomics as `revoke_lease` reading
1654 /// `current_worker_instance_id` when callers omit it.
#[cfg(feature = "core")]
async fn resume_execution(
    &self,
    _args: ResumeExecutionArgs,
) -> Result<ResumeExecutionResult, EngineError> {
    // Trait-level default: no lifecycle transition happens; report
    // the op by name through the typed `Unavailable` error.
    let op = "resume_execution";
    Err(EngineError::Unavailable { op })
}
1664
1665 /// Service-layer `check_admission_and_record` — atomic admission
1666 /// check against a quota policy. Callers supply the policy id +
1667 /// dimension (quota keys live on their own `{q:<policy>}`
1668 /// partition that cannot be derived from `execution_id`, so these
1669 /// travel outside [`CheckAdmissionArgs`]). `dimension` defaults
1670 /// to `"default"` inside the Valkey body when the caller passes
1671 /// an empty string — matches cairn's pre-migration default.
#[cfg(feature = "core")]
async fn check_admission(
    &self,
    _quota_policy_id: &crate::types::QuotaPolicyId,
    _dimension: &str,
    _args: CheckAdmissionArgs,
) -> Result<CheckAdmissionResult, EngineError> {
    // Trait-level default: no admission is checked or recorded. The
    // policy id, dimension and args are all ignored.
    let op = "check_admission";
    Err(EngineError::Unavailable { op })
}
1683
1684 /// Service-layer `evaluate_flow_eligibility` — read-only check
1685 /// that returns the execution's current eligibility state
1686 /// (`eligible`, `blocked_by_dependencies`, or a backend-specific
1687 /// status string). Called by cairn's dependency-resolution path
1688 /// to decide whether a downstream execution can proceed.
#[cfg(feature = "core")]
async fn evaluate_flow_eligibility(
    &self,
    _args: EvaluateFlowEligibilityArgs,
) -> Result<EvaluateFlowEligibilityResult, EngineError> {
    // Trait-level default: eligibility is not evaluated; report the
    // op by name through the typed `Unavailable` error.
    let op = "evaluate_flow_eligibility";
    Err(EngineError::Unavailable { op })
}
1698
1699 // ── #454 — cairn typed-FCALL additions (4) ─────────────────
1700
1701 /// Per-execution budget spend with tenant-open dimensions.
1702 ///
1703 /// Cairn #454 Q1/Q2: takes an open-set `BTreeMap<String, u64>` of
1704 /// deltas (distinct from the fixed-shape
1705 /// [`Self::report_usage`]/[`Self::report_usage_admin`] which use
1706 /// [`UsageDimensions`]). Return shape reuses [`ReportUsageResult`]
1707 /// — same `Ok` / `SoftBreach` / `HardBreach` / `AlreadyApplied`
1708 /// variants cairn's UI already branches on.
1709 ///
1710 /// The default impl returns [`EngineError::Unavailable`] so the
1711 /// trait remains additive for backends that have not landed the
1712 /// #454 body yet.
#[cfg(feature = "core")]
async fn record_spend(
    &self,
    _args: RecordSpendArgs,
) -> Result<ReportUsageResult, EngineError> {
    // Trait-level default keeps the trait additive: no spend is
    // recorded and the op is named in the typed error.
    let op = "record_spend";
    Err(EngineError::Unavailable { op })
}
1722
1723 /// Per-execution budget attribution release.
1724 ///
1725 /// Called on execution termination to reverse this execution's
1726 /// contribution to a budget counter. Per cairn #454 clarification
1727 /// this is **per-execution**, not a whole-budget flush — the
1728 /// budget persists across executions.
1729 ///
1730 /// The default impl returns [`EngineError::Unavailable`].
#[cfg(feature = "core")]
async fn release_budget(
    &self,
    _args: ReleaseBudgetArgs,
) -> Result<(), EngineError> {
    // Trait-level default: no attribution is released; report the op
    // by name through the typed `Unavailable` error.
    let op = "release_budget";
    Err(EngineError::Unavailable { op })
}
1740
1741 /// Operator-driven approval-signal delivery.
1742 ///
1743 /// Pre-shaped variant of [`Self::deliver_signal`] where the caller
1744 /// **does not carry the waitpoint token**. The backend reads the
1745 /// token from `ff_waitpoint_pending`, HMAC-verifies server-side,
1746 /// and dispatches. Operator API never sees the token bytes.
1747 ///
1748 /// Cairn #454 Q3 — `signal_name` is a flat string (conventional
1749 /// values `"approved"` / `"rejected"`); audit metadata lives in
1750 /// cairn's audit log, not on the FF surface.
1751 ///
1752 /// The default impl returns [`EngineError::Unavailable`].
#[cfg(feature = "core")]
async fn deliver_approval_signal(
    &self,
    _args: DeliverApprovalSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
    // Trait-level default: no signal is delivered; report the op by
    // name through the typed `Unavailable` error.
    let op = "deliver_approval_signal";
    Err(EngineError::Unavailable { op })
}
1762
1763 /// Backend-atomic `issue_claim_grant` + `claim_execution`.
1764 ///
1765 /// Cairn #454 Q4 — composing the two primitives caller-side risks
1766 /// leaking a grant if `claim_execution` fails after
1767 /// `issue_claim_grant` succeeded. This method's contract is that
1768 /// the composition is backend-atomic: Valkey fuses them in one
1769 /// FCALL; PG/SQLite fuse them in one tx.
1770 ///
1771 /// The default impl **must not** be a chained call — it returns
1772 /// [`EngineError::Unavailable`] so consumers cannot accidentally
1773 /// use a non-atomic fallback.
#[cfg(feature = "core")]
async fn issue_grant_and_claim(
    &self,
    _args: IssueGrantAndClaimArgs,
) -> Result<ClaimGrantOutcome, EngineError> {
    // Deliberately NOT a chained issue-then-claim fallback: that
    // composition would be non-atomic, so the default only errors.
    let op = "issue_grant_and_claim";
    Err(EngineError::Unavailable { op })
}
1783
1784 // ── RFC-017 Stage A — Budget + quota admin (5) ─────────────
1785
1786 /// Create a budget definition (row 6).
#[cfg(feature = "core")]
async fn create_budget(
    &self,
    _args: CreateBudgetArgs,
) -> Result<CreateBudgetResult, EngineError> {
    // Trait-level default: no budget is created; report the op by
    // name through the typed `Unavailable` error.
    let op = "create_budget";
    Err(EngineError::Unavailable { op })
}
1796
1797 /// Reset a budget's usage counters (row 10).
#[cfg(feature = "core")]
async fn reset_budget(
    &self,
    _args: ResetBudgetArgs,
) -> Result<ResetBudgetResult, EngineError> {
    // Trait-level default: counters are left untouched; report the op
    // by name through the typed `Unavailable` error.
    let op = "reset_budget";
    Err(EngineError::Unavailable { op })
}
1805
1806 /// Create a quota policy (row 7).
#[cfg(feature = "core")]
async fn create_quota_policy(
    &self,
    _args: CreateQuotaPolicyArgs,
) -> Result<CreateQuotaPolicyResult, EngineError> {
    // Trait-level default: no policy is created; report the op by
    // name through the typed `Unavailable` error.
    let op = "create_quota_policy";
    Err(EngineError::Unavailable { op })
}
1816
1817 /// Read-only budget status for operator visibility (row 8).
#[cfg(feature = "core")]
async fn get_budget_status(
    &self,
    _id: &BudgetId,
) -> Result<BudgetStatus, EngineError> {
    // Trait-level default: nothing is read; report the op by name
    // through the typed `Unavailable` error.
    let op = "get_budget_status";
    Err(EngineError::Unavailable { op })
}
1827
1828 /// Admin-path `report_usage` (row 9 + RFC-017 §5 round-1 F4).
1829 /// Distinct from the existing [`Self::report_usage`] which takes
1830 /// a worker handle — the admin path has no lease context.
#[cfg(feature = "core")]
async fn report_usage_admin(
    &self,
    _budget: &BudgetId,
    _args: ReportUsageAdminArgs,
) -> Result<ReportUsageResult, EngineError> {
    // Trait-level default: no usage is reported; the budget id and
    // args are ignored and the op is named in the typed error.
    let op = "report_usage_admin";
    Err(EngineError::Unavailable { op })
}
1841
1842 // ── RFC-017 Stage A — Read + diagnostics (3) ───────────────
1843
1844 /// Fetch the stored result payload for a completed execution
1845 /// (row 4). Returns `Ok(None)` when the execution is missing, not
1846 /// yet complete, or its payload was trimmed by retention policy.
1847 async fn get_execution_result(
1848 &self,
1849 _id: &ExecutionId,
1850 ) -> Result<Option<Vec<u8>>, EngineError> {
1851 Err(EngineError::Unavailable {
1852 op: "get_execution_result",
1853 })
1854 }
1855
1856 /// List the pending-or-active waitpoints for an execution, cursor
1857 /// paginated (row 5 / §8). Stage A preserves the existing
1858 /// `PendingWaitpointInfo` shape; Stage D ships the §8 HMAC
1859 /// sanitisation + `(token_kid, token_fingerprint)` schema.
#[cfg(feature = "core")]
async fn list_pending_waitpoints(
    &self,
    _args: ListPendingWaitpointsArgs,
) -> Result<ListPendingWaitpointsResult, EngineError> {
    // Trait-level default: no page is listed; report the op by name
    // through the typed `Unavailable` error.
    let op = "list_pending_waitpoints";
    Err(EngineError::Unavailable { op })
}
1869
1870 /// Backend-level reachability probe (row 1). Valkey: `PING`;
1871 /// Postgres: `SELECT 1`.
1872 async fn ping(&self) -> Result<(), EngineError> {
1873 Err(EngineError::Unavailable { op: "ping" })
1874 }
1875
1876 // ── RFC-017 Stage A — Scheduling (1) ───────────────────────
1877
1878 /// Scheduler-routed claim entrypoint (row 18, RFC-017 §7). Valkey
1879 /// forwards to its `ff_scheduler::Scheduler` cursor; Postgres
1880 /// forwards to `PostgresScheduler`'s `FOR UPDATE SKIP LOCKED`
1881 /// path.
1882 ///
1883 /// Backends that carry an embedded scheduler (e.g. `ValkeyBackend`
1884 /// constructed via `with_embedded_scheduler`, or `PostgresBackend`
1885 /// with its `with_scanners` sibling) route the claim through it.
1886 /// Backends without a wired scheduler return
1887 /// [`EngineError::Unavailable`]. HTTP consumers use
1888 /// `FlowFabricWorker::claim_via_server` instead.
#[cfg(feature = "core")]
async fn claim_for_worker(
    &self,
    _args: ClaimForWorkerArgs,
) -> Result<ClaimForWorkerOutcome, EngineError> {
    // Trait-level default: there is no scheduler to route through,
    // so the claim is refused with a typed error naming the op.
    let op = "claim_for_worker";
    Err(EngineError::Unavailable { op })
}
1898
1899 // ── Cross-cutting (RFC-017 Stage B trait-lift) ──────────────
1900
1901 /// Static observability label identifying the backend family in
1902 /// logs + metrics (RFC-017 §5.4 + §9 Stage B). Default impl
1903 /// returns `"unknown"` so legacy `impl EngineBackend` blocks that
1904 /// have not upgraded keep compiling; every in-tree backend
1905 /// overrides — `ValkeyBackend` → `"valkey"`, `PostgresBackend` →
1906 /// `"postgres"`.
1907 fn backend_label(&self) -> &'static str {
1908 "unknown"
1909 }
1910
1911 /// Backend downcast escape hatch (v0.12 PR-7a transitional).
1912 ///
1913 /// Scanner supervisors in `ff-engine` still dispatch through a
1914 /// concrete `ferriskey::Client`; to keep the engine's public
1915 /// boundary backend-agnostic (`Arc<dyn EngineBackend>`) while the
1916 /// scanner internals remain Valkey-shaped, the engine downcasts
1917 /// via this method and reaches in for the embedded client. Every
1918 /// backend that wants to be consumed by `Engine::start_with_completions`
1919 /// overrides this to return `self` as `&dyn Any`; the default
1920 /// returns a placeholder so a stray `downcast_ref` fails cleanly
1921 /// rather than risking unsound behaviour.
1922 ///
1923 /// v0.13 (PR-7b) will trait-ify individual scanners onto
1924 /// `EngineBackend` and retire `ff-engine`'s dependence on this
1925 /// downcast path. The method itself will remain on the trait
1926 /// (likely deprecated) rather than be removed — removing a
1927 /// public trait method is a breaking change for external
1928 /// `impl EngineBackend` blocks.
1929 fn as_any(&self) -> &(dyn std::any::Any + 'static) {
1930 // Placeholder so the default does not expose `Self` for
1931 // downcast. Backends override to return `self`.
1932 &()
1933 }
1934
1935 /// RFC-018 Stage A: snapshot of this backend's identity + the
1936 /// flat `Supports` surface it can actually service. Consumers use
1937 /// this at startup to gate UI features / choose between alternative
1938 /// code paths before dispatching. See
1939 /// `rfcs/RFC-018-backend-capability-discovery.md` for the full
1940 /// discovery contract and the four owner-adjudicated open
1941 /// questions (granularity: coarse; version: struct; sync; no
1942 /// event stream).
1943 ///
1944 /// Default: returns a value tagged `family = "unknown"` with every
1945 /// `supports.*` bool `false`, so pre-RFC-018 out-of-tree backends
1946 /// keep compiling and consumers treat "all false" as "dispatch
1947 /// and catch [`EngineError::Unavailable`]" (pre-RFC-018 behaviour).
1948 /// Concrete in-tree backends (`ValkeyBackend`, `PostgresBackend`)
1949 /// override to populate a real value.
1950 ///
1951 /// Sync (no `.await`): backend-static info should not require a
1952 /// probe on every query. Dynamic probes happen once at
1953 /// `connect*` time and cache the result.
1954 fn capabilities(&self) -> crate::capability::Capabilities {
1955 crate::capability::Capabilities::new(
1956 crate::capability::BackendIdentity::new(
1957 "unknown",
1958 crate::capability::Version::new(0, 0, 0),
1959 "unknown",
1960 ),
1961 crate::capability::Supports::none(),
1962 )
1963 }
1964
1965 /// Issue #281: run one-time backend-specific boot preparation.
1966 ///
1967 /// Intended to run ONCE per deployment startup — NOT per request.
1968 /// Idempotent and safe for consumers to call on every application
1969 /// boot; backends that have nothing to do return
1970 /// [`PrepareOutcome::NoOp`] without side effects.
1971 ///
1972 /// Per-backend behaviour:
1973 ///
1974 /// * **Valkey** — issues `FUNCTION LOAD REPLACE` for the
1975 /// `flowfabric` Lua library (with bounded retry on transient
1976 /// transport faults; permanent compile errors surface as
1977 /// [`EngineError::Transport`] without retry). Returns
1978 /// [`PrepareOutcome::Applied`] carrying
1979 /// `"FUNCTION LOAD (flowfabric lib v<N>)"`.
1980 /// * **Postgres** — returns [`PrepareOutcome::NoOp`]. Schema
1981 /// migrations are applied out-of-band per
1982 /// `rfcs/drafts/v0.7-migration-master.md §Q12`; the backend
1983 /// runs a schema-version check at connect time and refuses to
1984 /// start on mismatch, so no boot-side prepare work remains.
1985 /// * **Default impl** — returns [`PrepareOutcome::NoOp`] so
1986 /// out-of-tree backends without preparation work compile
1987 /// without boilerplate.
1988 ///
1989 /// # Relationship to the in-tree boot path
1990 ///
1991 /// `ValkeyBackend::initialize_deployment` (called from
1992 /// `Server::start_with_metrics`) already invokes
1993 /// [`ensure_library`](ff_script::loader::ensure_library) inline as
1994 /// its step 4; that path is unchanged. `prepare()` exists as a
1995 /// **trait-surface entry point** so consumers that construct an
1996 /// `Arc<dyn EngineBackend>` outside of `Server` (e.g.
1997 /// cairn-fabric's boot path at `cairn-fabric/src/boot.rs`) can
1998 /// run the same preparation without reaching into
1999 /// backend-specific modules. The overlap is intentional: calling
2000 /// both `prepare()` and `initialize_deployment` is safe because
2001 /// `FUNCTION LOAD REPLACE` is idempotent under the version
2002 /// check.
2003 ///
2004 /// # Layer forwarding
2005 ///
2006 /// Layer impls (`HookedBackend`, ff-sdk layers) do NOT forward
2007 /// `prepare` today — consistent with `backend_label` / `ping` /
2008 /// `shutdown_prepare`. Consumers that wrap a backend in layers
2009 /// MUST call `prepare()` on the raw backend before wrapping, or
2010 /// accept the default [`PrepareOutcome::NoOp`].
2011 async fn prepare(&self) -> Result<PrepareOutcome, EngineError> {
2012 Ok(PrepareOutcome::NoOp)
2013 }
2014
2015 /// Drain-before-shutdown hook (RFC-017 §5.4). The server calls
2016 /// this before draining its own background tasks so backend-
2017 /// scoped primitives (Valkey stream semaphore, Postgres sqlx
2018 /// pool, …) can close their gates and await in-flight work up to
2019 /// `grace`.
2020 ///
2021 /// Default impl returns `Ok(())` — a no-op backend has nothing
2022 /// backend-scoped to drain. Concrete backends whose data plane
2023 /// owns resources (connection pools, semaphores, listeners)
2024 /// override with a real body.
2025 async fn shutdown_prepare(&self, _grace: Duration) -> Result<(), EngineError> {
2026 Ok(())
2027 }
2028
2029 // ── RFC-017 Stage E2 — `Server::client` removal (header + reads) ───
2030
2031 /// RFC-017 Stage E2: the "header" portion of `cancel_flow` — run the
2032 /// atomic flow-state flip (Valkey: `ff_cancel_flow` FCALL; Postgres:
2033 /// `cancel_flow_once` tx), decode policy + membership, and surface
2034 /// the `flow_already_terminal` idempotency branch as a first-class
2035 /// [`CancelFlowHeader::AlreadyTerminal`] so the Server can build
2036 /// the wire [`CancelFlowResult`] without reaching for a raw
2037 /// `Client`. Separate from the existing
2038 /// [`EngineBackend::cancel_flow`] entry point (which takes the
2039 /// enum-typed `(policy, wait)` split and returns the wait-collapsed
2040 /// `CancelFlowResult`) because the Server owns its own
2041 /// wait-dispatch + member-cancel machinery via
2042 /// [`EngineBackend::cancel_execution`] + backlog ack.
2043 ///
2044 /// Default impl returns [`EngineError::Unavailable`] so un-migrated
2045 /// backends surface the miss loudly.
#[cfg(feature = "core")]
async fn cancel_flow_header(
    &self,
    _args: CancelFlowArgs,
) -> Result<crate::contracts::CancelFlowHeader, EngineError> {
    // Trait-level default: no flow state is flipped; report the op by
    // name so an un-migrated backend surfaces the miss loudly.
    let op = "cancel_flow_header";
    Err(EngineError::Unavailable { op })
}
2055
2056 /// RFC-017 Stage E2: best-effort acknowledgement that one member of
2057 /// a `cancel_all` flow has completed its per-member cancel. Drains
2058 /// the member from the flow's `pending_cancels` set and, if empty,
2059 /// removes the flow from the partition-level `cancel_backlog`
2060 /// (Valkey: `ff_ack_cancel_member` FCALL; Postgres: table write —
2061 /// default `Unavailable` until Wave 9).
2062 ///
2063 /// Failures are swallowed by the caller — the cancel-backlog
2064 /// reconciler is the authoritative drain — but a typed error here
2065 /// lets the caller log a backend-scoped context string.
#[cfg(feature = "core")]
async fn ack_cancel_member(
    &self,
    _flow_id: &FlowId,
    _execution_id: &ExecutionId,
) -> Result<(), EngineError> {
    // Trait-level default: nothing is acknowledged; the typed error
    // names the op so callers can log a backend-scoped context.
    let op = "ack_cancel_member";
    Err(EngineError::Unavailable { op })
}
2076
2077 /// RFC-017 Stage E2: full-shape execution read used by the
2078 /// `GET /v1/executions/{id}` HTTP route. Returns the legacy
2079 /// [`ExecutionInfo`] wire shape (not the decoupled
2080 /// [`ExecutionSnapshot`]) so the existing HTTP response bytes stay
2081 /// identical across the migration.
2082 ///
2083 /// `Ok(None)` ⇒ no such execution. Default `Unavailable` because
2084 /// the Valkey HGETALL-and-parse is backend-specific.
#[cfg(feature = "core")]
async fn read_execution_info(
    &self,
    _id: &ExecutionId,
) -> Result<Option<ExecutionInfo>, EngineError> {
    // Trait-level default: this is an error, not `Ok(None)` — the
    // read is unimplemented here, which is distinct from "no such
    // execution".
    let op = "read_execution_info";
    Err(EngineError::Unavailable { op })
}
2094
2095 /// RFC-017 Stage E2: narrow `public_state` read used by the
2096 /// `GET /v1/executions/{id}/state` HTTP route. Returns `Ok(None)`
2097 /// when the execution is missing. Default `Unavailable`.
#[cfg(feature = "core")]
async fn read_execution_state(
    &self,
    _id: &ExecutionId,
) -> Result<Option<PublicState>, EngineError> {
    // Trait-level default: this is an error, not `Ok(None)` — the
    // read is unimplemented here, which is distinct from a missing
    // execution.
    let op = "read_execution_state";
    Err(EngineError::Unavailable { op })
}
2107
2108 // ── RFC-019 Stage A/B/C — Stream-cursor subscriptions ─────────
2109 //
2110 // Four owner-adjudicated families (RFC-019 §Open Questions #5):
2111 // `lease_history`, `completion`, `signal_delivery`,
2112 // `instance_tags`. Stage C (this crate) promotes each family to
2113 // a typed event enum; consumers `match` on variants instead of
2114 // parsing a backend-shaped byte blob.
2115 //
2116 // Each method returns a family-specific subscription alias (see
2117 // [`crate::stream_events`]). All defaults return
2118 // `EngineError::Unavailable` per RFC-017 trait-growth conventions.
2119
2120 /// Subscribe to lease lifecycle events (acquired / renewed /
2121 /// expired / reclaimed / revoked) for the partition this backend
2122 /// is configured with.
2123 ///
2124 /// Cross-partition fan-out is consumer-side merge: subscribe
2125 /// per-partition backend instance and interleave on the read
2126 /// side. Yields
2127 /// `Err(EngineError::StreamDisconnected { cursor })` on backend
2128 /// disconnect; resume by calling this method again with the
2129 /// returned cursor.
2130 ///
2131 /// `filter` (#282): when `filter.instance_tag` is `Some((k, v))`,
2132 /// only events whose execution carries tag `k = v` are yielded
2133 /// (matching the [`crate::backend::ScannerFilter`] surface from
2134 /// #122). Pass `&ScannerFilter::default()` for unfiltered
2135 /// behaviour. Filtering happens inside the backend stream; the
2136 /// [`crate::stream_events::LeaseHistorySubscription`] return type
2137 /// is unchanged.
2138 async fn subscribe_lease_history(
2139 &self,
2140 _cursor: crate::stream_subscribe::StreamCursor,
2141 _filter: &crate::backend::ScannerFilter,
2142 ) -> Result<crate::stream_events::LeaseHistorySubscription, EngineError> {
2143 Err(EngineError::Unavailable {
2144 op: "subscribe_lease_history",
2145 })
2146 }
2147
2148 /// Subscribe to completion events (terminal state transitions).
2149 ///
2150 /// - **Postgres**: wraps the `ff_completion_event` outbox +
2151 /// LISTEN/NOTIFY machinery. Durable via event-id cursor.
2152 /// - **Valkey**: wraps the RESP3 `ff:dag:completions` pubsub
2153 /// subscriber. Pubsub is at-most-once over the live
2154 /// subscription window; the cursor is always the empty
2155 /// sentinel. If you need at-least-once replay with durable
2156 /// cursor resume, use the Postgres backend (see
2157 /// `docs/POSTGRES_PARITY_MATRIX.md` row `subscribe_completion`).
2158 ///
2159 /// `filter` (#282): see [`Self::subscribe_lease_history`]. Valkey
2160 /// reuses the `subscribe_completions_filtered` per-event HGET
2161 /// gate; Postgres filters inline against the outbox's denormalised
2162 /// `instance_tag` column.
2163 async fn subscribe_completion(
2164 &self,
2165 _cursor: crate::stream_subscribe::StreamCursor,
2166 _filter: &crate::backend::ScannerFilter,
2167 ) -> Result<crate::stream_events::CompletionSubscription, EngineError> {
2168 Err(EngineError::Unavailable {
2169 op: "subscribe_completion",
2170 })
2171 }
2172
2173 /// Subscribe to signal-delivery events (satisfied / buffered /
2174 /// deduped).
2175 ///
2176 /// `filter` (#282): see [`Self::subscribe_lease_history`].
2177 async fn subscribe_signal_delivery(
2178 &self,
2179 _cursor: crate::stream_subscribe::StreamCursor,
2180 _filter: &crate::backend::ScannerFilter,
2181 ) -> Result<crate::stream_events::SignalDeliverySubscription, EngineError> {
2182 Err(EngineError::Unavailable {
2183 op: "subscribe_signal_delivery",
2184 })
2185 }
2186
2187 /// Subscribe to instance-tag events (tag attached / cleared).
2188 ///
2189 /// Producer wiring is deferred per #311 audit ("no concrete
2190 /// demand"); the trait method exists for API uniformity across
2191 /// the four families. Backends currently return
2192 /// `EngineError::Unavailable`.
2193 async fn subscribe_instance_tags(
2194 &self,
2195 _cursor: crate::stream_subscribe::StreamCursor,
2196 ) -> Result<crate::stream_events::InstanceTagSubscription, EngineError> {
2197 Err(EngineError::Unavailable {
2198 op: "subscribe_instance_tags",
2199 })
2200 }
2201
// ── PR-7b Cluster 1 — Foundation scanner operations ─────────
//
// Per-execution write hooks invoked by the engine scanner loop.
// Scanner bodies discover candidate executions via partition
// indices (Valkey ZSETs / PG index scans) and, for each candidate,
// call through to one of the methods below to perform the atomic
// state-flip. Defaults return `EngineError::Unavailable { op }`;
// Valkey impls wrap the corresponding `ff_*` FCALL; Postgres impls
// run a single-row tx mirroring the Lua semantic.
2211
2212 /// Lease-expiry scanner hook — mark an expired lease as reclaimable
2213 /// so another worker can redeem the execution. Atomic per call.
2214 ///
2215 /// Valkey: `FCALL ff_mark_lease_expired_if_due`.
2216 /// Postgres: single-row tx on `ff_attempt` + `ff_exec_core` (see
2217 /// `ff_backend_postgres::reconcilers::lease_expiry`).
2218 async fn mark_lease_expired_if_due(
2219 &self,
2220 _partition: crate::partition::Partition,
2221 _execution_id: &ExecutionId,
2222 ) -> Result<(), EngineError> {
2223 Err(EngineError::Unavailable {
2224 op: "mark_lease_expired_if_due",
2225 })
2226 }
2227
2228 /// Delayed-promoter scanner hook — promote a delayed execution to
2229 /// `eligible_now` once its `delay_until` has passed.
2230 ///
2231 /// Valkey: `FCALL ff_promote_delayed`.
2232 /// Postgres: Wave 9 schema scope (no current impl).
2233 async fn promote_delayed(
2234 &self,
2235 _partition: crate::partition::Partition,
2236 _lane: &LaneId,
2237 _execution_id: &ExecutionId,
2238 _now_ms: TimestampMs,
2239 ) -> Result<(), EngineError> {
2240 Err(EngineError::Unavailable {
2241 op: "promote_delayed",
2242 })
2243 }
2244
2245 /// Pending-waitpoint-expiry scanner hook — close a pending
2246 /// waitpoint whose deadline has passed (wake the suspended
2247 /// execution with a timeout signal).
2248 ///
2249 /// Valkey: `FCALL ff_close_waitpoint`.
2250 /// Postgres: Wave 9 schema scope (no current impl).
2251 async fn close_waitpoint(
2252 &self,
2253 _partition: crate::partition::Partition,
2254 _execution_id: &ExecutionId,
2255 _waitpoint_id: &str,
2256 _now_ms: TimestampMs,
2257 ) -> Result<(), EngineError> {
2258 Err(EngineError::Unavailable {
2259 op: "close_waitpoint",
2260 })
2261 }
2262
2263 /// Shared hook for the attempt-timeout and execution-deadline
2264 /// scanners — terminate or retry an execution whose wall-clock
2265 /// budget has elapsed. `phase` discriminates which of the two
2266 /// scanner paths is calling so the backend can preserve diagnostic
2267 /// breadcrumbs without forking the surface.
2268 ///
2269 /// Valkey: `FCALL ff_expire_execution` (with `phase` passed through
2270 /// as an ARGV discriminator).
2271 /// Postgres: single-row tx mirror of the Lua semantic for
2272 /// `AttemptTimeout`; `ExecutionDeadline` is Wave 9 schema scope.
2273 async fn expire_execution(
2274 &self,
2275 _partition: crate::partition::Partition,
2276 _execution_id: &ExecutionId,
2277 _phase: ExpirePhase,
2278 _now_ms: TimestampMs,
2279 ) -> Result<(), EngineError> {
2280 Err(EngineError::Unavailable {
2281 op: "expire_execution",
2282 })
2283 }
2284
2285 /// Suspension-timeout scanner hook — expire a suspended execution
2286 /// whose suspension deadline has passed (wake with timeout).
2287 ///
2288 /// Valkey: `FCALL ff_expire_suspension`.
2289 /// Postgres: single-row tx on `ff_suspend` + `ff_exec_core`.
2290 async fn expire_suspension(
2291 &self,
2292 _partition: crate::partition::Partition,
2293 _execution_id: &ExecutionId,
2294 _now_ms: TimestampMs,
2295 ) -> Result<(), EngineError> {
2296 Err(EngineError::Unavailable {
2297 op: "expire_suspension",
2298 })
2299 }
2300
// ── PR-7b Cluster 2 — Reconciler scanner operations ─────────
//
// Per-execution write hooks invoked by the `unblock` scanner. The
// other two cluster-2 scanners (`budget_reset`, `dependency_reconciler`)
// route through `reset_budget` + `resolve_dependency`, which are
// already part of the RFC-017 service-layer surface and live above.
2307
2308 /// Unblock-scanner hook — move an execution from a blocked ZSET back
2309 /// to `eligible_now` once its blocking condition has cleared (budget
2310 /// under limit, quota window drained, or a capable worker has come
2311 /// online). `expected_blocking_reason` discriminates which of the
2312 /// `blocked:{budget,quota,route}` sets the execution is leaving and
2313 /// also fences against a stale unblock (Lua rejects if the core's
2314 /// `blocking_reason` no longer matches).
2315 ///
2316 /// # Backend status
2317 ///
2318 /// - **Valkey:** wraps `ff_unblock_execution` (RFC-010 §6). Atomic
2319 /// single-slot FCALL on `{p:N}`.
2320 /// - **Postgres:** `Unavailable` (structural). PG does not persist a
2321 /// per-reason `blocked:{budget,quota,route}` index — scheduler
2322 /// eligibility is re-evaluated live via SQL predicates on
2323 /// `ff_exec_core` + budget / quota tables (see
2324 /// `ff_backend_postgres::scheduler`). Nothing to reconcile, so the
2325 /// engine's PG scanner loop does not run this path; callers on
2326 /// a PG deployment receive the typed `Unavailable` per PR-7b/final
2327 /// contract.
2328 /// - **SQLite:** `Unavailable` for the same reason as Postgres.
2329 async fn unblock_execution(
2330 &self,
2331 _partition: crate::partition::Partition,
2332 _lane_id: &LaneId,
2333 _execution_id: &ExecutionId,
2334 _expected_blocking_reason: &str,
2335 _now_ms: TimestampMs,
2336 ) -> Result<(), EngineError> {
2337 Err(EngineError::Unavailable {
2338 op: "unblock_execution",
2339 })
2340 }
2341
/// RFC-016 Stage C — sibling-cancel group drain.
///
/// After `edge_cancel_dispatcher` has issued
/// [`EngineBackend::cancel_execution`] against every listed
/// sibling of a `(flow_id, downstream_eid)` group, this trait
/// method atomically removes the tuple from the partition-local
/// pending-cancel-groups index and clears the per-group flag +
/// members state.
///
/// Valkey: `FCALL ff_drain_sibling_cancel_group` (SREM +
/// HDEL in one Lua unit).
/// Postgres: `Unavailable` — PG's Wave-6b
/// `reconcilers::edge_cancel_dispatcher::dispatcher_tick`
/// owns the equivalent row drain inside its own per-group
/// transaction; the Valkey-shaped per-tuple call is not used on
/// the PG scanner path.
/// SQLite: `Unavailable` (mirrors PG; RFC-023 scope).
///
/// # Errors
///
/// The default body returns `EngineError::Unavailable` tagged with
/// `op = "drain_sibling_cancel_group"`.
#[cfg(feature = "core")]
async fn drain_sibling_cancel_group(
    &self,
    _flow_partition: crate::partition::Partition,
    _flow_id: &FlowId,
    _downstream_eid: &ExecutionId,
) -> Result<(), EngineError> {
    Err(EngineError::Unavailable {
        op: "drain_sibling_cancel_group",
    })
}
2370
/// RFC-016 Stage D — sibling-cancel group reconcile (Invariant
/// Q6 safety net).
///
/// Re-examines a `(flow_id, downstream_eid)` tuple in the
/// partition-local pending-cancel-groups index and returns one
/// of three atomic dispositions — see
/// [`SiblingCancelReconcileAction`] — so the crash-recovery
/// reconciler can drain stale or completed tuples without
/// fighting the Stage-C dispatcher.
///
/// Valkey: `FCALL ff_reconcile_sibling_cancel_group`.
/// Postgres: `Unavailable` — PG's
/// `reconcilers::edge_cancel_reconciler::reconciler_tick`
/// owns the row-level reconcile inside its own batched tx.
/// SQLite: `Unavailable` (mirrors PG).
///
/// # Errors
///
/// The default body returns `EngineError::Unavailable` tagged with
/// `op = "reconcile_sibling_cancel_group"`.
#[cfg(feature = "core")]
async fn reconcile_sibling_cancel_group(
    &self,
    _flow_partition: crate::partition::Partition,
    _flow_id: &FlowId,
    _downstream_eid: &ExecutionId,
) -> Result<SiblingCancelReconcileAction, EngineError> {
    Err(EngineError::Unavailable {
        op: "reconcile_sibling_cancel_group",
    })
}
2397
// ── PR-7b Cluster 2b-A — Tally-recompute reconciler scanners ─────
//
// Coarse trait methods: each runs a full scan-and-fix pass over
// one partition. Unlike cluster-2's per-candidate write hooks,
// these scanners do multi-round-trip discovery (SSCAN / HMGET /
// HGETALL / ZSCORE / ZREMRANGEBYSCORE) interleaved with conditional
// writes. Moving the whole pass into the trait impl achieves the
// PR-7b/final contract ("scanner trait leaks no `ferriskey::Client`")
// without atomising operations that are inherently not atomic.
//
// Backend status (all three methods):
//
// - **Valkey:** real scan-loop in the trait impl; identical command
//   shape to the pre-routing scanner path.
// - **Postgres:** default `Unavailable`. PG mutates budget / quota
//   counters and scheduling state inside SERIALIZABLE transactions,
//   and the scheduler evaluates eligibility via live SQL predicates
//   on `ff_exec_core` + budget / quota tables — no persistent index
//   or counter hash that can drift, hence nothing to reconcile. The
//   engine's PG scanner supervisor does not run these paths.
// - **SQLite:** same as Postgres (RFC-023: SQLite runs its own
//   scanner supervisor; these FF-owned tally-recompute scanners
//   are not registered).
2421
2422 /// Index-reconciler pass — walks `ff:idx:{p:N}:all_executions`
2423 /// and verifies each execution appears in the correct scheduling
2424 /// sorted set (`eligible` / `delayed` / `blocked:*` / `active` /
2425 /// `suspended` / `terminal`) for its current `(lifecycle_phase,
2426 /// eligibility_state, ownership_state)` triple. Phase 1 is
2427 /// log-only; auto-fix is deferred to a later phase (RFC-010 §6.14).
2428 async fn reconcile_execution_index(
2429 &self,
2430 _partition: crate::partition::Partition,
2431 _lanes: &[LaneId],
2432 _filter: &crate::backend::ScannerFilter,
2433 ) -> Result<ReconcileCounts, EngineError> {
2434 Err(EngineError::Unavailable {
2435 op: "reconcile_execution_index",
2436 })
2437 }
2438
2439 /// Budget-reconciler pass — walks `ff:budget:{b:M}:policies_idx`,
2440 /// reads each budget's definition / usage / limits, and reconciles
2441 /// the `breached_at` marker against hard limits. Resetting budgets
2442 /// (non-zero `reset_interval_ms`) are skipped — they are handled
2443 /// by the `budget_reset` scanner (cluster 2). Drops index entries
2444 /// for budgets whose definition hash has been deleted (retention
2445 /// purge / manual). RFC-008 §Budget Reconciliation, RFC-010 §6.5.
2446 async fn reconcile_budget_counters(
2447 &self,
2448 _partition: crate::partition::Partition,
2449 _now_ms: TimestampMs,
2450 ) -> Result<ReconcileCounts, EngineError> {
2451 Err(EngineError::Unavailable {
2452 op: "reconcile_budget_counters",
2453 })
2454 }
2455
2456 /// Quota-reconciler pass — walks `ff:quota:{q:M}:policies_idx`,
2457 /// trims expired entries from rate-limit sliding windows, and
2458 /// recomputes each policy's concurrency counter by walking its
2459 /// `admitted_set` and pruning entries whose admission guard key
2460 /// has TTLed out. RFC-008 §Quota Reconciliation, RFC-010 §6.6.
2461 async fn reconcile_quota_counters(
2462 &self,
2463 _partition: crate::partition::Partition,
2464 _now_ms: TimestampMs,
2465 ) -> Result<ReconcileCounts, EngineError> {
2466 Err(EngineError::Unavailable {
2467 op: "reconcile_quota_counters",
2468 })
2469 }
2470
// ── PR-7b Cluster 2b-B — Projection + retention scanners ──────
//
// Two additional non-FCALL scanners routed through the trait so
// engine-side scanners stop touching `ferriskey::Client` directly.
// Both ride on aggregate reads + derived writes; neither is a thin
// single-FCALL passthrough.
2477
/// Flow-projector scanner hook — sample flow members, derive an
/// aggregate `public_flow_state` + per-state counts, and write them
/// to the flow summary projection. Returns `Ok(true)` when the
/// summary was updated, `Ok(false)` when the flow had no members
/// or the index entry was defensively pruned (core missing).
///
/// The derived `public_flow_state` written here is distinct from
/// `ff_flow_core.public_flow_state` — the former is a rollup
/// dashboard field, the latter is the authoritative mutation-guard
/// state owned by `create_flow` / `cancel_flow`. See
/// `ff_engine::scanner::flow_projector` module doc for the
/// two-sources contract.
///
/// # Backend status
///
/// - **Valkey:** lifts the pre-PR-7b Rust-composed
///   SCARD + SRANDMEMBER + per-member HGET + HSET summary pattern.
///   No new Lua function; the aggregation is inherently
///   multi-round-trip (cross-partition member reads) and atomicity
///   is neither required nor achievable against 256 partitions.
/// - **Postgres:** SELECT aggregates from `ff_exec_core` grouped by
///   `public_state` for the flow's members, INSERT ... ON CONFLICT
///   DO UPDATE into `ff_flow_summary` (migration 0019). One query
///   per flow; partition-local aggregation.
/// - **SQLite:** `Unavailable` per RFC-023 Phase 3.5 (flow summary
///   projection is a shared-deployment dashboard feature; local
///   single-tenant SQLite deployments don't need it).
///
/// # Errors
///
/// The default body returns `EngineError::Unavailable` tagged with
/// `op = "project_flow_summary"`.
#[cfg(feature = "core")]
async fn project_flow_summary(
    &self,
    _partition: crate::partition::Partition,
    _flow_id: &FlowId,
    _now_ms: TimestampMs,
) -> Result<bool, EngineError> {
    Err(EngineError::Unavailable {
        op: "project_flow_summary",
    })
}
2516
/// Retention-trimmer scanner hook — delete terminal executions and
/// all their subordinate keys/rows once they are older than the
/// configured retention window. Returns the number of executions
/// actually purged in this call (so the scanner can loop when it
/// hits `batch_size`).
///
/// Retention trimming is inherently a scan-and-delete loop over
/// time; the trait surface exists to remove engine-side Valkey
/// coupling, **not** to atomise the operation into a single
/// round-trip. Implementations may issue multiple round-trips
/// (e.g. per-execution cascade across sibling tables); the trait
/// contract is "make progress, bounded by batch_size".
///
/// # Backend status
///
/// - **Valkey:** lifts the pre-PR-7b ZRANGEBYSCORE + per-exec
///   cascade-delete loop verbatim. Cluster-safe (all keys for one
///   execution live on the same `{p:N}` slot).
/// - **Postgres:** `DELETE FROM ff_exec_core` for terminal rows
///   past the cutoff plus explicit cascade DELETEs on every
///   execution-scoped sibling table (no FK CASCADE in the schema).
///   Single transaction per batch.
/// - **SQLite:** `Unavailable` per RFC-023 Phase 3.5. Single-tenant
///   local deployments typically manage their own DB lifecycle and
///   do not need a retention scanner.
///
/// # Errors
///
/// The default body returns `EngineError::Unavailable` tagged with
/// `op = "trim_retention"`.
#[cfg(feature = "core")]
async fn trim_retention(
    &self,
    _partition: crate::partition::Partition,
    _lane_id: &LaneId,
    _retention_ms: u64,
    _now_ms: TimestampMs,
    _batch_size: u32,
    _filter: &crate::backend::ScannerFilter,
) -> Result<u32, EngineError> {
    Err(EngineError::Unavailable {
        op: "trim_retention",
    })
}
2556
// ── PR-7b Wave 0a: exec_core field read primitive ──
2558
/// Point-read N fields from the `exec_core` hash for a given
/// execution. Returns a map of field-name → `Option<String>`
/// (`None` for fields absent or stored as NULL). Scanner call
/// sites formerly issuing raw `HGET`/`HMGET` on `ExecKeyContext::core()`
/// route through this trait method (cairn #436 / PR-7b Wave 0a).
///
/// Field values are coerced to String at the trait boundary for
/// wire compatibility with the Valkey HGET shape. Consumers parse
/// specific fields (`lane_id`, `current_attempt_index`, etc.) from
/// the returned strings as needed.
///
/// - **Valkey:** `HMGET exec_core_key f1 f2 ...`, zipping names to values.
/// - **Postgres:** `SELECT` against `ff_exec_core` with dynamic column
///   extraction; fields in `raw_fields` JSONB are extracted via `->>`.
/// - **SQLite:** equivalent with `json_extract(raw_fields, '$.field')`.
///
/// Default body returns `Unavailable` so non-v0.13 backends remain
/// compile-compatible.
#[cfg(feature = "core")]
async fn read_exec_core_fields(
    &self,
    _partition: crate::partition::Partition,
    _execution_id: &crate::types::ExecutionId,
    _fields: &[&str],
) -> Result<std::collections::HashMap<String, Option<String>>, EngineError> {
    Err(EngineError::Unavailable {
        op: "read_exec_core_fields",
    })
}
2588
// ── PR-7b Wave 0a: backend clock primitive ──
2590
/// Returns the backend's current wall-clock epoch milliseconds.
///
/// Used by 15 scanners to compute "due" thresholds before issuing
/// per-partition due-scans. Previously a Valkey-only helper in
/// `ff-engine`'s `scanner::lease_expiry` issuing `TIME`; this
/// trait method is the backend-agnostic replacement (cairn #436 /
/// PR-7b Wave 0a).
///
/// Every in-tree backend overrides. The default falls back to
/// `SystemTime::now()` so out-of-tree `EngineBackend` impls (e.g.
/// cairn mocks, test doubles) stay source-compatible across v0.12
/// → v0.13.
///
/// - **Valkey:** `TIME` command.
/// - **Postgres:** `SELECT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint`
///   (not `now()` — `now()` is the transaction start timestamp,
///   which is stale under any long-running tx; scanners need the
///   true wall-clock read).
/// - **SQLite:** `SELECT CAST((julianday('now') - 2440587.5) * 86400000 AS INTEGER)`.
///
/// # Errors
///
/// The default body returns `EngineError::Transport` with
/// `backend = "system-clock"` when the local clock reads earlier than
/// the Unix epoch, or when the millisecond count does not fit in a
/// `u64` (reported instead of being silently truncated).
#[cfg(feature = "core")]
async fn server_time_ms(&self) -> Result<u64, EngineError> {
    use std::time::{SystemTime, UNIX_EPOCH};

    // Both failure modes share one error shape: a boxed message tagged
    // with the "system-clock" pseudo-backend.
    let clock_error = |detail: String| EngineError::Transport {
        backend: "system-clock",
        source: detail.into(),
    };

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_err(|e| clock_error(format!("server_time_ms default: {e}")))?;

    // `as_millis()` yields a `u128`; convert with a checked narrowing
    // rather than an `as` cast so an out-of-range value is an error.
    // Fully qualified so the call resolves regardless of edition prelude.
    <u64 as std::convert::TryFrom<u128>>::try_from(since_epoch.as_millis())
        .map_err(|e| clock_error(format!("server_time_ms default: {e}")))
}
2621}
2622
/// RFC-016 Stage D outcome of a single
/// [`EngineBackend::reconcile_sibling_cancel_group`] call.
///
/// Cardinality is intentionally bounded to three so the
/// `ff_sibling_cancel_reconcile_total{action}` metric label stays
/// closed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SiblingCancelReconcileAction {
    /// Pending tuple was stale (flag cleared or edgegroup absent);
    /// SREM'd / DELETE'd without touching the group.
    SremmedStale,
    /// Flag still set but every listed sibling is already terminal —
    /// an interrupted drain. Flag cleared + tuple drained.
    CompletedDrain,
    /// Flag set and at least one sibling non-terminal — dispatcher
    /// owns this tuple; left untouched.
    NoOp,
}

impl SiblingCancelReconcileAction {
    /// Short label for observability (matches the Lua + PG action
    /// strings exactly).
    ///
    /// Declared `const` so the label can also be used in constant
    /// contexts; the match is exhaustive with no wildcard arm, so adding
    /// a variant forces a label to be chosen here.
    pub const fn as_str(&self) -> &'static str {
        match self {
            Self::SremmedStale => "sremmed_stale",
            Self::CompletedDrain => "completed_drain",
            Self::NoOp => "no_op",
        }
    }
}
2653
/// Result of one reconciler scan pass over a single partition.
///
/// `processed` counts candidates where the reconciler detected a
/// correctable condition (drift, breach flip, stale guard). `errors`
/// counts transport / parse failures. Scanners sum these into the
/// engine's per-pass metrics exactly as they did before PR-7b/2b-A.
///
/// The derived `Default` yields both counters at zero.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ReconcileCounts {
    /// Items the scanner corrected (drift fix, breach flip, guard
    /// eviction). Zero when the partition is healthy.
    pub processed: u32,
    /// Items the scanner could not process due to transport or parse
    /// errors. Non-zero values surface through scanner metrics and
    /// should be investigated.
    pub errors: u32,
}
2670
/// Which scanner invoked [`EngineBackend::expire_execution`].
///
/// The attempt-timeout and execution-deadline scanners share a single
/// trait method — their underlying state flip is identical (terminate
/// or retry per retry policy); the distinction is purely diagnostic
/// (which deadline elapsed) and carried through to the backend so the
/// same Lua / SQL path can log or tag appropriately.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExpirePhase {
    /// Invoked by `attempt_timeout` scanner — the per-attempt
    /// wall-clock budget elapsed.
    AttemptTimeout,
    /// Invoked by `execution_deadline` scanner — the whole-execution
    /// wall-clock deadline elapsed.
    ExecutionDeadline,
}

impl ExpirePhase {
    /// Short string tag suitable for Lua ARGV or Postgres breadcrumbs.
    ///
    /// Declared `const` so the tag can also be used in constant
    /// contexts; the match is exhaustive with no wildcard arm, so adding
    /// a variant forces a tag to be chosen here.
    pub const fn as_str(&self) -> &'static str {
        match self {
            Self::AttemptTimeout => "attempt_timeout",
            Self::ExecutionDeadline => "execution_deadline",
        }
    }
}
2697
2698/// Object-safety assertion: `dyn EngineBackend` compiles iff every
2699/// method is dyn-compatible. Kept as a compile-time guard so a future
2700/// trait change that accidentally breaks dyn-safety fails the build
2701/// at this site rather than at every downstream `Arc<dyn
2702/// EngineBackend>` use.
2703#[allow(dead_code)]
2704fn _assert_dyn_compatible(_: &dyn EngineBackend) {}
2705
/// Polling interval for [`wait_for_flow_cancellation`]. Tight enough
/// that a local single-node cancel cascade observes `cancelled` within
/// one or two polls; slack enough that a `WaitIndefinite` caller does
/// not hammer `describe_flow` on a live cluster.
const CANCEL_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(100);
2711
/// Defensive ceiling for [`CancelFlowWait::WaitIndefinite`] — if the
/// reconciler cascade has not converged in five minutes, something is
/// wedged and returning `Timeout` is strictly more useful than blocking
/// forever. RFC-012 §3.1.1 expects real-world cascades to finish within
/// `reconciler_interval + grace`, which is orders of magnitude below
/// this.
const CANCEL_WAIT_INDEFINITE_CEILING: Duration = Duration::from_secs(300);
2719
2720/// Poll `backend.describe_flow(flow_id)` until `public_flow_state` is
2721/// `"cancelled"` or `deadline` elapses.
2722///
2723/// Shared by every backend's `cancel_flow` trait impl that honours
2724/// [`CancelFlowWait::WaitTimeout`] / [`CancelFlowWait::WaitIndefinite`].
2725/// The underlying `cancel_flow` FCALL / SQL transaction flips the
2726/// flow-level state synchronously; member cancellations dispatch
2727/// asynchronously via the reconciler, which also flips
2728/// `public_flow_state` to `cancelled` once the cascade completes. This
2729/// helper waits for that terminal flip.
2730///
2731/// Returns:
2732/// * `Ok(())` once `public_flow_state = "cancelled"` is observed.
2733/// * `Err(EngineError::Timeout { op: "cancel_flow", elapsed })` when
2734/// `deadline` elapses first. `elapsed` is the wait budget (the
2735/// requested timeout), not wall-clock precision.
2736/// * `Err(e)` if `describe_flow` itself errors (propagated).
2737pub async fn wait_for_flow_cancellation<B: EngineBackend + ?Sized>(
2738 backend: &B,
2739 flow_id: &crate::types::FlowId,
2740 deadline: Duration,
2741) -> Result<(), EngineError> {
2742 let start = std::time::Instant::now();
2743 loop {
2744 match backend.describe_flow(flow_id).await? {
2745 Some(snap) if snap.public_flow_state == "cancelled" => return Ok(()),
2746 // `None` (flow removed) is also terminal from the caller's
2747 // perspective — nothing left to wait on.
2748 None => return Ok(()),
2749 Some(_) => {}
2750 }
2751 if start.elapsed() >= deadline {
2752 return Err(EngineError::Timeout {
2753 op: "cancel_flow",
2754 elapsed: deadline,
2755 });
2756 }
2757 tokio::time::sleep(CANCEL_WAIT_POLL_INTERVAL).await;
2758 }
2759}
2760
2761/// Convert a [`CancelFlowWait`] into the deadline passed to
2762/// [`wait_for_flow_cancellation`]. `NoWait` returns `None` — the caller
2763/// must skip the wait entirely.
2764pub fn cancel_flow_wait_deadline(wait: CancelFlowWait) -> Option<Duration> {
2765 // `CancelFlowWait` is `#[non_exhaustive]`; this match lives in the
2766 // defining crate so the exhaustiveness check keeps the compiler
2767 // honest. Future variants must be wired here explicitly.
2768 match wait {
2769 CancelFlowWait::NoWait => None,
2770 CancelFlowWait::WaitTimeout(d) => Some(d),
2771 CancelFlowWait::WaitIndefinite => Some(CANCEL_WAIT_INDEFINITE_CEILING),
2772 }
2773}
2774
2775/// Validate a caller-namespaced tag key against the regex
2776/// `^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$`.
2777///
2778/// The Rust trait-side check is **stricter than the Valkey Lua
2779/// contracts** (`ff_set_execution_tags` / `ff_set_flow_tags`), which
2780/// only check `^[a-z][a-z0-9_]*%.[^.]` — namespace prefix + first
2781/// suffix char, with the rest of the suffix unvalidated. Every
2782/// backend-side impl (`ff-backend-{valkey,postgres,sqlite}`) calls
2783/// this helper **before** the wire hop so the effective parity
2784/// contract is this full-key regex; Valkey's Lua is an additional,
2785/// more permissive server-side guard, not the parity-of-record.
2786///
2787/// A key passes iff:
2788///
2789/// * it begins with an ASCII lowercase letter;
2790/// * all characters up to the first `.` are lowercase alnum or `_`;
2791/// * the first `.` is followed by at least one non-`.` character
2792/// (so `cairn.` and `cairn..x` fail — the `<field>` must be non-empty).
2793///
2794/// Shared entry point for [`EngineBackend::set_execution_tag`] /
2795/// [`EngineBackend::set_flow_tag`] / [`EngineBackend::get_execution_tag`] /
2796/// [`EngineBackend::get_flow_tag`] so every backend rejects the same
2797/// keyspace. On rejection, returns
2798/// [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
2799/// with the offending key in `detail`.
2800///
2801/// `#[allow(clippy::result_large_err)]` — `EngineError` is the uniform
2802/// error across the whole `EngineBackend` trait surface (see trait method
2803/// signatures above); boxing it here alone would introduce a
2804/// gratuitous signature deviation. Clippy 1.95 flags free functions but
2805/// not trait methods; this function mirrors the trait-method convention.
2806#[allow(clippy::result_large_err)]
2807pub fn validate_tag_key(key: &str) -> Result<(), EngineError> {
2808 use crate::engine_error::ValidationKind;
2809
2810 let bad = || EngineError::Validation {
2811 kind: ValidationKind::InvalidInput,
2812 detail: format!(
2813 "invalid tag key: {key:?} (must match ^[a-z][a-z0-9_]*\\.[a-z0-9_][a-z0-9_.]*$)"
2814 ),
2815 };
2816
2817 let mut chars = key.chars();
2818 let first = chars.next().ok_or_else(bad)?;
2819 if !first.is_ascii_lowercase() {
2820 return Err(bad());
2821 }
2822 // Phase 1: consume the namespace prefix up to (but not including) the first `.`.
2823 // Chars must be `[a-z0-9_]`.
2824 let mut saw_dot = false;
2825 for c in chars.by_ref() {
2826 if c == '.' {
2827 saw_dot = true;
2828 break;
2829 }
2830 if !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') {
2831 return Err(bad());
2832 }
2833 }
2834 if !saw_dot {
2835 return Err(bad());
2836 }
2837 // Phase 2: the first char after the first `.` must exist and be a
2838 // non-dot member char `[a-z0-9_]` (matches Valkey's Lua regex
2839 // `^[a-z][a-z0-9_]*%.[^.]` — a second consecutive dot is rejected).
2840 let second = chars.next().ok_or_else(bad)?;
2841 if !(second.is_ascii_lowercase() || second.is_ascii_digit() || second == '_') {
2842 return Err(bad());
2843 }
2844 // Phase 3 (Finding 2 tightening): every remaining char MUST be
2845 // `[a-z0-9_.]`. Before this, the suffix was unvalidated beyond
2846 // its first char — so `cairn.foo bar`, `cairn.Foo`,
2847 // `cairn.foo"bar`, `cairn.foo-bar` all passed trait-side validation
2848 // even though they would break SQLite JSON-path quoting, the Lua
2849 // HSET field-name conventions, or consumer grep patterns. Valkey's
2850 // Lua prefix regex is identically permissive on the suffix; this
2851 // tightens both layers from the Rust side. Dots in the suffix
2852 // remain legal (`app.sub.field` is valid) to preserve the existing
2853 // accepted shape.
2854 for c in chars {
2855 if !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_' || c == '.') {
2856 return Err(bad());
2857 }
2858 }
2859 Ok(())
2860}
2861
2862#[cfg(test)]
2863mod tests {
2864 use super::*;
2865
2866 /// A zero-state backend stub used to exercise the default
2867 /// `capabilities()` impl without pulling in a real
2868 /// transport. Only the default method is under test here; every
2869 /// other method is unreachable on this type.
2870 struct DefaultBackend;
2871
2872 #[async_trait]
2873 impl EngineBackend for DefaultBackend {
2874 async fn claim(
2875 &self,
2876 _lane: &LaneId,
2877 _capabilities: &CapabilitySet,
2878 _policy: ClaimPolicy,
2879 ) -> Result<Option<Handle>, EngineError> {
2880 unreachable!()
2881 }
2882 async fn renew(&self, _handle: &Handle) -> Result<LeaseRenewal, EngineError> {
2883 unreachable!()
2884 }
2885 async fn progress(
2886 &self,
2887 _handle: &Handle,
2888 _percent: Option<u8>,
2889 _message: Option<String>,
2890 ) -> Result<(), EngineError> {
2891 unreachable!()
2892 }
2893 async fn append_frame(
2894 &self,
2895 _handle: &Handle,
2896 _frame: Frame,
2897 ) -> Result<AppendFrameOutcome, EngineError> {
2898 unreachable!()
2899 }
2900 async fn complete(
2901 &self,
2902 _handle: &Handle,
2903 _payload: Option<Vec<u8>>,
2904 ) -> Result<(), EngineError> {
2905 unreachable!()
2906 }
2907 async fn fail(
2908 &self,
2909 _handle: &Handle,
2910 _reason: FailureReason,
2911 _classification: FailureClass,
2912 ) -> Result<FailOutcome, EngineError> {
2913 unreachable!()
2914 }
2915 async fn cancel(&self, _handle: &Handle, _reason: &str) -> Result<(), EngineError> {
2916 unreachable!()
2917 }
2918 async fn suspend(
2919 &self,
2920 _handle: &Handle,
2921 _args: SuspendArgs,
2922 ) -> Result<SuspendOutcome, EngineError> {
2923 unreachable!()
2924 }
2925 async fn create_waitpoint(
2926 &self,
2927 _handle: &Handle,
2928 _waitpoint_key: &str,
2929 _expires_in: Duration,
2930 ) -> Result<PendingWaitpoint, EngineError> {
2931 unreachable!()
2932 }
2933 async fn observe_signals(
2934 &self,
2935 _handle: &Handle,
2936 ) -> Result<Vec<ResumeSignal>, EngineError> {
2937 unreachable!()
2938 }
2939 async fn claim_from_resume_grant(
2940 &self,
2941 _token: ResumeToken,
2942 ) -> Result<Option<Handle>, EngineError> {
2943 unreachable!()
2944 }
2945 async fn delay(
2946 &self,
2947 _handle: &Handle,
2948 _delay_until: TimestampMs,
2949 ) -> Result<(), EngineError> {
2950 unreachable!()
2951 }
2952 async fn wait_children(&self, _handle: &Handle) -> Result<(), EngineError> {
2953 unreachable!()
2954 }
2955 async fn describe_execution(
2956 &self,
2957 _id: &ExecutionId,
2958 ) -> Result<Option<ExecutionSnapshot>, EngineError> {
2959 unreachable!()
2960 }
2961 async fn read_execution_context(
2962 &self,
2963 _execution_id: &ExecutionId,
2964 ) -> Result<ExecutionContext, EngineError> {
2965 Ok(ExecutionContext::new(
2966 Vec::new(),
2967 String::new(),
2968 std::collections::HashMap::new(),
2969 ))
2970 }
2971 async fn read_current_attempt_index(
2972 &self,
2973 _execution_id: &ExecutionId,
2974 ) -> Result<AttemptIndex, EngineError> {
2975 Ok(AttemptIndex::new(0))
2976 }
2977 async fn read_total_attempt_count(
2978 &self,
2979 _execution_id: &ExecutionId,
2980 ) -> Result<AttemptIndex, EngineError> {
2981 Ok(AttemptIndex::new(0))
2982 }
2983 async fn describe_flow(
2984 &self,
2985 _id: &FlowId,
2986 ) -> Result<Option<FlowSnapshot>, EngineError> {
2987 unreachable!()
2988 }
/// Placeholder for `list_edges` (compiled only with the `core`
/// feature): both arguments are ignored and the body panics via
/// `unreachable!()` if the method is ever invoked.
#[cfg(feature = "core")]
async fn list_edges(
    &self,
    _flow_id: &FlowId,
    _direction: EdgeDirection,
) -> Result<Vec<EdgeSnapshot>, EngineError> {
    unreachable!()
}
/// Placeholder for `describe_edge` (compiled only with the `core`
/// feature): both ids are ignored and the body panics via
/// `unreachable!()` if the method is ever invoked.
#[cfg(feature = "core")]
async fn describe_edge(
    &self,
    _flow_id: &FlowId,
    _edge_id: &EdgeId,
) -> Result<Option<EdgeSnapshot>, EngineError> {
    unreachable!()
}
/// Placeholder for `resolve_execution_flow_id` (compiled only with the
/// `core` feature): the execution id is ignored and the body panics
/// via `unreachable!()` if the method is ever invoked.
#[cfg(feature = "core")]
async fn resolve_execution_flow_id(
    &self,
    _eid: &ExecutionId,
) -> Result<Option<FlowId>, EngineError> {
    unreachable!()
}
/// Placeholder for `list_flows` (compiled only with the `core`
/// feature): partition, cursor and limit are ignored and the body
/// panics via `unreachable!()` if the method is ever invoked.
#[cfg(feature = "core")]
async fn list_flows(
    &self,
    _partition: PartitionKey,
    _cursor: Option<FlowId>,
    _limit: usize,
) -> Result<ListFlowsPage, EngineError> {
    unreachable!()
}
/// Placeholder for `list_lanes` (compiled only with the `core`
/// feature): cursor and limit are ignored and the body panics via
/// `unreachable!()` if the method is ever invoked.
#[cfg(feature = "core")]
async fn list_lanes(
    &self,
    _cursor: Option<LaneId>,
    _limit: usize,
) -> Result<ListLanesPage, EngineError> {
    unreachable!()
}
/// Placeholder for `list_suspended` (compiled only with the `core`
/// feature): partition, cursor and limit are ignored and the body
/// panics via `unreachable!()` if the method is ever invoked.
#[cfg(feature = "core")]
async fn list_suspended(
    &self,
    _partition: PartitionKey,
    _cursor: Option<ExecutionId>,
    _limit: usize,
) -> Result<ListSuspendedPage, EngineError> {
    unreachable!()
}
/// Placeholder for `list_executions` (compiled only with the `core`
/// feature): partition, cursor and limit are ignored and the body
/// panics via `unreachable!()` if the method is ever invoked.
#[cfg(feature = "core")]
async fn list_executions(
    &self,
    _partition: PartitionKey,
    _cursor: Option<ExecutionId>,
    _limit: usize,
) -> Result<ListExecutionsPage, EngineError> {
    unreachable!()
}
/// Placeholder for `deliver_signal` (compiled only with the `core`
/// feature): the args are ignored and the body panics via
/// `unreachable!()` if the method is ever invoked.
#[cfg(feature = "core")]
async fn deliver_signal(
    &self,
    _args: DeliverSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
    unreachable!()
}
/// Placeholder for `claim_resumed_execution` (compiled only with the
/// `core` feature): the args are ignored and the body panics via
/// `unreachable!()` if the method is ever invoked.
#[cfg(feature = "core")]
async fn claim_resumed_execution(
    &self,
    _args: ClaimResumedExecutionArgs,
) -> Result<ClaimResumedExecutionResult, EngineError> {
    unreachable!()
}
3061 async fn cancel_flow(
3062 &self,
3063 _id: &FlowId,
3064 _policy: CancelFlowPolicy,
3065 _wait: CancelFlowWait,
3066 ) -> Result<CancelFlowResult, EngineError> {
3067 unreachable!()
3068 }
/// Placeholder for `set_edge_group_policy` (compiled only with the
/// `core` feature): every argument is ignored and the body panics via
/// `unreachable!()` if the method is ever invoked.
#[cfg(feature = "core")]
async fn set_edge_group_policy(
    &self,
    _flow_id: &FlowId,
    _downstream_execution_id: &ExecutionId,
    _policy: crate::contracts::EdgeDependencyPolicy,
) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
    unreachable!()
}
3078 async fn report_usage(
3079 &self,
3080 _handle: &Handle,
3081 _budget: &BudgetId,
3082 _dimensions: crate::backend::UsageDimensions,
3083 ) -> Result<ReportUsageResult, EngineError> {
3084 unreachable!()
3085 }
/// Placeholder for `read_stream` (compiled only with the `streaming`
/// feature): every argument is ignored and the body panics via
/// `unreachable!()` if the method is ever invoked.
#[cfg(feature = "streaming")]
async fn read_stream(
    &self,
    _execution_id: &ExecutionId,
    _attempt_index: AttemptIndex,
    _from: StreamCursor,
    _to: StreamCursor,
    _count_limit: u64,
) -> Result<StreamFrames, EngineError> {
    unreachable!()
}
/// Placeholder for `tail_stream` (compiled only with the `streaming`
/// feature): every argument is ignored and the body panics via
/// `unreachable!()` if the method is ever invoked.
#[cfg(feature = "streaming")]
async fn tail_stream(
    &self,
    _execution_id: &ExecutionId,
    _attempt_index: AttemptIndex,
    _after: StreamCursor,
    _block_ms: u64,
    _count_limit: u64,
    _visibility: TailVisibility,
) -> Result<StreamFrames, EngineError> {
    unreachable!()
}
/// Placeholder for `read_summary` (compiled only with the `streaming`
/// feature): both arguments are ignored and the body panics via
/// `unreachable!()` if the method is ever invoked.
#[cfg(feature = "streaming")]
async fn read_summary(
    &self,
    _execution_id: &ExecutionId,
    _attempt_index: AttemptIndex,
) -> Result<Option<SummaryDocument>, EngineError> {
    unreachable!()
}
3117 }
3118
/// A backend that leaves `capabilities()` un-overridden reports
/// `family = "unknown"` with every `supports.*` bool false. That keeps
/// pre-RFC-018 out-of-tree backends compiling while letting consumers
/// tell "backend predates RFC-018" apart from "backend reports
/// concrete bools." Every concrete in-tree backend overrides the
/// default.
#[test]
fn default_capabilities_is_unknown_family_all_false() {
    let b = DefaultBackend;
    let caps = b.capabilities();

    let identity = &caps.identity;
    assert_eq!(identity.family, "unknown");
    assert_eq!(identity.version, crate::capability::Version::new(0, 0, 0));
    assert_eq!(identity.rfc017_stage, "unknown");

    // `Supports::none()` is the all-false value, so one equality check
    // covers every field.
    assert_eq!(caps.supports, crate::capability::Supports::none());
}
3138
3139 // ── resolve_dependency (PR-7b Step 0) ──
3140
/// The un-overridden `resolve_dependency` yields
/// `Unavailable { op: "resolve_dependency" }`, giving pre-migration
/// backends a typed Unsupported-grade error instead of a panic. Both
/// cluster 2's dependency_reconciler and cluster 4's
/// completion_listener route through this method, so the default is
/// the safety net for non-Valkey deployments.
#[cfg(feature = "core")]
#[tokio::test]
async fn default_resolve_dependency_is_unavailable() {
    use crate::contracts::ResolveDependencyArgs;
    use crate::partition::{Partition, PartitionFamily};
    use crate::types::{AttemptIndex, EdgeId, FlowId, LaneId};

    let b = DefaultBackend;

    // Build each argument up front, in the order the constructor takes them.
    let partition = Partition {
        family: PartitionFamily::Flow,
        index: 0,
    };
    let flow_id = FlowId::parse("11111111-1111-1111-1111-111111111111").unwrap();
    let first_eid = ExecutionId::parse("{fp:0}:22222222-2222-2222-2222-222222222222").unwrap();
    let second_eid = ExecutionId::parse("{fp:0}:33333333-3333-3333-3333-333333333333").unwrap();
    let edge_id = EdgeId::parse("44444444-4444-4444-4444-444444444444").unwrap();
    let args = ResolveDependencyArgs::new(
        partition,
        flow_id,
        first_eid,
        second_eid,
        edge_id,
        LaneId::new("default"),
        AttemptIndex::new(0),
        "success".to_owned(),
        TimestampMs::now(),
    );

    let outcome = b.resolve_dependency(args).await;
    match outcome {
        Err(EngineError::Unavailable { op }) => assert_eq!(op, "resolve_dependency"),
        other => panic!("expected Unavailable, got {other:?}"),
    }
}
3176
3177 // ── cascade_completion (PR-7b Cluster 4) ──
3178
/// The un-overridden `cascade_completion` yields
/// `Unavailable { op: "cascade_completion" }`, so pre-migration /
/// non-core builds surface a typed error rather than panic. The
/// push-based completion listener routes through this method; the
/// default is the safety net for deployments whose backend doesn't
/// cascade (e.g. SQLite in the current Wave 9 scope).
#[cfg(feature = "core")]
#[tokio::test]
async fn default_cascade_completion_is_unavailable() {
    use crate::backend::CompletionPayload;

    let b = DefaultBackend;
    let eid = ExecutionId::parse("{fp:0}:66666666-6666-6666-6666-666666666666").unwrap();
    let payload = CompletionPayload::new(eid, "success", None, TimestampMs::now());

    let outcome = b.cascade_completion(&payload).await;
    match outcome {
        Err(EngineError::Unavailable { op }) => assert_eq!(op, "cascade_completion"),
        other => panic!("expected Unavailable, got {other:?}"),
    }
}
3203
3204 // ── unblock_execution (PR-7b Cluster 2) ──
3205
/// The un-overridden `unblock_execution` yields
/// `Unavailable { op: "unblock_execution" }`, so non-Valkey
/// deployments get a typed `Unavailable` rather than a panic. The
/// engine scanner loop on PG/SQLite skips this path entirely
/// (scheduler eligibility is re-evaluated live via SQL predicates),
/// but a stray direct call must still fail gracefully.
#[cfg(feature = "core")]
#[tokio::test]
async fn default_unblock_execution_is_unavailable() {
    use crate::partition::{Partition, PartitionFamily};

    let b = DefaultBackend;
    let partition = Partition {
        family: PartitionFamily::Execution,
        index: 0,
    };
    let eid = ExecutionId::parse("{fp:0}:55555555-5555-5555-5555-555555555555").unwrap();
    let lane = LaneId::new("default");

    let outcome = b
        .unblock_execution(
            partition,
            &lane,
            &eid,
            "waiting_for_budget",
            TimestampMs::from_millis(0),
        )
        .await;
    match outcome {
        Err(EngineError::Unavailable { op }) => assert_eq!(op, "unblock_execution"),
        other => panic!("expected Unavailable, got {other:?}"),
    }
}
3242
3243 // ── project_flow_summary (PR-7b Cluster 2b-B) ──
3244
/// The un-overridden `project_flow_summary` yields
/// `Unavailable { op: "project_flow_summary" }`, so non-Valkey
/// deployments get a typed `Unavailable` rather than a panic. SQLite
/// rides the default per RFC-023 Phase 3.5.
#[cfg(feature = "core")]
#[tokio::test]
async fn default_project_flow_summary_is_unavailable() {
    use crate::partition::{Partition, PartitionFamily};
    use crate::types::FlowId;

    let b = DefaultBackend;
    let partition = Partition {
        family: PartitionFamily::Flow,
        index: 0,
    };
    let fid = FlowId::parse("11111111-1111-1111-1111-111111111111").unwrap();

    let outcome = b
        .project_flow_summary(partition, &fid, TimestampMs::from_millis(0))
        .await;
    match outcome {
        Err(EngineError::Unavailable { op }) => assert_eq!(op, "project_flow_summary"),
        other => panic!("expected Unavailable, got {other:?}"),
    }
}
3270
3271 // ── trim_retention (PR-7b Cluster 2b-B) ──
3272
/// The un-overridden `trim_retention` yields
/// `Unavailable { op: "trim_retention" }`, so non-Valkey deployments
/// get a typed `Unavailable` rather than a panic. SQLite rides the
/// default per RFC-023 Phase 3.5.
#[cfg(feature = "core")]
#[tokio::test]
async fn default_trim_retention_is_unavailable() {
    use crate::partition::{Partition, PartitionFamily};

    let b = DefaultBackend;
    let partition = Partition {
        family: PartitionFamily::Execution,
        index: 0,
    };
    let lane = LaneId::new("default");

    let outcome = b
        .trim_retention(
            partition,
            &lane,
            60_000,
            TimestampMs::from_millis(0),
            20,
            &crate::backend::ScannerFilter::NOOP,
        )
        .await;
    match outcome {
        Err(EngineError::Unavailable { op }) => assert_eq!(op, "trim_retention"),
        other => panic!("expected Unavailable, got {other:?}"),
    }
}
3304
3305 // ── read_exec_core_fields (PR-7b Wave 0a) ──
3306
/// The un-overridden `read_exec_core_fields` yields
/// `Unavailable { op: "read_exec_core_fields" }`, so out-of-tree
/// backends that pre-date v0.13 keep compiling while surfacing a typed
/// error on call. Empty-field input short-circuits to `Ok(empty map)`
/// in all in-tree impls; the default is triggered only when the trait
/// method itself is un-overridden.
#[cfg(feature = "core")]
#[tokio::test]
async fn default_read_exec_core_fields_is_unavailable() {
    use crate::partition::{Partition, PartitionFamily};

    let b = DefaultBackend;
    let partition = Partition {
        family: PartitionFamily::Execution,
        index: 0,
    };
    let eid = ExecutionId::parse("{fp:0}:66666666-6666-6666-6666-666666666666").unwrap();

    let outcome = b
        .read_exec_core_fields(partition, &eid, &["lane_id"])
        .await;
    match outcome {
        Err(EngineError::Unavailable { op }) => assert_eq!(op, "read_exec_core_fields"),
        other => panic!("expected Unavailable, got {other:?}"),
    }
}
3336
3337 // ── validate_tag_key (issue #433) ──
3338
/// Every key in the list below must be accepted by `validate_tag_key`;
/// the first rejection fails the test with the offending key and error.
#[test]
fn validate_tag_key_accepts_valid() {
    let accepted = [
        "cairn.session_id",
        "cairn.project",
        "a.b",
        "a1_2.x",
        "app.sub.field",
        "x.y_z",
    ];
    for k in accepted {
        if let Err(e) = validate_tag_key(k) {
            panic!("{k:?} should pass: {e:?}");
        }
    }
}
3352
/// Every key in the list below must be rejected by `validate_tag_key`
/// with `EngineError::Validation` of kind `InvalidInput`; an accepted
/// key or any other error fails the test.
#[test]
fn validate_tag_key_rejects_invalid() {
    let rejected = [
        "",         // empty
        "Cairn.x",  // uppercase first
        "1cairn.x", // leading digit
        "cairn",    // no dot
        "cairn.",   // empty suffix
        "cairn..x", // dot immediately after first dot
        ".cairn",   // leading dot
        "cair n.x", // space before dot
        "ca-irn.x", // hyphen in prefix
        // Finding 2 tightening — suffix now fully validated.
        "cairn.Foo",      // uppercase in suffix
        "cairn.foo bar",  // space in suffix
        "cairn.foo\"bar", // double-quote in suffix (would break SQLite JSON-path quoting)
        "cairn.foo-bar",  // hyphen in suffix
        "cairn.foo\\bar", // backslash in suffix
    ];
    for k in rejected {
        // First requirement: validation must fail at all.
        let err = match validate_tag_key(k) {
            Err(err) => err,
            Ok(_) => panic!("{k:?} should fail"),
        };
        // Second requirement: it must fail as an invalid-input validation error.
        match err {
            EngineError::Validation {
                kind: crate::engine_error::ValidationKind::InvalidInput,
                ..
            } => {}
            other => panic!("{k:?}: unexpected err {other:?}"),
        }
    }
}
3384
3385 // ── cairn #389: service-layer FCALL trait-method defaults ──
3386 //
3387 // Each method MUST return `EngineError::Unavailable { op: "<name>" }`
3388 // on backends that haven't overridden it, so out-of-tree backends
3389 // keep compiling and consumers get a terminal-classified error
3390 // rather than a panic. Mirrors the precedent used by
3391 // `issue_reclaim_grant` / `reclaim_execution` / `suspend_by_triple`.
3392 //
3393 // Feature-gated on `core` because the methods under test only
3394 // exist on the trait under that feature — matches the precedent
3395 // of `default_resolve_dependency_is_unavailable` above.
3396
/// The un-overridden `complete_execution` must fail with
/// `EngineError::Unavailable { op: "complete_execution" }`.
#[cfg(feature = "core")]
#[tokio::test]
async fn default_complete_execution_is_unavailable() {
    use crate::contracts::CompleteExecutionArgs;
    use crate::types::{ExecutionId, FlowId};

    let b = DefaultBackend;
    let config = crate::partition::PartitionConfig::default();
    let flow_id = FlowId::new();
    let args = CompleteExecutionArgs {
        execution_id: ExecutionId::for_flow(&flow_id, &config),
        fence: None,
        attempt_index: AttemptIndex::new(0),
        result_payload: None,
        result_encoding: None,
        source: crate::types::CancelSource::default(),
        now: TimestampMs::from_millis(0),
    };

    let err = b.complete_execution(args).await.unwrap_err();
    match err {
        EngineError::Unavailable { op } => assert_eq!(op, "complete_execution"),
        other => panic!("expected Unavailable, got {other:?}"),
    }
}
3419
/// The un-overridden `fail_execution` must fail with
/// `EngineError::Unavailable { op: "fail_execution" }`.
#[cfg(feature = "core")]
#[tokio::test]
async fn default_fail_execution_is_unavailable() {
    use crate::contracts::FailExecutionArgs;
    use crate::types::{ExecutionId, FlowId};

    let b = DefaultBackend;
    let config = crate::partition::PartitionConfig::default();
    let flow_id = FlowId::new();
    let args = FailExecutionArgs {
        execution_id: ExecutionId::for_flow(&flow_id, &config),
        fence: None,
        attempt_index: AttemptIndex::new(0),
        failure_reason: String::new(),
        failure_category: String::new(),
        retry_policy_json: String::new(),
        next_attempt_policy_json: String::new(),
        source: crate::types::CancelSource::default(),
    };

    let err = b.fail_execution(args).await.unwrap_err();
    match err {
        EngineError::Unavailable { op } => assert_eq!(op, "fail_execution"),
        other => panic!("expected Unavailable, got {other:?}"),
    }
}
3443
/// The un-overridden `renew_lease` must fail with
/// `EngineError::Unavailable { op: "renew_lease" }`.
#[cfg(feature = "core")]
#[tokio::test]
async fn default_renew_lease_is_unavailable() {
    use crate::contracts::RenewLeaseArgs;
    use crate::types::{ExecutionId, FlowId};

    let b = DefaultBackend;
    let config = crate::partition::PartitionConfig::default();
    let flow_id = FlowId::new();
    let args = RenewLeaseArgs {
        execution_id: ExecutionId::for_flow(&flow_id, &config),
        attempt_index: AttemptIndex::new(0),
        fence: None,
        lease_ttl_ms: 1_000,
        lease_history_grace_ms: 60_000,
    };

    let err = b.renew_lease(args).await.unwrap_err();
    match err {
        EngineError::Unavailable { op } => assert_eq!(op, "renew_lease"),
        other => panic!("expected Unavailable, got {other:?}"),
    }
}
3464
/// The un-overridden `resume_execution` must fail with
/// `EngineError::Unavailable { op: "resume_execution" }`.
#[cfg(feature = "core")]
#[tokio::test]
async fn default_resume_execution_is_unavailable() {
    use crate::contracts::ResumeExecutionArgs;
    use crate::types::{ExecutionId, FlowId};

    let b = DefaultBackend;
    let config = crate::partition::PartitionConfig::default();
    let flow_id = FlowId::new();
    let args = ResumeExecutionArgs {
        execution_id: ExecutionId::for_flow(&flow_id, &config),
        trigger_type: "signal".to_owned(),
        resume_delay_ms: 0,
    };

    let err = b.resume_execution(args).await.unwrap_err();
    match err {
        EngineError::Unavailable { op } => assert_eq!(op, "resume_execution"),
        other => panic!("expected Unavailable, got {other:?}"),
    }
}
3483
/// The un-overridden `check_admission` must fail with
/// `EngineError::Unavailable { op: "check_admission" }`.
#[cfg(feature = "core")]
#[tokio::test]
async fn default_check_admission_is_unavailable() {
    use crate::contracts::CheckAdmissionArgs;
    use crate::types::{ExecutionId, FlowId, QuotaPolicyId};

    let b = DefaultBackend;
    let config = crate::partition::PartitionConfig::default();
    let flow_id = FlowId::new();
    let args = CheckAdmissionArgs {
        execution_id: ExecutionId::for_flow(&flow_id, &config),
        now: TimestampMs::from_millis(0),
        window_seconds: 60,
        rate_limit: 10,
        concurrency_cap: 1,
        jitter_ms: None,
    };
    let qid = QuotaPolicyId::new();

    let err = b.check_admission(&qid, "default", args).await.unwrap_err();
    match err {
        EngineError::Unavailable { op } => assert_eq!(op, "check_admission"),
        other => panic!("expected Unavailable, got {other:?}"),
    }
}
3510
/// The un-overridden `evaluate_flow_eligibility` must fail with
/// `EngineError::Unavailable { op: "evaluate_flow_eligibility" }`.
#[cfg(feature = "core")]
#[tokio::test]
async fn default_evaluate_flow_eligibility_is_unavailable() {
    use crate::contracts::EvaluateFlowEligibilityArgs;
    use crate::types::{ExecutionId, FlowId};

    let b = DefaultBackend;
    let config = crate::partition::PartitionConfig::default();
    let flow_id = FlowId::new();
    let args = EvaluateFlowEligibilityArgs {
        execution_id: ExecutionId::for_flow(&flow_id, &config),
    };

    let err = b.evaluate_flow_eligibility(args).await.unwrap_err();
    match err {
        EngineError::Unavailable { op } => assert_eq!(op, "evaluate_flow_eligibility"),
        other => panic!("expected Unavailable, got {other:?}"),
    }
}
3525
3526 // ── Cluster 2b-A tally-recompute reconcilers ──
3527
/// The un-overridden `reconcile_execution_index` must fail with
/// `EngineError::Unavailable { op: "reconcile_execution_index" }`.
#[cfg(feature = "core")]
#[tokio::test]
async fn default_reconcile_execution_index_is_unavailable() {
    use crate::backend::ScannerFilter;
    use crate::partition::{Partition, PartitionFamily};

    let b = DefaultBackend;
    let partition = Partition {
        family: PartitionFamily::Execution,
        index: 0,
    };
    let lanes = [LaneId::new("default")];
    let filter = ScannerFilter::default();

    let err = b
        .reconcile_execution_index(partition, &lanes, &filter)
        .await
        .unwrap_err();
    match err {
        EngineError::Unavailable { op } => assert_eq!(op, "reconcile_execution_index"),
        other => panic!("expected Unavailable, got {other:?}"),
    }
}
3546
/// The un-overridden `reconcile_budget_counters` must fail with
/// `EngineError::Unavailable { op: "reconcile_budget_counters" }`.
#[cfg(feature = "core")]
#[tokio::test]
async fn default_reconcile_budget_counters_is_unavailable() {
    use crate::partition::{Partition, PartitionFamily};

    let b = DefaultBackend;
    let partition = Partition {
        family: PartitionFamily::Budget,
        index: 0,
    };

    let err = b
        .reconcile_budget_counters(partition, TimestampMs::from_millis(0))
        .await
        .unwrap_err();
    match err {
        EngineError::Unavailable { op } => assert_eq!(op, "reconcile_budget_counters"),
        other => panic!("expected Unavailable, got {other:?}"),
    }
}
3562
/// The un-overridden `reconcile_quota_counters` must fail with
/// `EngineError::Unavailable { op: "reconcile_quota_counters" }`.
#[cfg(feature = "core")]
#[tokio::test]
async fn default_reconcile_quota_counters_is_unavailable() {
    use crate::partition::{Partition, PartitionFamily};

    let b = DefaultBackend;
    let partition = Partition {
        family: PartitionFamily::Quota,
        index: 0,
    };

    let err = b
        .reconcile_quota_counters(partition, TimestampMs::from_millis(0))
        .await
        .unwrap_err();
    match err {
        EngineError::Unavailable { op } => assert_eq!(op, "reconcile_quota_counters"),
        other => panic!("expected Unavailable, got {other:?}"),
    }
}
3578
3579 // ── #454 trait-additions default-impl tests (4) ────────────
3580
/// The un-overridden `record_spend` must fail with
/// `EngineError::Unavailable { op: "record_spend" }`.
#[cfg(feature = "core")]
#[tokio::test]
async fn default_record_spend_is_unavailable() {
    use crate::contracts::RecordSpendArgs;
    use crate::types::{BudgetId, ExecutionId, FlowId};

    let b = DefaultBackend;
    let config = crate::partition::PartitionConfig::default();
    let flow_id = FlowId::new();
    let eid = ExecutionId::for_flow(&flow_id, &config);
    let args = RecordSpendArgs::new(
        BudgetId::new(),
        eid,
        std::collections::BTreeMap::new(),
        "k",
    );

    let err = b.record_spend(args).await.unwrap_err();
    match err {
        EngineError::Unavailable { op } => assert_eq!(op, "record_spend"),
        other => panic!("expected Unavailable, got {other:?}"),
    }
}
3600
/// The un-overridden `release_budget` must fail with
/// `EngineError::Unavailable { op: "release_budget" }`.
#[cfg(feature = "core")]
#[tokio::test]
async fn default_release_budget_is_unavailable() {
    use crate::contracts::ReleaseBudgetArgs;
    use crate::types::{BudgetId, ExecutionId, FlowId};

    let b = DefaultBackend;
    let config = crate::partition::PartitionConfig::default();
    let flow_id = FlowId::new();
    let eid = ExecutionId::for_flow(&flow_id, &config);
    let args = ReleaseBudgetArgs::new(BudgetId::new(), eid);

    let err = b.release_budget(args).await.unwrap_err();
    match err {
        EngineError::Unavailable { op } => assert_eq!(op, "release_budget"),
        other => panic!("expected Unavailable, got {other:?}"),
    }
}
3615
/// The un-overridden `deliver_approval_signal` must fail with
/// `EngineError::Unavailable { op: "deliver_approval_signal" }`.
#[cfg(feature = "core")]
#[tokio::test]
async fn default_deliver_approval_signal_is_unavailable() {
    use crate::contracts::DeliverApprovalSignalArgs;
    use crate::types::{ExecutionId, FlowId, LaneId, WaitpointId};

    let b = DefaultBackend;
    let config = crate::partition::PartitionConfig::default();
    let flow_id = FlowId::new();
    let eid = ExecutionId::for_flow(&flow_id, &config);
    let args = DeliverApprovalSignalArgs::new(
        eid,
        LaneId::new("default"),
        WaitpointId::new(),
        "approved",
        "sfx",
        1_000,
        Some(100),
        Some(10),
    );

    let err = b.deliver_approval_signal(args).await.unwrap_err();
    match err {
        EngineError::Unavailable { op } => assert_eq!(op, "deliver_approval_signal"),
        other => panic!("expected Unavailable, got {other:?}"),
    }
}
3639
/// The un-overridden `issue_grant_and_claim` must fail with
/// `EngineError::Unavailable { op: "issue_grant_and_claim" }`.
#[cfg(feature = "core")]
#[tokio::test]
async fn default_issue_grant_and_claim_is_unavailable() {
    use crate::contracts::IssueGrantAndClaimArgs;
    use crate::types::{ExecutionId, FlowId, LaneId};

    let b = DefaultBackend;
    let config = crate::partition::PartitionConfig::default();
    let flow_id = FlowId::new();
    let eid = ExecutionId::for_flow(&flow_id, &config);
    let args = IssueGrantAndClaimArgs::new(eid, LaneId::new("default"), 30_000);

    let err = b.issue_grant_and_claim(args).await.unwrap_err();
    match err {
        EngineError::Unavailable { op } => assert_eq!(op, "issue_grant_and_claim"),
        other => panic!("expected Unavailable, got {other:?}"),
    }
}
3654}