ff_core/engine_backend.rs
1//! The `EngineBackend` trait — abstracting FlowFabric's write surface.
2//!
3//! **RFC-012 Stage 1a:** this is the trait landing. The
4//! Valkey-backed impl lives in `ff-backend-valkey`; future backends
5//! (Postgres) add a sibling crate with their own impl. ff-sdk's
6//! `FlowFabricWorker` gains `connect_with(backend)` /
7//! `backend(&self)` accessors so consumers that want to bring their
8//! own backend (tests, future non-Valkey deployments) can hand one
9//! in. The hot-path migration of `ClaimedTask` / `FlowFabricWorker`
10//! to forward through the trait lands across Stages 1b-1d.
11//!
12//! # Object safety
13//!
14//! `EngineBackend` is object-safe: all methods are `async fn` behind
15//! `#[async_trait]` and take `&self`. Consumers can hold
//! `Arc<dyn EngineBackend>` for heterogeneous-backend deployments.
17//! The trait is `Send + Sync + 'static` per RFC-012 §4.1; every impl
18//! must honour that bound.
19//!
20//! # Error surface
21//!
22//! Every method returns [`Result<_, EngineError>`]. `EngineError`'s
23//! `Transport` variant carries a boxed `dyn Error + Send + Sync`;
24//! Valkey-backed transport faults box a
25//! `ff_script::error::ScriptError` (downcast via
26//! `ff_script::engine_error_ext::transport_script_ref`). Other
27//! backends box their native error type and set the `backend` tag
28//! accordingly.
29//!
30//! # Atomicity contract
31//!
32//! Per-op state transitions MUST be atomic (RFC-012 §3.4). On Valkey
33//! this is the single-FCALL-per-op property; on Postgres it is the
34//! per-transaction property. A backend that cannot honour atomicity
35//! for a given op either MUST NOT implement `EngineBackend` or MUST
36//! return `EngineError::Unavailable { op }` for the affected method.
37//!
38//! # Replay semantics
39//!
40//! `complete`, `fail`, `cancel`, `suspend`, `delay`, `wait_children`
41//! are idempotent under replay — calling twice with the same handle
42//! and args returns the same outcome (success on first call, typed
43//! `State` / `Contention` on subsequent calls where the fence triple
44//! no longer matches a live lease).
45
46use std::time::Duration;
47
48use async_trait::async_trait;
49
50use crate::backend::{
51 AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy,
52 FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal, PendingWaitpoint,
53 PrepareOutcome, ResumeSignal, ResumeToken,
54};
55// `SummaryDocument` and `TailVisibility` are referenced only inside
56// `#[cfg(feature = "streaming")]` trait methods below, so the imports
57// must be gated to avoid an unused-imports warning on the non-streaming
58// build.
59#[cfg(feature = "streaming")]
60use crate::backend::{SummaryDocument, TailVisibility};
61use crate::contracts::{
62 CancelFlowResult, ExecutionContext, ExecutionSnapshot, FlowSnapshot, IssueReclaimGrantArgs,
63 IssueReclaimGrantOutcome, ReclaimExecutionArgs, ReclaimExecutionOutcome, ReportUsageResult,
64 RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
65 SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
66};
67#[cfg(feature = "core")]
68use crate::contracts::{
69 AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
70 ApplyDependencyToChildResult, BudgetStatus, CancelExecutionArgs, CancelExecutionResult,
71 CancelFlowArgs, ChangePriorityArgs, ChangePriorityResult, ClaimExecutionArgs,
72 ClaimExecutionResult, ClaimForWorkerArgs, ClaimForWorkerOutcome, ClaimResumedExecutionArgs,
73 ClaimResumedExecutionResult,
74 BlockExecutionForAdmissionArgs, BlockExecutionForAdmissionOutcome, BlockRouteArgs,
75 BlockRouteOutcome, CheckAdmissionArgs, CheckAdmissionResult,
76 ClaimGrantOutcome,
77 CompleteExecutionArgs, CompleteExecutionResult, CreateBudgetArgs, CreateBudgetResult,
78 CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
79 CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
80 DeliverApprovalSignalArgs, DeliverSignalArgs, DeliverSignalResult, EdgeDirection, EdgeSnapshot,
81 EvaluateFlowEligibilityArgs, EvaluateFlowEligibilityResult, ExecutionInfo,
82 FailExecutionArgs, FailExecutionResult,
83 IssueClaimGrantArgs, IssueClaimGrantOutcome, IssueGrantAndClaimArgs,
84 BudgetUsageAndLimits, QuotaPolicyLimits, RecordSpendArgs, ReleaseAdmissionArgs,
85 ReleaseAdmissionResult,
86 ReleaseBudgetArgs, ScanEligibleArgs,
87 ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
88 ListPendingWaitpointsResult, ListSuspendedPage, RenewLeaseArgs, RenewLeaseResult,
89 ReplayExecutionArgs, ReplayExecutionResult,
90 ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult, ResumeExecutionArgs,
91 ResumeExecutionResult, RevokeLeaseArgs, RevokeLeaseResult,
92 StageDependencyEdgeArgs, StageDependencyEdgeResult,
93};
94// RFC-025 worker registry.
95#[cfg(feature = "core")]
96use crate::contracts::{
97 HeartbeatWorkerArgs, HeartbeatWorkerOutcome, ListWorkersArgs, ListWorkersResult,
98 MarkWorkerDeadArgs, MarkWorkerDeadOutcome, RegisterWorkerArgs, RegisterWorkerOutcome,
99};
100#[cfg(feature = "suspension")]
101use crate::contracts::{ListExpiredLeasesArgs, ListExpiredLeasesResult};
102#[cfg(feature = "core")]
103use crate::state::PublicState;
104#[cfg(feature = "core")]
105use crate::partition::PartitionKey;
106#[cfg(feature = "streaming")]
107use crate::contracts::{StreamCursor, StreamFrames};
108use crate::engine_error::EngineError;
109#[cfg(feature = "core")]
110use crate::types::EdgeId;
111#[cfg(feature = "core")]
112use crate::types::WaitpointId;
113use crate::types::{AttemptIndex, BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
114
115/// The engine write surface — a single trait a backend implementation
116/// honours to serve a `FlowFabricWorker`.
117///
118/// See RFC-012 §3.1 for the inventory rationale and §3.3 for the
/// type-level shape. The Round-7 inventory was 16 methods (Round-7
/// added `create_waitpoint`; `append_frame` return widened;
/// `report_usage` return replaced — RFC-012 §R7). Issue #150 added the
/// two trigger-surface methods (`deliver_signal` /
/// `claim_resumed_execution`), and later additions have grown the trait
/// past that count, so treat 16 as historical rather than current.
123///
124/// # Note on `complete` payload shape
125///
126/// The RFC §3.3 sketch uses `Option<Bytes>`; the Stage 1a trait uses
127/// `Option<Vec<u8>>` to match the existing
128/// `ff_sdk::ClaimedTask::complete` signature and avoid adding a
129/// `bytes` public-type dep for zero consumer benefit. Round-4 §7.17
130/// resolved the payload container debate to `Box<[u8]>` in the
131/// public type (see `HandleOpaque`); `Option<Vec<u8>>` is the
132/// zero-churn choice consistent with today's code. Consumers that
133/// need `&[u8]` can borrow via `.as_deref()` on the Option.
134#[async_trait]
135pub trait EngineBackend: Send + Sync + 'static {
136 // ── Claim + lifecycle ──
137
138 /// Fresh-work claim. Returns `Ok(None)` when no work is currently
139 /// available; `Err` only on transport or input-validation faults.
140 async fn claim(
141 &self,
142 lane: &LaneId,
143 capabilities: &CapabilitySet,
144 policy: ClaimPolicy,
145 ) -> Result<Option<Handle>, EngineError>;
146
147 /// Renew a held lease. Returns the updated expiry + epoch on
148 /// success; typed `State::StaleLease` / `State::LeaseExpired`
149 /// when the lease has been stolen or timed out.
150 async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError>;
151
152 /// Numeric-progress heartbeat.
153 ///
154 /// Writes scalar `progress_percent` / `progress_message` fields on
155 /// `exec_core`; each call overwrites the previous value. This does
156 /// NOT append to the output stream — stream-frame producers must use
157 /// [`append_frame`](Self::append_frame) instead.
158 async fn progress(
159 &self,
160 handle: &Handle,
161 percent: Option<u8>,
162 message: Option<String>,
163 ) -> Result<(), EngineError>;
164
165 /// Append one stream frame. Distinct from [`progress`](Self::progress)
166 /// per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry
167 /// id and post-append frame count (RFC-012 §R7.2.1).
168 ///
169 /// Stream-frame producers (arbitrary `frame_type` + payload, consumed
170 /// via the read/tail surfaces) MUST use this method rather than
171 /// [`progress`](Self::progress); the latter updates scalar fields on
172 /// `exec_core` and is invisible to stream consumers.
173 async fn append_frame(
174 &self,
175 handle: &Handle,
176 frame: Frame,
177 ) -> Result<AppendFrameOutcome, EngineError>;
178
179 /// Terminal success. Borrows `handle` (round-4 M-D2) so callers
180 /// can retry under `EngineError::Transport` without losing the
181 /// cookie. Payload is `Option<Vec<u8>>` per the note above.
182 async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError>;
183
184 /// Terminal failure with classification. Returns [`FailOutcome`]
185 /// so the caller learns whether a retry was scheduled.
186 async fn fail(
187 &self,
188 handle: &Handle,
189 reason: FailureReason,
190 classification: FailureClass,
191 ) -> Result<FailOutcome, EngineError>;
192
193 /// Cooperative cancel by the worker holding the lease.
194 async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError>;
195
196 /// Suspend the execution awaiting a typed resume condition
197 /// (RFC-013 Stage 1d).
198 ///
199 /// Borrows `handle` (round-4 M-D2). Terminal-looking behaviour is
200 /// expressed through [`SuspendOutcome`]:
201 ///
202 /// * [`SuspendOutcome::Suspended`] — the pre-suspend handle is
203 /// logically invalidated; the fresh `HandleKind::Suspended`
204 /// handle inside the variant supersedes it. Runtime enforcement
205 /// via the fence triple: subsequent ops against the stale handle
206 /// surface as `Contention(LeaseConflict)`.
207 /// * [`SuspendOutcome::AlreadySatisfied`] — buffered signals on a
208 /// pending waitpoint already matched the resume condition at
209 /// suspension time. The lease is NOT released; the caller's
210 /// pre-suspend handle remains valid.
211 ///
212 /// See RFC-013 §2 for the type shapes, §3 for the replay /
213 /// idempotency contract, §4 for the error taxonomy.
214 async fn suspend(
215 &self,
216 handle: &Handle,
217 args: SuspendArgs,
218 ) -> Result<SuspendOutcome, EngineError>;
219
220 /// Suspend by execution id + lease fence triple, for service-layer
221 /// callers that hold a run record / lease-claim descriptor but no
222 /// worker [`Handle`] (cairn issue #322).
223 ///
224 /// Semantics mirror [`Self::suspend`] exactly — the same
225 /// [`SuspendArgs`] validation, the same [`SuspendOutcome`]
226 /// lifecycle, the same RFC-013 §3 dedup / replay contract. The
227 /// only difference is the fencing source: instead of the
228 /// `(lease_id, lease_epoch, attempt_id)` fields embedded in a
229 /// `Handle`, the backend fences against the triple passed directly.
230 /// Attempt-index, lane, and worker-instance metadata that
231 /// [`Self::suspend`] reads from the handle payload are recovered
232 /// from the backend's authoritative execution record (Valkey:
233 /// `exec_core` HGETs; Postgres: `ff_attempt` row lookup).
234 ///
235 /// The default impl returns [`EngineError::Unavailable`] so
236 /// existing backend impls remain non-breaking. Production backends
237 /// (Valkey, Postgres) override.
238 async fn suspend_by_triple(
239 &self,
240 exec_id: ExecutionId,
241 triple: LeaseFence,
242 args: SuspendArgs,
243 ) -> Result<SuspendOutcome, EngineError> {
244 let _ = (exec_id, triple, args);
245 Err(EngineError::Unavailable {
246 op: "suspend_by_triple",
247 })
248 }
249
250 /// Issue a pending waitpoint for future signal delivery.
251 ///
252 /// Waitpoints have two states in the Valkey wire contract:
253 /// **pending** (token issued, not yet backing a suspension) and
254 /// **active** (bound to a suspension). This method creates a
255 /// waitpoint in the **pending** state. A later `suspend` call
256 /// transitions a pending waitpoint to active (see Lua
257 /// `use_pending_waitpoint` ARGV flag at
258 /// `flowfabric.lua:3603,3641,3690`) — or, if buffered signals
259 /// already satisfy its condition, the suspend call returns
260 /// `SuspendOutcome::AlreadySatisfied` and the waitpoint activates
261 /// without ever releasing the lease.
262 ///
263 /// Pending-waitpoint expiry is a first-class terminal error on
264 /// the wire (`PendingWaitpointExpired` at
265 /// `ff-script/src/error.rs:170,403-408`). The attempt retains its
266 /// lease while the waitpoint is pending; signals delivered to
267 /// this waitpoint are buffered server-side (RFC-012 §R7.2.2).
268 async fn create_waitpoint(
269 &self,
270 handle: &Handle,
271 waitpoint_key: &str,
272 expires_in: Duration,
273 ) -> Result<PendingWaitpoint, EngineError>;
274
275 /// Read the HMAC token stored on a waitpoint record, keyed by
276 /// `(partition, waitpoint_id)`.
277 ///
278 /// Returns `Ok(Some(token))` when the waitpoint exists and carries
279 /// a token, `Ok(None)` when the waitpoint does not exist or no
280 /// token field is present. A missing waitpoint is not an error —
281 /// signals can legitimately arrive after a waitpoint has been
282 /// consumed or expired, and the signal-bridge authenticates on the
283 /// presence of a matching token, not on the waitpoint's liveness.
284 ///
285 /// # Use case
286 ///
287 /// Control-plane signal delivery (cairn signal-bridge): at
288 /// signal-resume time the bridge reads the token off the
289 /// waitpoint hash / row to authenticate the resume request it
290 /// subsequently issues. Previously implemented as direct
291 /// `ferriskey::Client::hget(waitpoint_key, "waitpoint_token")` —
292 /// Valkey-only. This method routes the read through the trait so
293 /// the pattern works on Postgres + SQLite as well.
294 ///
295 /// # Per-backend shape
296 ///
297 /// * **Valkey** — `HGET ff:wp:<tag>:<waitpoint_id> waitpoint_token`
298 /// on the waitpoint's partition. Empty string / missing field
299 /// maps to `None`.
300 /// * **Postgres** — `SELECT token FROM ff_waitpoint_pending
301 /// WHERE partition_key = $1 AND waitpoint_id = $2 LIMIT 1`.
302 /// Row-absent → `None`; empty `token` → `None`.
303 /// * **SQLite** — same shape as Postgres.
304 ///
305 /// The `partition` argument is the opaque [`PartitionKey`]
306 /// produced by FlowFabric (typically extracted from the
307 /// `Handle` / `ResumeToken` the waitpoint was minted against).
308 ///
309 /// # Default impl
310 ///
311 /// Returns [`EngineError::Unavailable`] with
312 /// `op = "read_waitpoint_token"` so out-of-tree backends and
313 /// in-tree backends not yet overriding this method continue to
314 /// compile. Mirrors the precedent used by
315 /// [`Self::issue_reclaim_grant`] / [`Self::reclaim_execution`].
#[cfg(feature = "core")]
async fn read_waitpoint_token(
    &self,
    _partition: PartitionKey,
    _waitpoint_id: &WaitpointId,
) -> Result<Option<String>, EngineError> {
    // Default body: no lookup is attempted; the op is reported as
    // unavailable on this backend.
    const OP: &str = "read_waitpoint_token";
    Err(EngineError::Unavailable { op: OP })
}
326
327 /// Non-mutating observation of signals that satisfied the handle's
328 /// resume condition.
329 async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError>;
330
331 /// Consume a resume grant (via [`ResumeToken`]) to mint a
332 /// resumed-kind handle. Routes to `ff_claim_resumed_execution` on
333 /// Valkey / the epoch-bump reconciler on PG/SQLite. Returns
334 /// `Ok(None)` when the grant's target execution is no longer
335 /// resumable (already reclaimed, terminal, etc.).
336 ///
337 /// **Renamed from `claim_from_reclaim` (RFC-024 PR-B+C).** The
338 /// pre-rename name advertised "reclaim" but the semantic has
339 /// always been resume-after-suspend. The new lease-reclaim path
340 /// lives on [`Self::reclaim_execution`].
341 async fn claim_from_resume_grant(
342 &self,
343 token: ResumeToken,
344 ) -> Result<Option<Handle>, EngineError>;
345
346 /// Issue a lease-reclaim grant (RFC-024 §3.2). Admits executions
347 /// in `lease_expired_reclaimable` or `lease_revoked` state to the
348 /// reclaim path; the returned [`IssueReclaimGrantOutcome::Granted`]
349 /// carries a [`crate::contracts::ReclaimGrant`] which is then fed
350 /// to [`Self::reclaim_execution`] to mint a fresh attempt.
351 ///
352 /// Default impl returns [`EngineError::Unavailable`] — PR-D (PG),
353 /// PR-E (SQLite), and PR-F (Valkey) override with real bodies.
354 async fn issue_reclaim_grant(
355 &self,
356 _args: IssueReclaimGrantArgs,
357 ) -> Result<IssueReclaimGrantOutcome, EngineError> {
358 Err(EngineError::Unavailable {
359 op: "issue_reclaim_grant",
360 })
361 }
362
363 /// Consume a [`crate::contracts::ReclaimGrant`] to mint a fresh
364 /// attempt for a previously lease-expired / lease-revoked
365 /// execution (RFC-024 §3.2). Creates a new attempt row, bumps the
366 /// execution's `lease_reclaim_count`, and mints a
367 /// [`crate::backend::HandleKind::Reclaimed`] handle.
368 ///
369 /// Default impl returns [`EngineError::Unavailable`] — PR-D (PG),
370 /// PR-E (SQLite), and PR-F (Valkey) override with real bodies.
371 async fn reclaim_execution(
372 &self,
373 _args: ReclaimExecutionArgs,
374 ) -> Result<ReclaimExecutionOutcome, EngineError> {
375 Err(EngineError::Unavailable {
376 op: "reclaim_execution",
377 })
378 }
379
380 // Round-5 amendment: lease-releasing peers of `suspend`.
381
382 /// Park the execution until `delay_until`, releasing the lease.
383 async fn delay(&self, handle: &Handle, delay_until: TimestampMs) -> Result<(), EngineError>;
384
385 /// Mark the execution as waiting for its child flow to complete,
386 /// releasing the lease.
387 async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError>;
388
389 // ── Read / admin ──
390
391 /// Snapshot an execution by id. `Ok(None)` ⇒ no such execution.
392 async fn describe_execution(
393 &self,
394 id: &ExecutionId,
395 ) -> Result<Option<ExecutionSnapshot>, EngineError>;
396
397 /// Point-read of the execution-scoped `(input_payload,
398 /// execution_kind, tags)` bundle used by the SDK worker when
399 /// assembling a `ClaimedTask` (see `ff_sdk::ClaimedTask`) after a
400 /// successful claim.
401 ///
402 /// No default impl — every `EngineBackend` must answer this
403 /// explicitly. Distinct from [`Self::describe_execution`]
404 /// (read-model projection) because the SDK needs the raw payload
405 /// bytes + kind + tags immediately post-claim, and the snapshot
406 /// projection deliberately omits the payload bytes.
407 ///
408 /// Per-backend shape:
409 ///
410 /// * **Valkey** — pipelined `GET :payload` + `HGETALL :core`
411 /// + `HGETALL :tags` on the execution's partition (same pattern
412 /// as [`Self::describe_execution`]).
413 /// * **Postgres** — single `SELECT payload, raw_fields` on
414 /// `ff_exec_core` keyed by `(partition_key, execution_id)`;
415 /// `execution_kind` + `tags` live in `raw_fields` JSONB.
416 /// * **SQLite** — identical shape to Postgres.
417 ///
418 /// Returns [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
419 /// when the execution does not exist — the SDK worker only calls
420 /// this after a successful claim, so a missing row is a loud
421 /// storage-tier invariant violation rather than a routine `Ok(None)`.
422 async fn read_execution_context(
423 &self,
424 execution_id: &ExecutionId,
425 ) -> Result<ExecutionContext, EngineError>;
426
427 /// Point-read of the execution's current attempt-index **pointer**
428 /// — the index of the currently-leased attempt row.
429 ///
430 /// Distinct from [`Self::read_total_attempt_count`]: this method
431 /// names the attempt that *already exists* (pointer), whereas
432 /// `read_total_attempt_count` is the monotonic claim counter used
433 /// to compute the next fresh attempt index. See the sibling's
434 /// rustdoc for the retry-path scenario that motivates the split.
435 ///
436 /// Used on the SDK worker's `claim_from_resume_grant` path —
437 /// specifically the private `claim_resumed_execution` helper —
438 /// immediately before dispatching [`Self::claim_resumed_execution`].
439 /// The returned index is fed into
440 /// [`ClaimResumedExecutionArgs::current_attempt_index`](crate::contracts::ClaimResumedExecutionArgs)
441 /// so the backend's script / transaction targets the correct
442 /// existing attempt row (KEYS[6] on Valkey; `ff_attempt` PK tuple
443 /// on PG/SQLite).
444 ///
445 /// Per-backend shape:
446 ///
447 /// * **Valkey** — `HGET {exec}:core current_attempt_index` on the
448 /// execution's partition. Single command. Both the
449 /// **missing-field** case (`exec_core` present but
450 /// `current_attempt_index` absent or empty-string, i.e. pre-claim
451 /// state) **and** the **missing-row** case (no `exec_core` hash
452 /// at all) read back as `AttemptIndex(0)`. This preserves the
453 /// pre-PR-3 inline-`HGET` semantic and is safe because Valkey's
454 /// happy path requires `exec_core` to exist before this method
455 /// is reached — the SDK only calls `read_current_attempt_index`
456 /// post-grant, and grant issuance is gated on `exec_core`
457 /// presence. A genuinely absent row would surface as the proper
458 /// business-logic error (`NotAResumedExecution` /
459 /// `ExecutionNotLeaseable`) on the downstream FCALL.
460 /// * **Postgres** — `SELECT attempt_index FROM ff_exec_core
461 /// WHERE partition_key = $1 AND execution_id = $2`. The column
462 /// is `NOT NULL DEFAULT 0` so a pre-claim row reads back as `0`
463 /// (matching Valkey's missing-field case). **Missing row**
464 /// surfaces as [`EngineError::Validation { kind:
465 /// ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
466 /// — diverges from Valkey's missing-row `→ 0` mapping.
467 /// * **SQLite** — `SELECT attempt_index FROM ff_exec_core
468 /// WHERE partition_key = ? AND execution_id = ?`; identical
469 /// semantics to Postgres (missing-row → `InvalidInput`).
470 ///
471 /// **Cross-backend asymmetry on missing row is intentional.** The
472 /// SDK happy path never observes it (grant issuance on Valkey
473 /// requires `exec_core`, and PG/SQLite currently return
474 /// `Unavailable` from `claim_from_grant` per
475 /// `project_claim_from_grant_pg_sqlite_gap.md`). Consumers writing
476 /// backend-agnostic tooling against this method directly must
477 /// treat the missing-row case as backend-dependent — match on
478 /// `InvalidInput` for PG/SQLite, and treat an unexpected `0` as
479 /// the Valkey equivalent signal.
480 ///
481 /// The default impl returns [`EngineError::Unavailable`] so the
/// trait addition is non-breaking for out-of-tree backends. This
/// follows the v0.12 PR-1 landing of
/// [`Self::read_execution_context`], although that method is now
/// required (no default impl).
485 async fn read_current_attempt_index(
486 &self,
487 _execution_id: &ExecutionId,
488 ) -> Result<AttemptIndex, EngineError> {
489 Err(EngineError::Unavailable {
490 op: "read_current_attempt_index",
491 })
492 }
493
494 /// Point-read of the execution's **total attempt counter** — the
495 /// monotonic count of claims that have ever fired against this
496 /// execution (including the in-flight one once claimed).
497 ///
498 /// Used on the SDK worker's `claim_from_grant` / `claim_execution`
499 /// path — the next attempt-index for a fresh claim is this
500 /// counter's current value (so `1` on the second retry after the
501 /// first attempt failed terminally). This is semantically distinct
502 /// from [`Self::read_current_attempt_index`], which is a *pointer*
503 /// at the currently-leased attempt row and is only meaningful on
504 /// the `claim_from_resume_grant` path (where a live attempt already
505 /// exists and we want to re-seat its lease rather than mint a new
506 /// attempt row).
507 ///
508 /// Reading the pointer on the `claim_from_grant` path was a live
509 /// bug: on the retry-of-a-retry scenario the pointer still named
510 /// the *previous* terminal-failed attempt, so the newly-minted
511 /// attempt collided with it (Valkey KEYS[6]) or mis-targeted the
512 /// PG/SQLite `ff_attempt` PK tuple. This method fixes that by
513 /// reading the counter that Lua 5920 / PG `ff_claim_execution` /
514 /// SQLite `claim_impl` all already consult when computing the
515 /// next attempt index.
516 ///
517 /// Per-backend shape:
518 ///
519 /// * **Valkey** — `HGET {exec}:core total_attempt_count` on the
520 /// execution's partition. Single command; pre-claim read (field
521 /// absent or empty) maps to `0`.
522 /// * **Postgres** — `SELECT raw_fields->>'total_attempt_count'
523 /// FROM ff_exec_core WHERE (partition_key, execution_id) = ...`.
524 /// The field lives in the JSONB `raw_fields` bag rather than a
525 /// dedicated column (mirrors how `create_execution_impl` seeds
526 /// it on row creation). Missing row → `InvalidInput`; missing
527 /// field → `0`.
528 /// * **SQLite** — `SELECT CAST(json_extract(raw_fields,
529 /// '$.total_attempt_count') AS INTEGER) FROM ff_exec_core
530 /// WHERE ...`. Same JSON-in-`raw_fields` shape as PG; uses the
531 /// same `json_extract` idiom already employed in
532 /// `ff-backend-sqlite/src/queries/operator.rs` for replay_count.
533 ///
534 /// The default impl returns [`EngineError::Unavailable`] so the
535 /// trait addition is non-breaking for out-of-tree backends (same
536 /// precedent as [`Self::read_current_attempt_index`] landing in
537 /// v0.12 PR-3).
538 async fn read_total_attempt_count(
539 &self,
540 _execution_id: &ExecutionId,
541 ) -> Result<AttemptIndex, EngineError> {
542 Err(EngineError::Unavailable {
543 op: "read_total_attempt_count",
544 })
545 }
546
547 /// Snapshot a flow by id. `Ok(None)` ⇒ no such flow.
548 async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError>;
549
550 // ── Namespaced tag point-writes / reads (issue #433) ──
551
552 /// Set a single namespaced tag on an execution. Tag `key` MUST match
553 /// the reserved caller-namespace pattern `^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$` —
554 /// i.e. `<caller>.<field>` — or the call returns
555 /// [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
556 /// with the offending key in `detail`. `value` is arbitrary UTF-8.
557 ///
558 /// The namespace prefix is carried inline in `key` (e.g.
559 /// `"cairn.session_id"`) — there is no separate `namespace` arg.
560 /// This matches the existing `ff_set_execution_tags` wire shape and
561 /// the flow-tag projection in [`ExecutionSnapshot::tags`].
562 ///
563 /// Validation is performed by each overriding backend impl via
564 /// [`validate_tag_key`] **before** the wire hop so PG / SQLite /
565 /// Valkey reject the same set of keys. The default trait impl
566 /// returns [`EngineError::Unavailable`] without running validation
567 /// — there is no meaningful storage to validate against on an
568 /// unsupported backend, and surfacing `Unavailable` before
569 /// `Validation` matches the precedence used elsewhere on the trait.
570 /// Backends MAY additionally validate on the storage tier (Valkey's
571 /// Lua path does, with a more permissive prefix-only check).
572 ///
573 /// Per-backend shape:
574 ///
575 /// * **Valkey** — `ff_set_execution_tags` FCALL with a single
576 /// `{key → value}` pair. Routes through the existing Lua
577 /// contract (no new wire format).
578 /// * **Postgres** — `UPDATE ff_exec_core SET raw_fields = jsonb_set(
579 /// coalesce(raw_fields, '{}'::jsonb), '{tags,<key>}', to_jsonb($value))
580 /// WHERE (partition_key, execution_id) = ...`. Same storage shape
581 /// read by [`Self::describe_execution`] / [`Self::read_execution_context`].
582 /// * **SQLite** — `UPDATE ff_exec_core SET raw_fields = json_set(
583 /// coalesce(raw_fields, '{}'), '$.tags."<key>"', $value) WHERE ...`.
584 /// The key is quoted in the JSON path so dots inside the
585 /// namespaced key (e.g. `cairn.session_id`) are treated as a
586 /// single literal member name rather than JSON-path separators —
587 /// yielding the same flat `raw_fields.tags` shape as PG.
588 ///
589 /// Missing execution surfaces as
590 /// [`EngineError::NotFound { entity: "execution" }`](crate::engine_error::EngineError::NotFound)
591 /// — matches the Valkey FCALL's `execution_not_found` mapping and
592 /// the existing `ScriptError::ExecutionNotFound` → `EngineError`
593 /// conversion (`ff_script::engine_error_ext`).
594 ///
595 /// The default impl returns [`EngineError::Unavailable`] so the
596 /// trait addition is non-breaking for out-of-tree backends.
597 async fn set_execution_tag(
598 &self,
599 _execution_id: &ExecutionId,
600 _key: &str,
601 _value: &str,
602 ) -> Result<(), EngineError> {
603 Err(EngineError::Unavailable {
604 op: "set_execution_tag",
605 })
606 }
607
608 /// Set a single namespaced tag on a flow. Same namespace rule as
609 /// [`Self::set_execution_tag`]: `key` MUST match
610 /// `^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$`.
611 ///
612 /// Per-backend shape:
613 ///
614 /// * **Valkey** — `ff_set_flow_tags` FCALL with a single pair.
615 /// Tags land on the dedicated `ff:flow:{fp:N}:<flow_id>:tags`
616 /// hash, not on the `flow_core` hash (diverges from the
617 /// execution shape — execution tags live on `ff:exec:...:tags`
618 /// by the same split). **Lazy migration on first write**: the
619 /// Lua (`ff_script::flowfabric.lua`, `ff_set_flow_tags`) scans
620 /// `flow_core` once per flow for pre-58.4 inline namespaced
621 /// fields (anything matching `^[a-z][a-z0-9_]*\.`), HSETs them
622 /// onto `:tags`, HDELs them from `flow_core`, and stamps
623 /// `tags_migrated=1` on `flow_core` so subsequent calls
624 /// short-circuit to O(1). This heals flows created before
625 /// RFC-058.4 landed; well-formed flows pay the migration cost
626 /// only on their very first tag write. Callers MUST read tags
627 /// via [`Self::get_flow_tag`] (`HGET :tags <key>`) — direct
628 /// `HGETALL` against `flow_core` will not see post-migration
629 /// values.
630 ///
631 /// **Cross-backend parity caveat on `describe_flow`**: the
632 /// pre-existing `ValkeyBackend::describe_flow` /
633 /// `FlowSnapshot::tags` read path snapshots `flow_core` fields
634 /// only and does NOT today merge the `:tags` sub-hash, whereas
635 /// Postgres `describe_flow` DOES surface flow tags via
636 /// `ff_backend_postgres::flow::extract_tags` (which reads them
637 /// off `raw_fields` — the same store `set_flow_tag` writes on
638 /// PG). Trait consumers MUST NOT assume a tag written here
639 /// will be visible via `describe_flow` on every backend: on
640 /// Valkey, callers that need the full tag set should
641 /// complement the snapshot with per-key [`Self::get_flow_tag`]
642 /// reads. Extending Valkey `describe_flow` to merge `:tags`
643 /// is additive and out of scope for this trait addition.
644 /// * **Postgres** — `UPDATE ff_flow_core SET raw_fields =
645 /// jsonb_set(..., '{<key>}', ...)` — flow tags are stored as
646 /// top-level `raw_fields` keys (matches
647 /// `ff_backend_postgres::flow::extract_tags`). No `tags` nesting
648 /// on flows, which diverges from the execution shape.
649 /// * **SQLite** — mirrors PG: `UPDATE ff_flow_core SET raw_fields =
650 /// json_set(..., '$."<key>"', $value) WHERE ...`. The key is
651 /// quoted so the dotted namespaced key lands as a single flat
652 /// top-level member of `raw_fields`.
653 ///
654 /// Missing flow surfaces as
655 /// [`EngineError::NotFound { entity: "flow" }`](crate::engine_error::EngineError::NotFound)
656 /// (matches the Valkey FCALL's `flow_not_found` mapping).
657 ///
658 /// The default impl returns [`EngineError::Unavailable`].
659 async fn set_flow_tag(
660 &self,
661 _flow_id: &FlowId,
662 _key: &str,
663 _value: &str,
664 ) -> Result<(), EngineError> {
665 Err(EngineError::Unavailable {
666 op: "set_flow_tag",
667 })
668 }
669
670 /// Read a single namespaced execution tag. Returns `Ok(None)` when
671 /// the tag is absent **or** the execution row does not exist —
672 /// the two cases are not distinguished on the read path. Callers
673 /// that need to distinguish should call [`Self::describe_execution`]
674 /// first (an `Ok(None)` from that method proves the execution is
675 /// absent). This matches Valkey's native `HGET` semantics and
676 /// keeps the read path at a single round-trip on every backend.
677 ///
678 /// `key` must pass [`validate_tag_key`] — a malformed key can
679 /// never be present in storage so the call short-circuits with
680 /// [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
681 /// rather than round-tripping.
682 ///
683 /// Per-backend shape:
684 ///
685 /// * **Valkey** — `HGET :tags <key>` on the execution's partition.
686 /// * **Postgres** — `SELECT raw_fields->'tags'->><key> FROM ff_exec_core
687 /// WHERE ...` with `fetch_optional` → missing row collapses to `None`.
688 /// * **SQLite** — `SELECT json_extract(raw_fields, '$.tags."<key>"')
689 /// FROM ff_exec_core WHERE ...` with the same collapse. The key is
690 /// quoted in the JSON path so dotted namespaced keys resolve to
691 /// the flat literal member written by `set_execution_tag`.
692 ///
693 /// The default impl returns [`EngineError::Unavailable`].
694 async fn get_execution_tag(
695 &self,
696 _execution_id: &ExecutionId,
697 _key: &str,
698 ) -> Result<Option<String>, EngineError> {
699 Err(EngineError::Unavailable {
700 op: "get_execution_tag",
701 })
702 }
703
704 /// Read an execution's `namespace` scalar. Returns `Ok(None)` when
705 /// the row is absent or the field is unset. Dedicated point-read
706 /// used by the scanner per-candidate filter (`should_skip_candidate`)
707 /// to preserve the 1-HGET cost contract documented in
708 /// `ff_engine::scanner::should_skip_candidate` — `describe_execution`
709 /// is heavier (HGETALL / full snapshot) and unnecessary when only
710 /// the namespace scalar is needed.
711 ///
712 /// Per-backend shape:
713 ///
714 /// * **Valkey** — `HGET :core namespace` on the execution's partition
715 /// (single field read on the already-hot exec_core hash).
716 /// * **Postgres** — `SELECT raw_fields->>'namespace' FROM ff_exec_core
717 /// WHERE partition_key = $1 AND execution_id = $2`.
718 /// * **SQLite** — `SELECT json_extract(raw_fields, '$.namespace')
719 /// FROM ff_exec_core WHERE ...`.
720 ///
721 /// The default impl returns [`EngineError::Unavailable`].
722 async fn get_execution_namespace(
723 &self,
724 _execution_id: &ExecutionId,
725 ) -> Result<Option<String>, EngineError> {
726 Err(EngineError::Unavailable {
727 op: "get_execution_namespace",
728 })
729 }
730
731 /// Read a single namespaced flow tag. Returns `Ok(None)` when
732 /// the tag is absent **or** the flow row does not exist (same
733 /// collapse semantics as [`Self::get_execution_tag`]). Symmetry
734 /// partner — consumers like cairn read `cairn.session_id` off
735 /// flows for archival.
736 ///
737 /// `key` must pass [`validate_tag_key`].
738 ///
739 /// Per-backend shape:
740 ///
741 /// * **Valkey** — `HGET :tags <key>` on the flow's partition.
742 /// * **Postgres** — `SELECT raw_fields->><key> FROM ff_flow_core
743 /// WHERE ...` (top-level `raw_fields` key, matches the flow-tag
744 /// storage shape).
745 /// * **SQLite** — `SELECT json_extract(raw_fields, '$."<key>"')
746 /// FROM ff_flow_core WHERE ...` (quoted key — see
747 /// `set_flow_tag`).
748 ///
749 /// The default impl returns [`EngineError::Unavailable`].
750 async fn get_flow_tag(
751 &self,
752 _flow_id: &FlowId,
753 _key: &str,
754 ) -> Result<Option<String>, EngineError> {
755 Err(EngineError::Unavailable {
756 op: "get_flow_tag",
757 })
758 }
759
/// Enumerate the dependency edges adjacent to an execution.
///
/// Read-only. The backend resolves the subject execution's flow,
/// reads the direction-specific adjacency SET, and decodes each
/// member's flow-scoped `edge:<edge_id>` hash.
///
/// NOTE(review): the signature receives the owning `flow_id`
/// directly; the subject execution is presumably carried inside
/// `EdgeDirection` — confirm against that type.
///
/// # Returns
///
/// An empty `Vec` when the subject has no edges on the requested
/// side — this includes standalone executions (no owning flow).
/// Ordering is unspecified, because the underlying adjacency SET is
/// an unordered SMEMBERS read. Callers that need a deterministic
/// order should sort by [`EdgeSnapshot::edge_id`] /
/// [`EdgeSnapshot::created_at`] themselves.
///
/// # Errors
///
/// Parse failures on the edge hash surface as
/// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`].
/// Unknown fields, missing required fields, and endpoint mismatches
/// against the adjacency SET all fail loud rather than silently
/// returning partial results.
///
/// Gated on the `core` feature — edge reads are part of the minimal
/// engine surface a Postgres-style backend must honour.
///
/// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
#[cfg(feature = "core")]
async fn list_edges(
    &self,
    _flow_id: &FlowId,
    _direction: EdgeDirection,
) -> Result<Vec<EdgeSnapshot>, EngineError> {
    // Backends that do not override this method report it as unsupported.
    let op = "list_edges";
    Err(EngineError::Unavailable { op })
}
790
/// Snapshot one dependency edge, addressed by owning flow + edge id.
///
/// # Returns
///
/// `Ok(None)` when the edge hash is absent — either it was never
/// staged, or it was staged under a different flow than `flow_id`.
///
/// # Errors
///
/// Parse failures on a present edge hash surface as
/// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`].
/// The stored `flow_id` field is cross-checked against the caller's
/// expected `flow_id`, so a wrong-key read fails loud rather than
/// returning an unrelated edge.
///
/// Gated on the `core` feature — single-edge reads are part of the
/// minimal snapshot surface an alternate backend must honour
/// alongside [`Self::describe_execution`] / [`Self::describe_flow`]
/// / [`Self::list_edges`].
///
/// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
#[cfg(feature = "core")]
async fn describe_edge(
    &self,
    _flow_id: &FlowId,
    _edge_id: &EdgeId,
) -> Result<Option<EdgeSnapshot>, EngineError> {
    // Backends that do not override this method report it as unsupported.
    let op = "describe_edge";
    Err(EngineError::Unavailable { op })
}
817
/// Look up the flow that owns an execution, if there is one.
///
/// # Returns
///
/// `Ok(None)` when the execution's core record is absent, or when it
/// has no associated flow (standalone execution).
///
/// # Errors
///
/// A present-but-malformed `flow_id` field surfaces as
/// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`].
///
/// # Usage
///
/// ff-sdk's `list_outgoing_edges` / `list_incoming_edges` use this to
/// pivot from a consumer-supplied `ExecutionId` to the `FlowId`
/// required by [`Self::list_edges`]. A Valkey backend serves it with
/// a single `HGET exec_core flow_id`; a Postgres backend serves it
/// with the equivalent single-column row lookup.
///
/// Gated on the `core` feature.
///
/// [`EngineError::Validation { kind: ValidationKind::Corruption, .. }`]: crate::engine_error::EngineError::Validation
#[cfg(feature = "core")]
async fn resolve_execution_flow_id(
    &self,
    _eid: &ExecutionId,
) -> Result<Option<FlowId>, EngineError> {
    // Backends that do not override this method report it as unsupported.
    let op = "resolve_execution_flow_id";
    Err(EngineError::Unavailable { op })
}
842
/// Cursor-paginated listing of the flows on one partition (issue
/// #185).
///
/// # Contract
///
/// The result is a [`ListFlowsPage`] of
/// [`FlowSummary`](crate::contracts::FlowSummary) rows ordered by
/// `flow_id` (UUID byte-lexicographic).
///
/// * `cursor` — `None` for the first page. To continue, forward the
///   returned `next_cursor` verbatim; the listing is exhausted when
///   `next_cursor` is `None`.
/// * `limit` — maximum number of rows on this page. Implementations
///   MAY return fewer (end of partition) but MUST NOT exceed it.
///
/// # Ordering rationale
///
/// Flow ids are UUIDs, and both Valkey (sort after-the-fact) and
/// Postgres (`ORDER BY flow_id`) can agree on byte-lexicographic
/// order — the same order `FlowId::to_string()` produces for
/// canonical hyphenated UUIDs. Expressing continuation as
/// `flow_id > cursor` keeps the contract backend-independent.
///
/// # Postgres implementation pattern
///
/// A Postgres-backed implementation serves this directly with
///
/// ```sql
/// SELECT flow_id, created_at_ms, public_flow_state
///   FROM ff_flow
///  WHERE partition_key = $1
///    AND ($2::uuid IS NULL OR flow_id > $2)
///  ORDER BY flow_id
///  LIMIT $3 + 1;
/// ```
///
/// — reading one extra row to decide whether `next_cursor` should be
/// set to the last row's `flow_id`.
///
/// # Valkey implementation pattern
///
/// The Valkey implementation maintains the
/// `ff:idx:{fp:N}:flow_index` SET and performs the sort + slice
/// client-side (SMEMBERS then sort-by-UUID-bytes), pipelining
/// `HGETALL flow_core` for each row on the page.
///
/// Gated on the `core` feature — flow listing is part of the minimal
/// engine surface a Postgres-style backend must honour.
#[cfg(feature = "core")]
async fn list_flows(
    &self,
    _partition: PartitionKey,
    _cursor: Option<FlowId>,
    _limit: usize,
) -> Result<ListFlowsPage, EngineError> {
    // Backends that do not override this method report it as unsupported.
    let op = "list_flows";
    Err(EngineError::Unavailable { op })
}
891
/// Cursor-paginated enumeration of the registered lanes.
///
/// Lanes are global, not partition-scoped: the backend serves this
/// from its lane registry and does NOT accept a
/// [`crate::partition::Partition`] argument. Results are sorted by
/// [`LaneId`] name, so the ordering is stable across calls and a
/// cursor addresses a deterministic position in the sort.
///
/// # Parameters
///
/// * `cursor` — exclusive lower bound. `None` starts from the first
///   lane. To continue a walk, pass the previous page's
///   [`ListLanesPage::next_cursor`].
/// * `limit` — hard cap on the number of lanes in the page. A page
///   MAY hold fewer lanes when the registry is smaller; backends
///   MUST NOT return more than `limit`.
///
/// # Returns
///
/// [`ListLanesPage::next_cursor`] is `Some(last_lane_in_page)` iff at
/// least one more lane exists after the returned page, and `None` on
/// the final page. Callers loop until `next_cursor` is `None` to read
/// the full registry.
///
/// Gated on the `core` feature — lane enumeration is part of the
/// minimal snapshot surface an alternate backend must honour
/// alongside [`Self::describe_flow`] / [`Self::list_edges`].
#[cfg(feature = "core")]
async fn list_lanes(
    &self,
    _cursor: Option<LaneId>,
    _limit: usize,
) -> Result<ListLanesPage, EngineError> {
    // Backends that do not override this method report it as unsupported.
    let op = "list_lanes";
    Err(EngineError::Unavailable { op })
}
923
/// Cursor-paginated listing of the suspended executions in one
/// partition, each entry carrying its suspension `reason_code`
/// (issue #183).
///
/// # Why the reason is inlined
///
/// Consumer-facing "what's blocked on what?" panels (ff-board's
/// suspended-executions view, operator CLIs) need the reason in the
/// list response, so the UI does not round-trip per row to
/// `describe_execution` for a field it knows it needs. `reason` on
/// [`SuspendedExecutionEntry`] carries the free-form
/// `suspension:current.reason_code` field — see the type rustdoc for
/// the String-not-enum rationale.
///
/// # Pagination
///
/// `cursor` is opaque to callers: pass `None` to start a fresh scan,
/// then feed the returned [`ListSuspendedPage::next_cursor`] back in
/// on subsequent pages until it comes back `None`. `limit` bounds the
/// `entries` count; backends MAY return fewer when the partition is
/// exhausted.
///
/// # Ordering
///
/// Ascending `suspended_at_ms` (the per-lane suspended ZSET score ==
/// `timeout_at` or the no-timeout sentinel), with execution id as a
/// lex tiebreak, so cursor continuation is deterministic across
/// calls.
///
/// Gated on the `core` feature — suspended-list enumeration is part
/// of the minimal engine surface a Postgres-style backend must
/// honour.
#[cfg(feature = "core")]
async fn list_suspended(
    &self,
    _partition: PartitionKey,
    _cursor: Option<ExecutionId>,
    _limit: usize,
) -> Result<ListSuspendedPage, EngineError> {
    // Backends that do not override this method report it as unsupported.
    let op = "list_suspended";
    Err(EngineError::Unavailable { op })
}
961
/// Forward-only, cursor-paginated listing of the executions indexed
/// under one partition.
///
/// # Contract
///
/// The backend reads the partition-wide
/// `ff:idx:{p:N}:all_executions` set, sorts lexicographically on
/// `ExecutionId`, and returns the page of ids strictly greater than
/// `cursor` (starting from the smallest id when `cursor = None`).
/// The returned [`ListExecutionsPage::next_cursor`] is the last id on
/// the page iff at least one more id exists past it; `None` signals
/// end-of-stream.
///
/// # `limit`
///
/// Maximum number of ids returned on this page. A `limit` of `0`
/// returns an empty page with `next_cursor = None`. Backends MAY cap
/// `limit` internally (Valkey: 1000) and return fewer ids than
/// requested; callers continue paginating until
/// `next_cursor == None`.
///
/// # Behaviour under concurrent inserts
///
/// Ordering is stable for already-emitted ids: an id
/// less-than-or-equal-to the caller's cursor is never re-emitted in
/// later pages. New inserts past the cursor WILL appear in subsequent
/// pages, consistent with forward-only cursor semantics.
///
/// Gated on the `core` feature — partition-scoped listing is part of
/// the minimal engine surface every backend must honour.
#[cfg(feature = "core")]
async fn list_executions(
    &self,
    _partition: PartitionKey,
    _cursor: Option<ExecutionId>,
    _limit: usize,
) -> Result<ListExecutionsPage, EngineError> {
    // Backends that do not override this method report it as unsupported.
    let op = "list_executions";
    Err(EngineError::Unavailable { op })
}
998
999 // ── Trigger ops (issue #150) ──
1000
/// Hand an external signal to a suspended execution's waitpoint.
///
/// # Semantics
///
/// In one atomic step the backend records the signal, evaluates the
/// resume condition, and — when it is satisfied — transitions the
/// execution from `suspended` to `runnable`. When the waitpoint is
/// still `pending`, the signal is buffered instead.
///
/// Duplicate delivery (same `idempotency_key` + waitpoint) surfaces
/// as [`DeliverSignalResult::Duplicate`] carrying the pre-existing
/// `signal_id`, rather than mutating state twice.
///
/// # Validation and errors
///
/// Input validation (HMAC token presence, payload size limits,
/// signal-name shape) is the backend's responsibility. Callers pass a
/// fully populated [`DeliverSignalArgs`] and receive typed outcomes
/// or typed errors (`ScriptError::invalid_token`,
/// `ScriptError::token_expired`, `ScriptError::ExecutionNotFound`
/// surfaced via [`EngineError::Transport`] on the Valkey backend).
///
/// Gated on the `core` feature — signal delivery is part of the
/// minimal trigger surface every backend must honour so ff-server /
/// REST handlers can dispatch against `Arc<dyn EngineBackend>`
/// without knowing which backend is running underneath.
#[cfg(feature = "core")]
async fn deliver_signal(
    &self,
    _args: DeliverSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
    // Backends that do not override this method report it as unsupported.
    let op = "deliver_signal";
    Err(EngineError::Unavailable { op })
}
1031
/// Claim a resumed execution: a previously-suspended attempt that has
/// cleared its resume condition (e.g. via [`Self::deliver_signal`])
/// and now needs a worker to pick up the same attempt index.
///
/// # How it differs from the other claim paths
///
/// * [`Self::claim`] — fresh work.
/// * [`Self::claim_from_resume_grant`] — grant-based ownership
///   transfer after a crash.
/// * this method — re-binds an existing attempt rather than minting a
///   new one. The backend issues a fresh `lease_id` + bumps the
///   `lease_epoch`, preserving `attempt_id` / `attempt_index` so
///   stream frames and progress updates continue on the same attempt.
///
/// # Errors
///
/// Typed failures surface via `ScriptError` → `EngineError`:
///
/// * `NotAResumedExecution` — the attempt state is not
///   `attempt_interrupted`.
/// * `ExecutionNotLeaseable` — the lifecycle phase is not `runnable`.
/// * `InvalidClaimGrant` — the grant key is missing or was already
///   consumed.
///
/// Gated on the `core` feature — resumed-claim is part of the minimal
/// trigger surface every backend must honour.
#[cfg(feature = "core")]
async fn claim_resumed_execution(
    &self,
    _args: ClaimResumedExecutionArgs,
) -> Result<ClaimResumedExecutionResult, EngineError> {
    // Backends that do not override this method report it as unsupported.
    let op = "claim_resumed_execution";
    Err(EngineError::Unavailable { op })
}
1062
/// Scan one lane's eligible ZSET on one partition for the
/// highest-priority executions awaiting a worker (v0.12 PR-5).
///
/// # Origin
///
/// Lifted from the SDK-side `ZRANGEBYSCORE` inline on
/// `FlowFabricWorker::claim_next` — the scheduler-bypass scanner
/// gated behind `direct-valkey-claim`. The trait method itself is
/// backend-agnostic; consumers that drive the scanner loop (bench
/// harnesses, single-tenant dev) compose it with
/// [`Self::issue_claim_grant`] + [`Self::claim_execution`] to
/// replicate the pre-PR-5 `claim_next` body.
///
/// # Backend coverage
///
/// * **Valkey** — `ZRANGEBYSCORE eligible_zset -inf +inf LIMIT 0 <limit>`
///   on the lane's partition-scoped eligible key. Single command; no
///   script round-trip. Wire shape is byte-for-byte identical to the
///   pre-PR SDK inline call so bench traces match pre-PR without new
///   `#[tracing::instrument]` span names.
/// * **Postgres / SQLite** — use the `Err(Unavailable)` default.
///   PG/SQLite consumers drive work through the scheduler-routed
///   [`Self::claim_for_worker`] path instead of the scanner
///   primitives exposed here; lifting the scheduler itself onto the
///   trait is RFC-024 follow-up scope. See
///   `project_claim_from_grant_pg_sqlite_gap.md` for motivation.
///
/// # Default
///
/// The default impl returns [`EngineError::Unavailable`] so the trait
/// addition is non-breaking for out-of-tree backends. Same precedent
/// as [`Self::claim_execution`] landing in v0.12 PR-4.
#[cfg(feature = "core")]
async fn scan_eligible_executions(
    &self,
    _args: ScanEligibleArgs,
) -> Result<Vec<ExecutionId>, EngineError> {
    // Backends that do not override this method report it as unsupported.
    let op = "scan_eligible_executions";
    Err(EngineError::Unavailable { op })
}
1100
/// Write a claim grant — the scheduler's admission write — for a
/// single execution on a single lane (v0.12 PR-5).
///
/// # Origin and semantics
///
/// Lifted from the SDK-side `ff_issue_claim_grant` inline helper on
/// `FlowFabricWorker::claim_next`. The backend atomically writes the
/// grant hash, appends to the per-worker grant index, and removes the
/// execution from the lane's eligible ZSET.
///
/// # Errors
///
/// Typed rejects surface via [`EngineError::Validation`]:
///
/// * `CapabilityMismatch` — the worker's capabilities do not cover
///   the execution's `required_capabilities`.
/// * `InvalidInput` — malformed args.
///
/// Transport faults surface via [`EngineError::Transport`].
///
/// # Backend coverage
///
/// * **Valkey** — one `ff_issue_claim_grant` FCALL. KEYS/ARGV shape
///   is byte-for-byte identical to the pre-PR SDK inline call; bench
///   traces match pre-PR.
/// * **Postgres / SQLite** — `Err(Unavailable)` default; use
///   [`Self::claim_for_worker`] instead. See
///   [`Self::scan_eligible_executions`] for the cross-link rationale.
#[cfg(feature = "core")]
async fn issue_claim_grant(
    &self,
    _args: IssueClaimGrantArgs,
) -> Result<IssueClaimGrantOutcome, EngineError> {
    // Backends that do not override this method report it as unsupported.
    let op = "issue_claim_grant";
    Err(EngineError::Unavailable { op })
}
1133
/// Relocate an execution from a lane's eligible ZSET to its
/// blocked_route ZSET (v0.12 PR-5).
///
/// # Origin and purpose
///
/// Lifted from the SDK-side `ff_block_execution_for_admission` inline
/// helper on `FlowFabricWorker::claim_next`. It is called after a
/// [`Self::issue_claim_grant`] `CapabilityMismatch` reject: without a
/// block step, the inline scanner would re-pick the same top-of-ZSET
/// every tick (parity with
/// `ff-scheduler::Scheduler::block_candidate`).
///
/// The engine's unblock scanner periodically promotes blocked_route
/// back to eligible once a worker with matching caps registers.
///
/// # Backend coverage
///
/// * **Valkey** — one `ff_block_execution_for_admission` FCALL.
/// * **Postgres / SQLite** — `Err(Unavailable)` default; the
///   scheduler-routed [`Self::claim_for_worker`] path handles
///   admission rejects server-side.
#[cfg(feature = "core")]
async fn block_route(
    &self,
    _args: BlockRouteArgs,
) -> Result<BlockRouteOutcome, EngineError> {
    // Backends that do not override this method report it as unsupported.
    let op = "block_route";
    Err(EngineError::Unavailable { op })
}
1161
/// Redeem a scheduler-issued claim grant and mint a fresh attempt.
///
/// # Semantics
///
/// This is the SDK's grant-consumer path, paired with
/// `FlowFabricWorker::claim_from_grant` in `ff-sdk`. The scheduler
/// has already validated budget / quota / capabilities and written a
/// grant (Valkey `claim_grant` hash). This call atomically consumes
/// that grant and creates the attempt row, mints `lease_id` +
/// `lease_epoch`, and returns a [`ClaimExecutionResult::Claimed`]
/// carrying the minted lease triple.
///
/// Distinct from [`Self::claim`] (the scheduler-bypass scanner used
/// by the `direct-valkey-claim` feature): this method assumes the
/// grant already exists and skips capability / ZSET scanning. The
/// Valkey impl fires exactly one `ff_claim_execution` FCALL.
///
/// # Errors
///
/// Typed failures surface via `ScriptError` → `EngineError`:
///
/// * `UseClaimResumedExecution` — the attempt is actually
///   `attempt_interrupted`; the caller should retry via
///   [`Self::claim_resumed_execution`] (see `ContentionKind` at
///   `ff_core::engine_error`).
/// * `InvalidClaimGrant` — the grant is missing / consumed /
///   worker-mismatched.
/// * `CapabilityMismatch` — the execution's `required_capabilities`
///   drifted after grant issuance.
///
/// # Backend coverage
///
/// * **Valkey** — implemented in `ff-backend-valkey` (one
///   `ff_claim_execution` FCALL).
/// * **Postgres / SQLite** — use the `Err(Unavailable)` default in
///   this PR. Grants on PG / SQLite today flow through
///   `PostgresScheduler::claim_for_worker` (a sibling struct, not an
///   `EngineBackend` method); wiring the default-over-trait behaviour
///   into a PG / SQLite `claim_execution` impl lands with a future
///   RFC-024 grant-consumer extension.
///
/// # Default
///
/// The default impl returns [`EngineError::Unavailable`] so the trait
/// addition is non-breaking for out-of-tree backends. Same precedent
/// as [`Self::read_current_attempt_index`] landing in v0.12 PR-3.
#[cfg(feature = "core")]
async fn claim_execution(
    &self,
    _args: ClaimExecutionArgs,
) -> Result<ClaimExecutionResult, EngineError> {
    // Backends that do not override this method report it as unsupported.
    let op = "claim_execution";
    Err(EngineError::Unavailable { op })
}
1212
/// Operator-initiated cancellation of a flow and (optionally) its
/// member executions. See RFC-012 §3.1.1 for the policy / wait
/// matrix.
///
/// This is a required method: the trait supplies no default body, so
/// every backend has to provide an implementation.
///
/// * `id` — the flow to cancel.
/// * `policy` / `wait` — select the row of the RFC-012 §3.1.1 matrix
///   that applies (presumably whether member executions are cancelled
///   and whether the call waits for them — confirm against
///   `CancelFlowPolicy` / `CancelFlowWait`).
async fn cancel_flow(
    &self,
    id: &FlowId,
    policy: CancelFlowPolicy,
    wait: CancelFlowWait,
) -> Result<CancelFlowResult, EngineError>;
1222
/// RFC-016 Stage A: configure the inbound-edge-group policy of a
/// downstream execution.
///
/// # Ordering requirement
///
/// Must be called before the first
/// `add_dependency(... -> downstream_execution_id)`. The backend
/// rejects with [`EngineError::Conflict`] if edges have already been
/// staged for this group.
///
/// # Stage A restriction
///
/// Stage A honours only
/// [`EdgeDependencyPolicy::AllOf`](crate::contracts::EdgeDependencyPolicy::AllOf).
/// The `AnyOf` / `Quorum` variants return
/// [`EngineError::Validation`] with
/// `detail = "stage A supports AllOf only; AnyOf/Quorum land in stage B"`
/// until Stage B's resolver lands.
#[cfg(feature = "core")]
async fn set_edge_group_policy(
    &self,
    _flow_id: &FlowId,
    _downstream_execution_id: &ExecutionId,
    _policy: crate::contracts::EdgeDependencyPolicy,
) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
    // Backends that do not override this method report it as unsupported.
    let op = "set_edge_group_policy";
    Err(EngineError::Unavailable { op })
}
1246
1247 // ── HMAC secret rotation (v0.7 migration-master Q4) ──
1248
1249 /// Rotate the waitpoint HMAC signing kid **cluster-wide**.
1250 ///
1251 /// **v0.7 migration-master Q4 (adjudicated 2026-04-24).**
1252 /// Additive trait surface so Valkey and Postgres backends can
1253 /// both expose the "rotate everywhere" semantic under one name.
1254 ///
1255 /// * Valkey impl fans out an `ff_rotate_waitpoint_hmac_secret`
1256 /// FCALL per execution partition. `entries.len() == num_flow_partitions`
1257 /// and per-partition failures are surfaced as inner `Err`
1258 /// entries — the call as a whole does not fail when one
1259 /// partition's FCALL fails, matching
1260 /// [`ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions`]'s
1261 /// partial-success contract.
1262 /// * Postgres impl (Wave 4) writes one row to
1263 /// `ff_waitpoint_hmac(kid, secret, rotated_at)` and returns a
1264 /// single-entry vec with `partition = 0`.
1265 ///
1266 /// The default impl returns
1267 /// [`EngineError::Unavailable`] with
1268 /// `op = "rotate_waitpoint_hmac_secret_all"` so backends that
1269 /// haven't implemented the method surface the miss loudly rather
1270 /// than silently no-op'ing. Both concrete backends override.
1271 async fn rotate_waitpoint_hmac_secret_all(
1272 &self,
1273 _args: RotateWaitpointHmacSecretAllArgs,
1274 ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
1275 Err(EngineError::Unavailable {
1276 op: "rotate_waitpoint_hmac_secret_all",
1277 })
1278 }
1279
1280 /// Seed the initial waitpoint HMAC secret for a fresh deployment
1281 /// (issue #280).
1282 ///
1283 /// **Idempotent.** If a `current_kid` (Valkey per-partition) or
1284 /// an active kid row (Postgres) already exists with the given
1285 /// `kid`, the method returns
1286 /// [`SeedOutcome::AlreadySeeded`] without overwriting, reporting
1287 /// whether the stored secret matches the caller-supplied one via
1288 /// `same_secret`. Callers (cairn boot, operator tooling) invoke
1289 /// this on every boot and let the backend decide whether to
1290 /// install — removing the client-side "check then HSET" race that
1291 /// cairn's raw-HSET boot path silently tolerated.
1292 ///
1293 /// For rotation of an already-seeded secret, use
1294 /// [`Self::rotate_waitpoint_hmac_secret_all`] instead; seed is
1295 /// install-only.
1296 ///
1297 /// The default impl returns [`EngineError::Unavailable`] with
1298 /// `op = "seed_waitpoint_hmac_secret"` so backends that haven't
1299 /// implemented the method surface the miss loudly.
1300 async fn seed_waitpoint_hmac_secret(
1301 &self,
1302 _args: SeedWaitpointHmacSecretArgs,
1303 ) -> Result<SeedOutcome, EngineError> {
1304 Err(EngineError::Unavailable {
1305 op: "seed_waitpoint_hmac_secret",
1306 })
1307 }
1308
1309 // ── Budget ──
1310
/// Report usage against a budget and check limits. Returns the
/// typed [`ReportUsageResult`] variant; backends enforce
/// idempotency via the caller-supplied
/// [`UsageDimensions::dedup_key`] (RFC-012 §R7.2.3 — replaces
/// the pre-Round-7 `AdmissionDecision` return).
///
/// This is a required method: the trait supplies no default body, so
/// every backend has to provide an implementation.
///
/// * `handle` — handle of the attempt reporting the usage
///   (presumably the caller's live lease handle — confirm against
///   `Handle`).
/// * `budget` — the budget the usage is charged against.
/// * `dimensions` — the usage being reported, including the
///   `dedup_key` used for idempotency.
async fn report_usage(
    &self,
    handle: &Handle,
    budget: &BudgetId,
    dimensions: crate::backend::UsageDimensions,
) -> Result<ReportUsageResult, EngineError>;
1322
1323 // ── Stream reads (RFC-012 Stage 1c tranche-4; issue #87) ──
1324
/// Range-read frames from the stream of a completed or in-flight
/// attempt.
///
/// # Cursors
///
/// `from` / `to` are [`StreamCursor`] values. `StreamCursor::Start` /
/// `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
/// `StreamCursor::At("<id>")` reads from a concrete entry id.
///
/// # Validation
///
/// Input validation (count_limit bounds, cursor shape) is the
/// caller's responsibility: the SDK-side wrappers in
/// [`ff-sdk`](https://docs.rs/ff-sdk) enforce bounds before
/// forwarding. Backends MAY additionally reject out-of-range input
/// via [`EngineError::Validation`].
///
/// Gated on the `streaming` feature — stream reads are part of the
/// stream-subset surface a backend without XREAD-like primitives may
/// omit.
#[cfg(feature = "streaming")]
async fn read_stream(
    &self,
    _execution_id: &ExecutionId,
    _attempt_index: AttemptIndex,
    _from: StreamCursor,
    _to: StreamCursor,
    _count_limit: u64,
) -> Result<StreamFrames, EngineError> {
    // Backends that do not override this method report it as unsupported.
    let op = "read_stream";
    Err(EngineError::Unavailable { op })
}
1351
/// Follow the stream of a live attempt.
///
/// # Parameters
///
/// * `after` — exclusive [`StreamCursor`]: only entries with an id
///   strictly greater than `after` are returned.
///   `StreamCursor::Start` / `StreamCursor::End` are NOT accepted
///   here; callers MUST pass a concrete id (or
///   `StreamCursor::from_beginning()`). The SDK wrapper rejects the
///   open markers before reaching the backend.
/// * `block_ms` — `0` is a non-blocking peek; a value `> 0` blocks up
///   to that many ms for a new entry.
/// * `visibility` (RFC-015 §6.1) — filters the returned entries by
///   their stored [`StreamMode`](crate::backend::StreamMode) `mode`
///   field. Default
///   [`TailVisibility::All`](crate::backend::TailVisibility::All)
///   preserves v1 behaviour.
///
/// Gated on the `streaming` feature — see [`read_stream`](Self::read_stream).
#[cfg(feature = "streaming")]
async fn tail_stream(
    &self,
    _execution_id: &ExecutionId,
    _attempt_index: AttemptIndex,
    _after: StreamCursor,
    _block_ms: u64,
    _count_limit: u64,
    _visibility: TailVisibility,
) -> Result<StreamFrames, EngineError> {
    // Backends that do not override this method report it as unsupported.
    let op = "tail_stream";
    Err(EngineError::Unavailable { op })
}
1382
/// Fetch the rolling summary document of an attempt (RFC-015 §6.3).
///
/// `Ok(None)` means no
/// [`StreamMode::DurableSummary`](crate::backend::StreamMode::DurableSummary)
/// frame has ever been appended for the attempt. The read is a
/// non-blocking Hash read and is safe to call from any consumer
/// without holding the lease.
///
/// Gated on the `streaming` feature — summary reads are part of the
/// stream-subset surface.
#[cfg(feature = "streaming")]
async fn read_summary(
    &self,
    _execution_id: &ExecutionId,
    _attempt_index: AttemptIndex,
) -> Result<Option<SummaryDocument>, EngineError> {
    // Backends that do not override this method report it as unsupported.
    let op = "read_summary";
    Err(EngineError::Unavailable { op })
}
1401
1402 // ── RFC-017 Stage A — Ingress (5) ──────────────────────────
1403 //
1404 // Every method in this block has a default impl returning
1405 // `EngineError::Unavailable { op }` per RFC-017 §5.3. Concrete
1406 // backends override each method with a real body. A missing
1407 // override surfaces as a loud typed error at the call site rather
1408 // than a silent no-op.
1409
/// Ingress row 6 (RFC-017 §4): create an execution.
///
/// Valkey wraps `ff_create_execution`; Postgres issues
/// `INSERT INTO ff_execution ...`. Duplicate submissions are
/// idempotent thanks to the `idempotency_key` combined with the
/// backend-side default `dedup_ttl_ms = 86400000`.
#[cfg(feature = "core")]
async fn create_execution(
    &self,
    _args: CreateExecutionArgs,
) -> Result<CreateExecutionResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "create_execution",
    };
    Err(unavailable)
}
1423
/// Ingress row 5: create a flow header.
#[cfg(feature = "core")]
async fn create_flow(
    &self,
    _args: CreateFlowArgs,
) -> Result<CreateFlowResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable { op: "create_flow" };
    Err(unavailable)
}
1432
/// Add an execution to a flow as one atomic step.
///
/// On Valkey this is a single-FCALL co-located commit; on Postgres
/// it is a single-transaction UPSERT.
#[cfg(feature = "core")]
async fn add_execution_to_flow(
    &self,
    _args: AddExecutionToFlowArgs,
) -> Result<AddExecutionToFlowResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "add_execution_to_flow",
    };
    Err(unavailable)
}
1444
/// Stage a dependency edge between two members of a flow.
///
/// The write is CAS-guarded on `graph_revision`: a stale revision
/// yields `Contention(StaleGraphRevision)`.
#[cfg(feature = "core")]
async fn stage_dependency_edge(
    &self,
    _args: StageDependencyEdgeArgs,
) -> Result<StageDependencyEdgeResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "stage_dependency_edge",
    };
    Err(unavailable)
}
1456
/// Apply a previously staged dependency edge to its downstream child.
#[cfg(feature = "core")]
async fn apply_dependency_to_child(
    &self,
    _args: ApplyDependencyToChildArgs,
) -> Result<ApplyDependencyToChildResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "apply_dependency_to_child",
    };
    Err(unavailable)
}
1467
/// Resolve a single dependency edge once its upstream has reached a
/// terminal outcome: the edge is satisfied on "success" and marked
/// impossible otherwise. Replays are idempotent (`AlreadyResolved`).
///
/// PR-7b Step 0 overlap-resolver: the method was lifted onto the
/// trait so cluster 2 (`scanner/dependency_reconciler`) could
/// trait-route through `Arc<dyn EngineBackend>` without a merge
/// conflict with cluster 4. Cluster 4
/// (`completion_listener::spawn_dispatch_loop`) ended up routing
/// through the coarser, per-payload [`Self::cascade_completion`]
/// rather than looping over this per-edge method, because the
/// Postgres cascade is outbox-driven instead of per-edge — see the
/// "Timing semantics" section of the `cascade_completion` rustdoc.
///
/// # Backend status
///
/// - **Valkey:** wraps `ff_resolve_dependency` (RFC-016 Stage C
///   signature). Atomic single-slot FCALL.
/// - **Postgres:** `Unavailable`. The PG post-completion cascade is
///   not per-edge; it runs via
///   `ff_backend_postgres::dispatch::dispatch_completion(event_id)`
///   keyed on the `ff_completion_event` outbox row. The
///   Valkey-shaped per-edge resolve does not map cleanly onto that
///   model, and PG's `dependency_reconciler` already calls
///   `dispatch_completion` directly. The engine's PR-7b/final
///   integration test expects `Unsupported` logs from Valkey-shaped
///   scanners on a PG deployment — this surface honours that
///   contract.
/// - **SQLite:** `Unavailable` for the same reason (mirrors PG).
///
/// The default impl yields [`EngineError::Unavailable`], so a
/// backend that has not been migrated surfaces a typed
/// `Unsupported`-grade error instead of panicking.
#[cfg(feature = "core")]
async fn resolve_dependency(
    &self,
    _args: crate::contracts::ResolveDependencyArgs,
) -> Result<crate::contracts::ResolveDependencyOutcome, EngineError> {
    let unavailable = EngineError::Unavailable {
        op: "resolve_dependency",
    };
    Err(unavailable)
}
1509
/// Propagate the completion of a terminal execution into its
/// downstream edges.
///
/// Consumed by `ff-engine::completion_listener::spawn_dispatch_loop`
/// (PR-7b Cluster 4) so the post-completion DAG-promotion path is
/// trait-routed through `Arc<dyn EngineBackend>`.
///
/// Not to be confused with [`Self::resolve_dependency`]: that
/// method works per edge (one `ff_resolve_dependency` FCALL), while
/// this one works per completion and orchestrates the whole
/// outgoing-edge walk plus `child_skipped` recursion (Valkey) or
/// outbox-event dispatch (Postgres).
///
/// # Timing semantics
///
/// Backends differ in *when* the caller observes cascade work. The
/// full contract is on [`CascadeOutcome`]; in short:
///
/// - **Valkey:** synchronous. The FCALL-driven walk completes
///   inline, and `child_skipped` descendants are recursively
///   cascaded up to the internal `MAX_CASCADE_DEPTH` cap before the
///   call returns.
/// - **Postgres:** asynchronous via the `ff_completion_event`
///   outbox. The call resolves `payload` to its `event_id`, runs
///   `ff_backend_postgres::dispatch::dispatch_completion`, and
///   returns once the outbox row has been claimed and its direct
///   hops advanced. Cascades for further descendants ride their own
///   outbox events (emitted by the per-hop tx) — NOT this call.
///
/// A consumer relying on a synchronous cascade must either target
/// Valkey explicitly, or verify drain by observing PG's
/// `dispatched_at_ms` clearance via the `dependency_reconciler`
/// partial index.
///
/// The default impl yields [`EngineError::Unavailable`], so
/// backends that have not been migrated surface a typed error
/// instead of panicking.
///
/// [`CascadeOutcome`]: crate::contracts::CascadeOutcome
#[cfg(feature = "core")]
async fn cascade_completion(
    &self,
    _payload: &crate::backend::CompletionPayload,
) -> Result<crate::contracts::CascadeOutcome, EngineError> {
    let unavailable = EngineError::Unavailable {
        op: "cascade_completion",
    };
    Err(unavailable)
}
1555
1556 // ── RFC-017 Stage A — Operator control (4) ─────────────────
1557
/// Cancel an execution on an operator's request (row 2).
#[cfg(feature = "core")]
async fn cancel_execution(
    &self,
    _args: CancelExecutionArgs,
) -> Result<CancelExecutionResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "cancel_execution",
    };
    Err(unavailable)
}
1568
/// Re-score the eligibility priority of an execution (row 17).
#[cfg(feature = "core")]
async fn change_priority(
    &self,
    _args: ChangePriorityArgs,
) -> Result<ChangePriorityResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "change_priority",
    };
    Err(unavailable)
}
1579
/// Replay an execution that has reached a terminal state (row 22).
///
/// The variadic KEYS handling (inbound-edge pre-read) stays hidden
/// inside the Valkey impl, per RFC-017 §4 row 3.
#[cfg(feature = "core")]
async fn replay_execution(
    &self,
    _args: ReplayExecutionArgs,
) -> Result<ReplayExecutionResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "replay_execution",
    };
    Err(unavailable)
}
1592
/// Revoke a lease on an operator's request (row 19).
#[cfg(feature = "core")]
async fn revoke_lease(
    &self,
    _args: RevokeLeaseArgs,
) -> Result<RevokeLeaseResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable { op: "revoke_lease" };
    Err(unavailable)
}
1601
1602 // ── cairn #389 — service-layer typed FCALL surface ────────────
1603 //
1604 // These methods mirror `complete`/`fail`/`renew` (which take a
1605 // worker [`Handle`]) but dispatch against `(execution_id, fence)`
1606 // tuples supplied directly. Service-layer callers (cairn's
1607 // `valkey_control_plane_impl.rs`, future consumers that hold a
1608 // run/lease descriptor without a `Handle`) use these to avoid
1609 // going through the raw `ferriskey::Value` FCALL escape hatch.
1610 //
1611 // Same shape / same precedent as `suspend_by_triple` (cairn #322).
1612 //
1613 // Default impl returns [`EngineError::Unavailable`] so the trait
1614 // addition is non-breaking for out-of-tree backends. The in-tree
1615 // Valkey backend overrides; Postgres + SQLite keep the default
1616 // until follow-up parity work lands (consistent with
1617 // `issue_reclaim_grant` / `reclaim_execution` precedent).
1618
/// Service-layer `complete_execution`: the counterpart of
/// [`Self::complete`] that accepts a fence triple in place of a
/// worker [`Handle`]. The comment preceding this method group gives
/// the cairn-migration context.
#[cfg(feature = "core")]
async fn complete_execution(
    &self,
    _args: CompleteExecutionArgs,
) -> Result<CompleteExecutionResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "complete_execution",
    };
    Err(unavailable)
}
1631
/// Service-layer `fail_execution`: the counterpart of [`Self::fail`]
/// that accepts a fence triple in place of a worker [`Handle`].
#[cfg(feature = "core")]
async fn fail_execution(
    &self,
    _args: FailExecutionArgs,
) -> Result<FailExecutionResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "fail_execution",
    };
    Err(unavailable)
}
1643
/// Service-layer `renew_lease`: the counterpart of [`Self::renew`]
/// that accepts a fence triple in place of a worker [`Handle`].
#[cfg(feature = "core")]
async fn renew_lease(
    &self,
    _args: RenewLeaseArgs,
) -> Result<RenewLeaseResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable { op: "renew_lease" };
    Err(unavailable)
}
1653
/// Service-layer `resume_execution`: move a suspended execution back
/// to runnable.
///
/// This is the lifecycle-transition primitive the control plane
/// invokes when an operator or an auto-resume policy moves a
/// suspended execution forward. It differs from
/// [`Self::claim_from_resume_grant`], which mints a worker handle
/// against an execution that has already been resumed and is
/// eligible.
///
/// The Valkey impl pre-reads `current_waitpoint_id` + `lane_id` from
/// `exec_core`, so callers supply only the execution id and the
/// trigger type — the same ergonomics as `revoke_lease` reading
/// `current_worker_instance_id` when the caller omits it.
#[cfg(feature = "core")]
async fn resume_execution(
    &self,
    _args: ResumeExecutionArgs,
) -> Result<ResumeExecutionResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "resume_execution",
    };
    Err(unavailable)
}
1675
/// Service-layer `check_admission_and_record`: an atomic admission
/// check against a quota policy.
///
/// The policy id and dimension are passed separately from
/// [`CheckAdmissionArgs`] because quota keys live on their own
/// `{q:<policy>}` partition, which cannot be derived from
/// `execution_id`. When the caller passes an empty `dimension`, the
/// Valkey body substitutes `"default"` — matching cairn's
/// pre-migration default.
#[cfg(feature = "core")]
async fn check_admission(
    &self,
    _quota_policy_id: &crate::types::QuotaPolicyId,
    _dimension: &str,
    _args: CheckAdmissionArgs,
) -> Result<CheckAdmissionResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "check_admission",
    };
    Err(unavailable)
}
1694
/// Generalised admission block covering the budget / quota /
/// capability denial paths (FF #511 Phase 2b).
///
/// The `BlockingReason` picks both the eligibility state written to
/// `exec_core` and the target `blocked_<reason>` lane index. Valkey
/// wraps the existing `ff_block_execution_for_admission` FCALL
/// (KEYS=3); PG/SQLite write the equivalent row transition.
///
/// [`Self::block_route`] remains the capability-mismatch shorthand;
/// this companion method is the primitive the scheduler uses when
/// the reason is already known at the call site (budget denial,
/// quota denial, etc.).
#[cfg(feature = "core")]
async fn block_execution_for_admission(
    &self,
    _args: BlockExecutionForAdmissionArgs,
) -> Result<BlockExecutionForAdmissionOutcome, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "block_execution_for_admission",
    };
    Err(unavailable)
}
1715
/// Typed snapshot of a budget's usage and limits hashes
/// (FF #511 Phase 3).
///
/// Supersedes the scheduler's Valkey-shaped HGETALL/HGET pattern on
/// `ff:budget:{K}:{id}:limits` + `ff:budget:{K}:{id}:usage`. An absent
/// limits hash means "no limits configured" and is reported as
/// [`BudgetUsageAndLimits::empty`], not as an error.
#[cfg(feature = "core")]
async fn read_budget_usage_and_limits(
    &self,
    _budget_id: &BudgetId,
) -> Result<BudgetUsageAndLimits, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "read_budget_usage_and_limits",
    };
    Err(unavailable)
}
1730
/// Fetch the admission-relevant fields of a quota policy: rate
/// limit, window, concurrency cap and jitter.
///
/// Supersedes the Valkey-shaped 4-HGET pattern on
/// `ff:quota:{K}:def` that `ff_scheduler` used before FF #511. A
/// missing policy row yields `None`: absence is a well-defined
/// "no admission configured" signal rather than an error.
#[cfg(feature = "core")]
async fn read_quota_policy_limits(
    &self,
    _quota_policy_id: &crate::types::QuotaPolicyId,
) -> Result<Option<QuotaPolicyLimits>, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "read_quota_policy_limits",
    };
    Err(unavailable)
}
1746
/// Service-layer `evaluate_flow_eligibility`: a read-only check
/// reporting the execution's current eligibility state (`eligible`,
/// `blocked_by_dependencies`, or a backend-specific status string).
///
/// Cairn's dependency-resolution path calls it to decide whether a
/// downstream execution may proceed.
#[cfg(feature = "core")]
async fn evaluate_flow_eligibility(
    &self,
    _args: EvaluateFlowEligibilityArgs,
) -> Result<EvaluateFlowEligibilityResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "evaluate_flow_eligibility",
    };
    Err(unavailable)
}
1761
1762 // ── #454 — cairn typed-FCALL additions (4) ─────────────────
1763
/// Record budget spend for one execution using tenant-open
/// dimensions.
///
/// Cairn #454 Q1/Q2: the deltas arrive as an open-set
/// `BTreeMap<String, u64>`, unlike the fixed-shape
/// [`Self::report_usage`]/[`Self::report_usage_admin`], which use
/// [`UsageDimensions`]. The return shape reuses
/// [`ReportUsageResult`] — the same `Ok` / `SoftBreach` /
/// `HardBreach` / `AlreadyApplied` variants cairn's UI already
/// branches on.
///
/// The default impl yields [`EngineError::Unavailable`], keeping the
/// trait additive for backends that have not yet landed the #454
/// body.
#[cfg(feature = "core")]
async fn record_spend(
    &self,
    _args: RecordSpendArgs,
) -> Result<ReportUsageResult, EngineError> {
    let unavailable = EngineError::Unavailable {
        op: "record_spend",
    };
    Err(unavailable)
}
1785
/// Release one execution's budget attribution.
///
/// Invoked when an execution terminates, to reverse that
/// execution's contribution to a budget counter. Per the cairn #454
/// clarification the release is **per-execution**, not a
/// whole-budget flush — the budget itself persists across
/// executions.
///
/// The default impl yields [`EngineError::Unavailable`].
#[cfg(feature = "core")]
async fn release_budget(
    &self,
    _args: ReleaseBudgetArgs,
) -> Result<(), EngineError> {
    let unavailable = EngineError::Unavailable {
        op: "release_budget",
    };
    Err(unavailable)
}
1803
/// Give back a quota-admission slot that was recorded through
/// [`Self::check_admission`] / `ff_check_admission_and_record` but
/// whose subsequent `issue_claim_grant` failed. The call is
/// idempotent: releasing a slot that is already released is a no-op.
///
/// Valkey wraps the existing `ff_release_admission` FCALL (DEL
/// guard + SREM admitted_set + DECR-if-positive concurrency
/// counter); PG/SQLite write to their admission-tracking rows.
///
/// `ff_scheduler::Scheduler` uses it on the claim-fail rollback path
/// (FF #511). The default impl yields
/// [`EngineError::Unavailable`].
#[cfg(feature = "core")]
async fn release_admission(
    &self,
    _args: ReleaseAdmissionArgs,
) -> Result<ReleaseAdmissionResult, EngineError> {
    let unavailable = EngineError::Unavailable {
        op: "release_admission",
    };
    Err(unavailable)
}
1825
/// Deliver an approval signal on an operator's behalf.
///
/// A pre-shaped variant of [`Self::deliver_signal`] in which the
/// caller **does not carry the waitpoint token**: the backend reads
/// the token from `ff_waitpoint_pending`, HMAC-verifies it
/// server-side, and dispatches. The operator API never sees the
/// token bytes.
///
/// Cairn #454 Q3 — `signal_name` is a flat string (conventionally
/// `"approved"` / `"rejected"`); audit metadata lives in cairn's
/// audit log rather than on the FF surface.
///
/// The default impl yields [`EngineError::Unavailable`].
#[cfg(feature = "core")]
async fn deliver_approval_signal(
    &self,
    _args: DeliverApprovalSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
    let unavailable = EngineError::Unavailable {
        op: "deliver_approval_signal",
    };
    Err(unavailable)
}
1847
/// `issue_claim_grant` + `claim_execution` fused into one
/// backend-atomic operation.
///
/// Cairn #454 Q4 — when the two primitives are composed on the
/// caller side, a grant can leak if `claim_execution` fails after
/// `issue_claim_grant` has succeeded. The contract here is that the
/// composition is backend-atomic: Valkey fuses the pair into one
/// FCALL, PG/SQLite into one tx.
///
/// The default impl **must not** be a chained call; it yields
/// [`EngineError::Unavailable`] so that consumers cannot fall back
/// to a non-atomic composition by accident.
#[cfg(feature = "core")]
async fn issue_grant_and_claim(
    &self,
    _args: IssueGrantAndClaimArgs,
) -> Result<ClaimGrantOutcome, EngineError> {
    let unavailable = EngineError::Unavailable {
        op: "issue_grant_and_claim",
    };
    Err(unavailable)
}
1868
1869 // ── RFC-017 Stage A — Budget + quota admin (5) ─────────────
1870
/// Define a new budget (row 6).
#[cfg(feature = "core")]
async fn create_budget(
    &self,
    _args: CreateBudgetArgs,
) -> Result<CreateBudgetResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "create_budget",
    };
    Err(unavailable)
}
1881
/// Reset the usage counters of a budget (row 10).
#[cfg(feature = "core")]
async fn reset_budget(
    &self,
    _args: ResetBudgetArgs,
) -> Result<ResetBudgetResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable { op: "reset_budget" };
    Err(unavailable)
}
1890
/// Define a new quota policy (row 7).
#[cfg(feature = "core")]
async fn create_quota_policy(
    &self,
    _args: CreateQuotaPolicyArgs,
) -> Result<CreateQuotaPolicyResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "create_quota_policy",
    };
    Err(unavailable)
}
1901
/// Budget status for operator visibility; read-only (row 8).
#[cfg(feature = "core")]
async fn get_budget_status(
    &self,
    _id: &BudgetId,
) -> Result<BudgetStatus, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "get_budget_status",
    };
    Err(unavailable)
}
1912
/// `report_usage` for the admin path (row 9 + RFC-017 §5 round-1
/// F4).
///
/// The existing [`Self::report_usage`] takes a worker handle; the
/// admin path has no lease context, hence this separate method.
#[cfg(feature = "core")]
async fn report_usage_admin(
    &self,
    _budget: &BudgetId,
    _args: ReportUsageAdminArgs,
) -> Result<ReportUsageResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "report_usage_admin",
    };
    Err(unavailable)
}
1926
1927 // ── RFC-017 Stage A — Read + diagnostics (3) ───────────────
1928
1929 /// Fetch the stored result payload for a completed execution
1930 /// (row 4). Returns `Ok(None)` when the execution is missing, not
1931 /// yet complete, or its payload was trimmed by retention policy.
1932 async fn get_execution_result(
1933 &self,
1934 _id: &ExecutionId,
1935 ) -> Result<Option<Vec<u8>>, EngineError> {
1936 Err(EngineError::Unavailable {
1937 op: "get_execution_result",
1938 })
1939 }
1940
/// Cursor-paginated listing of an execution's pending-or-active
/// waitpoints (row 5 / §8).
///
/// Stage A keeps the existing `PendingWaitpointInfo` shape; the §8
/// HMAC sanitisation and the `(token_kid, token_fingerprint)` schema
/// ship with Stage D.
#[cfg(feature = "core")]
async fn list_pending_waitpoints(
    &self,
    _args: ListPendingWaitpointsArgs,
) -> Result<ListPendingWaitpointsResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "list_pending_waitpoints",
    };
    Err(unavailable)
}
1954
1955 /// Backend-level reachability probe (row 1). Valkey: `PING`;
1956 /// Postgres: `SELECT 1`.
1957 async fn ping(&self) -> Result<(), EngineError> {
1958 Err(EngineError::Unavailable { op: "ping" })
1959 }
1960
1961 // ── RFC-025 worker registry ────────────────────────────────
1962 //
1963 // Four core primitives + Phase-6 `list_workers` readback. See
1964 // `rfcs/RFC-025-worker-registry.md` for the full design. Default
1965 // impls return `Unavailable` so out-of-tree backends keep
1966 // compiling; every in-tree backend overrides.
1967
/// Register a worker instance, or idempotently refresh an existing
/// registration. See RFC-025 §4.
#[cfg(feature = "core")]
async fn register_worker(
    &self,
    _args: RegisterWorkerArgs,
) -> Result<RegisterWorkerOutcome, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "register_worker",
    };
    Err(unavailable)
}
1978
/// Extend the liveness TTL of a worker instance.
#[cfg(feature = "core")]
async fn heartbeat_worker(
    &self,
    _args: HeartbeatWorkerArgs,
) -> Result<HeartbeatWorkerOutcome, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "heartbeat_worker",
    };
    Err(unavailable)
}
1989
/// Mark a worker dead on an operator's request — as opposed to
/// passive TTL expiry.
#[cfg(feature = "core")]
async fn mark_worker_dead(
    &self,
    _args: MarkWorkerDeadArgs,
) -> Result<MarkWorkerDeadOutcome, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "mark_worker_dead",
    };
    Err(unavailable)
}
2000
/// List leases that have expired, for reclaim-decision tooling.
// NOTE(review): this method is gated on `suspension`, whereas the
// neighbouring worker-registry methods (`register_worker`,
// `heartbeat_worker`, `mark_worker_dead`, `list_workers`) are gated
// on `core` — confirm the gate is intentional.
#[cfg(feature = "suspension")]
async fn list_expired_leases(
    &self,
    _args: ListExpiredLeasesArgs,
) -> Result<ListExpiredLeasesResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "list_expired_leases",
    };
    Err(unavailable)
}
2011
/// List the live workers (RFC-025 Phase 6, §9.4).
#[cfg(feature = "core")]
async fn list_workers(
    &self,
    _args: ListWorkersArgs,
) -> Result<ListWorkersResult, EngineError> {
    // Default body: report the op as unavailable.
    let unavailable = EngineError::Unavailable {
        op: "list_workers",
    };
    Err(unavailable)
}
2022
2023 // ── RFC-017 Stage A — Scheduling (1) ───────────────────────
2024
/// Claim entrypoint routed through the scheduler (row 18, RFC-017
/// §7). Valkey forwards to its `ff_scheduler::Scheduler` cursor,
/// Postgres to `PostgresScheduler`'s `FOR UPDATE SKIP LOCKED` path.
///
/// A backend carrying an embedded scheduler (e.g. a `ValkeyBackend`
/// built via `with_embedded_scheduler`, or a `PostgresBackend` with
/// its `with_scanners` sibling) routes the claim through it; a
/// backend with no scheduler wired yields
/// [`EngineError::Unavailable`]. HTTP consumers use
/// `FlowFabricWorker::claim_via_server` instead.
#[cfg(feature = "core")]
async fn claim_for_worker(
    &self,
    _args: ClaimForWorkerArgs,
) -> Result<ClaimForWorkerOutcome, EngineError> {
    let unavailable = EngineError::Unavailable {
        op: "claim_for_worker",
    };
    Err(unavailable)
}
2045
2046 // ── Cross-cutting (RFC-017 Stage B trait-lift) ──────────────
2047
2048 /// Static observability label identifying the backend family in
2049 /// logs + metrics (RFC-017 §5.4 + §9 Stage B). Default impl
2050 /// returns `"unknown"` so legacy `impl EngineBackend` blocks that
2051 /// have not upgraded keep compiling; every in-tree backend
2052 /// overrides — `ValkeyBackend` → `"valkey"`, `PostgresBackend` →
2053 /// `"postgres"`.
2054 fn backend_label(&self) -> &'static str {
2055 "unknown"
2056 }
2057
2058 /// Backend downcast escape hatch (v0.12 PR-7a transitional).
2059 ///
2060 /// Scanner supervisors in `ff-engine` still dispatch through a
2061 /// concrete `ferriskey::Client`; to keep the engine's public
2062 /// boundary backend-agnostic (`Arc<dyn EngineBackend>`) while the
2063 /// scanner internals remain Valkey-shaped, the engine downcasts
2064 /// via this method and reaches in for the embedded client. Every
2065 /// backend that wants to be consumed by `Engine::start_with_completions`
2066 /// overrides this to return `self` as `&dyn Any`; the default
2067 /// returns a placeholder so a stray `downcast_ref` fails cleanly
2068 /// rather than risking unsound behaviour.
2069 ///
2070 /// v0.13 (PR-7b) will trait-ify individual scanners onto
2071 /// `EngineBackend` and retire `ff-engine`'s dependence on this
2072 /// downcast path. The method itself will remain on the trait
2073 /// (likely deprecated) rather than be removed — removing a
2074 /// public trait method is a breaking change for external
2075 /// `impl EngineBackend` blocks.
2076 fn as_any(&self) -> &(dyn std::any::Any + 'static) {
2077 // Placeholder so the default does not expose `Self` for
2078 // downcast. Backends override to return `self`.
2079 &()
2080 }
2081
2082 /// RFC-018 Stage A: snapshot of this backend's identity + the
2083 /// flat `Supports` surface it can actually service. Consumers use
2084 /// this at startup to gate UI features / choose between alternative
2085 /// code paths before dispatching. See
2086 /// `rfcs/RFC-018-backend-capability-discovery.md` for the full
2087 /// discovery contract and the four owner-adjudicated open
2088 /// questions (granularity: coarse; version: struct; sync; no
2089 /// event stream).
2090 ///
2091 /// Default: returns a value tagged `family = "unknown"` with every
2092 /// `supports.*` bool `false`, so pre-RFC-018 out-of-tree backends
2093 /// keep compiling and consumers treat "all false" as "dispatch
2094 /// and catch [`EngineError::Unavailable`]" (pre-RFC-018 behaviour).
2095 /// Concrete in-tree backends (`ValkeyBackend`, `PostgresBackend`)
2096 /// override to populate a real value.
2097 ///
2098 /// Sync (no `.await`): backend-static info should not require a
2099 /// probe on every query. Dynamic probes happen once at
2100 /// `connect*` time and cache the result.
2101 fn capabilities(&self) -> crate::capability::Capabilities {
2102 crate::capability::Capabilities::new(
2103 crate::capability::BackendIdentity::new(
2104 "unknown",
2105 crate::capability::Version::new(0, 0, 0),
2106 "unknown",
2107 ),
2108 crate::capability::Supports::none(),
2109 )
2110 }
2111
2112 /// Issue #281: run one-time backend-specific boot preparation.
2113 ///
2114 /// Intended to run ONCE per deployment startup — NOT per request.
2115 /// Idempotent and safe for consumers to call on every application
2116 /// boot; backends that have nothing to do return
2117 /// [`PrepareOutcome::NoOp`] without side effects.
2118 ///
2119 /// Per-backend behaviour:
2120 ///
2121 /// * **Valkey** — issues `FUNCTION LOAD REPLACE` for the
2122 /// `flowfabric` Lua library (with bounded retry on transient
2123 /// transport faults; permanent compile errors surface as
2124 /// [`EngineError::Transport`] without retry). Returns
2125 /// [`PrepareOutcome::Applied`] carrying
2126 /// `"FUNCTION LOAD (flowfabric lib v<N>)"`.
2127 /// * **Postgres** — returns [`PrepareOutcome::NoOp`]. Schema
2128 /// migrations are applied out-of-band per
2129 /// `rfcs/drafts/v0.7-migration-master.md §Q12`; the backend
2130 /// runs a schema-version check at connect time and refuses to
2131 /// start on mismatch, so no boot-side prepare work remains.
2132 /// * **Default impl** — returns [`PrepareOutcome::NoOp`] so
2133 /// out-of-tree backends without preparation work compile
2134 /// without boilerplate.
2135 ///
2136 /// # Relationship to the in-tree boot path
2137 ///
2138 /// `ValkeyBackend::initialize_deployment` (called from
2139 /// `Server::start_with_metrics`) already invokes
2140 /// [`ensure_library`](ff_script::loader::ensure_library) inline as
2141 /// its step 4; that path is unchanged. `prepare()` exists as a
2142 /// **trait-surface entry point** so consumers that construct an
2143 /// `Arc<dyn EngineBackend>` outside of `Server` (e.g.
2144 /// cairn-fabric's boot path at `cairn-fabric/src/boot.rs`) can
2145 /// run the same preparation without reaching into
2146 /// backend-specific modules. The overlap is intentional: calling
2147 /// both `prepare()` and `initialize_deployment` is safe because
2148 /// `FUNCTION LOAD REPLACE` is idempotent under the version
2149 /// check.
2150 ///
2151 /// # Layer forwarding
2152 ///
2153 /// Layer impls (`HookedBackend`, ff-sdk layers) do NOT forward
2154 /// `prepare` today — consistent with `backend_label` / `ping` /
2155 /// `shutdown_prepare`. Consumers that wrap a backend in layers
2156 /// MUST call `prepare()` on the raw backend before wrapping, or
2157 /// accept the default [`PrepareOutcome::NoOp`].
2158 async fn prepare(&self) -> Result<PrepareOutcome, EngineError> {
2159 Ok(PrepareOutcome::NoOp)
2160 }
2161
2162 /// Drain-before-shutdown hook (RFC-017 §5.4). The server calls
2163 /// this before draining its own background tasks so backend-
2164 /// scoped primitives (Valkey stream semaphore, Postgres sqlx
2165 /// pool, …) can close their gates and await in-flight work up to
2166 /// `grace`.
2167 ///
2168 /// Default impl returns `Ok(())` — a no-op backend has nothing
2169 /// backend-scoped to drain. Concrete backends whose data plane
2170 /// owns resources (connection pools, semaphores, listeners)
2171 /// override with a real body.
2172 async fn shutdown_prepare(&self, _grace: Duration) -> Result<(), EngineError> {
2173 Ok(())
2174 }
2175
2176 // ── RFC-017 Stage E2 — `Server::client` removal (header + reads) ───
2177
/// RFC-017 Stage E2: the "header" half of `cancel_flow`.
///
/// Runs the atomic flow-state flip (Valkey: `ff_cancel_flow` FCALL;
/// Postgres: `cancel_flow_once` tx), decodes policy + membership,
/// and surfaces the `flow_already_terminal` idempotency branch as a
/// first-class [`CancelFlowHeader::AlreadyTerminal`], so the Server
/// can build the wire [`CancelFlowResult`] without reaching for a
/// raw `Client`.
///
/// It is kept apart from the existing
/// [`EngineBackend::cancel_flow`] entry point (which takes the
/// enum-typed `(policy, wait)` split and returns the wait-collapsed
/// `CancelFlowResult`) because the Server owns its own wait-dispatch
/// and member-cancel machinery via
/// [`EngineBackend::cancel_execution`] + backlog ack.
///
/// The default impl yields [`EngineError::Unavailable`], so
/// un-migrated backends surface the miss loudly.
#[cfg(feature = "core")]
async fn cancel_flow_header(
    &self,
    _args: CancelFlowArgs,
) -> Result<crate::contracts::CancelFlowHeader, EngineError> {
    let unavailable = EngineError::Unavailable {
        op: "cancel_flow_header",
    };
    Err(unavailable)
}
2202
/// RFC-017 Stage E2: best-effort acknowledgement that a member of a
/// `cancel_all` flow has finished its per-member cancel.
///
/// The member is drained from the flow's `pending_cancels` set; once
/// that set is empty, the flow is removed from the partition-level
/// `cancel_backlog` (Valkey: `ff_ack_cancel_member` FCALL; Postgres:
/// table write — default `Unavailable` until Wave 9).
///
/// The caller swallows failures, since the cancel-backlog
/// reconciler is the authoritative drain; a typed error here still
/// lets the caller log a backend-scoped context string.
#[cfg(feature = "core")]
async fn ack_cancel_member(
    &self,
    _flow_id: &FlowId,
    _execution_id: &ExecutionId,
) -> Result<(), EngineError> {
    let unavailable = EngineError::Unavailable {
        op: "ack_cancel_member",
    };
    Err(unavailable)
}
2223
2224 /// RFC-017 Stage E2: full-shape execution read used by the
2225 /// `GET /v1/executions/{id}` HTTP route. Returns the legacy
2226 /// [`ExecutionInfo`] wire shape (not the decoupled
2227 /// [`ExecutionSnapshot`]) so the existing HTTP response bytes stay
2228 /// identical across the migration.
2229 ///
2230 /// `Ok(None)` ⇒ no such execution. Default `Unavailable` because
2231 /// the Valkey HGETALL-and-parse is backend-specific.
2232 #[cfg(feature = "core")]
2233 async fn read_execution_info(
2234 &self,
2235 _id: &ExecutionId,
2236 ) -> Result<Option<ExecutionInfo>, EngineError> {
2237 Err(EngineError::Unavailable {
2238 op: "read_execution_info",
2239 })
2240 }
2241
2242 /// RFC-017 Stage E2: narrow `public_state` read used by the
2243 /// `GET /v1/executions/{id}/state` HTTP route. Returns `Ok(None)`
2244 /// when the execution is missing. Default `Unavailable`.
2245 #[cfg(feature = "core")]
2246 async fn read_execution_state(
2247 &self,
2248 _id: &ExecutionId,
2249 ) -> Result<Option<PublicState>, EngineError> {
2250 Err(EngineError::Unavailable {
2251 op: "read_execution_state",
2252 })
2253 }
2254
2255 // ── RFC-019 Stage A/B/C — Stream-cursor subscriptions ─────────
2256 //
2257 // Four owner-adjudicated families (RFC-019 §Open Questions #5):
2258 // `lease_history`, `completion`, `signal_delivery`,
2259 // `instance_tags`. Stage C (this crate) promotes each family to
2260 // a typed event enum; consumers `match` on variants instead of
2261 // parsing a backend-shaped byte blob.
2262 //
2263 // Each method returns a family-specific subscription alias (see
2264 // [`crate::stream_events`]). All defaults return
2265 // `EngineError::Unavailable` per RFC-017 trait-growth conventions.
2266
2267 /// Subscribe to lease lifecycle events (acquired / renewed /
2268 /// expired / reclaimed / revoked) for the partition this backend
2269 /// is configured with.
2270 ///
2271 /// Cross-partition fan-out is consumer-side merge: subscribe
2272 /// per-partition backend instance and interleave on the read
2273 /// side. Yields
2274 /// `Err(EngineError::StreamDisconnected { cursor })` on backend
2275 /// disconnect; resume by calling this method again with the
2276 /// returned cursor.
2277 ///
2278 /// `filter` (#282): when `filter.instance_tag` is `Some((k, v))`,
2279 /// only events whose execution carries tag `k = v` are yielded
2280 /// (matching the [`crate::backend::ScannerFilter`] surface from
2281 /// #122). Pass `&ScannerFilter::default()` for unfiltered
2282 /// behaviour. Filtering happens inside the backend stream; the
2283 /// [`crate::stream_events::LeaseHistorySubscription`] return type
2284 /// is unchanged.
2285 async fn subscribe_lease_history(
2286 &self,
2287 _cursor: crate::stream_subscribe::StreamCursor,
2288 _filter: &crate::backend::ScannerFilter,
2289 ) -> Result<crate::stream_events::LeaseHistorySubscription, EngineError> {
2290 Err(EngineError::Unavailable {
2291 op: "subscribe_lease_history",
2292 })
2293 }
2294
2295 /// Subscribe to completion events (terminal state transitions).
2296 ///
2297 /// - **Postgres**: wraps the `ff_completion_event` outbox +
2298 /// LISTEN/NOTIFY machinery. Durable via event-id cursor.
2299 /// - **Valkey**: wraps the RESP3 `ff:dag:completions` pubsub
2300 /// subscriber. Pubsub is at-most-once over the live
2301 /// subscription window; the cursor is always the empty
2302 /// sentinel. If you need at-least-once replay with durable
2303 /// cursor resume, use the Postgres backend (see
2304 /// `docs/POSTGRES_PARITY_MATRIX.md` row `subscribe_completion`).
2305 ///
2306 /// `filter` (#282): see [`Self::subscribe_lease_history`]. Valkey
2307 /// reuses the `subscribe_completions_filtered` per-event HGET
2308 /// gate; Postgres filters inline against the outbox's denormalised
2309 /// `instance_tag` column.
2310 async fn subscribe_completion(
2311 &self,
2312 _cursor: crate::stream_subscribe::StreamCursor,
2313 _filter: &crate::backend::ScannerFilter,
2314 ) -> Result<crate::stream_events::CompletionSubscription, EngineError> {
2315 Err(EngineError::Unavailable {
2316 op: "subscribe_completion",
2317 })
2318 }
2319
2320 /// Subscribe to signal-delivery events (satisfied / buffered /
2321 /// deduped).
2322 ///
2323 /// `filter` (#282): see [`Self::subscribe_lease_history`].
2324 async fn subscribe_signal_delivery(
2325 &self,
2326 _cursor: crate::stream_subscribe::StreamCursor,
2327 _filter: &crate::backend::ScannerFilter,
2328 ) -> Result<crate::stream_events::SignalDeliverySubscription, EngineError> {
2329 Err(EngineError::Unavailable {
2330 op: "subscribe_signal_delivery",
2331 })
2332 }
2333
2334 /// Subscribe to instance-tag events (tag attached / cleared).
2335 ///
2336 /// Producer wiring is deferred per #311 audit ("no concrete
2337 /// demand"); the trait method exists for API uniformity across
2338 /// the four families. Backends currently return
2339 /// `EngineError::Unavailable`.
2340 async fn subscribe_instance_tags(
2341 &self,
2342 _cursor: crate::stream_subscribe::StreamCursor,
2343 ) -> Result<crate::stream_events::InstanceTagSubscription, EngineError> {
2344 Err(EngineError::Unavailable {
2345 op: "subscribe_instance_tags",
2346 })
2347 }
2348
2349 // ── PR-7b Cluster 1 — Foundation scanner operations ─────────
2350 //
2351 // Per-execution write hooks invoked by the engine scanner loop.
2352 // Scanner bodies discover candidate executions via partition
2353 // indices (Valkey ZSETs / PG index scans) and, for each candidate,
2354 // call through to one of the methods below to perform the atomic
2355 // state-flip. Defaults return `EngineError::Unavailable { op }`;
2356 // Valkey impls wrap the corresponding `ff_*` FCALL; Postgres impls
2357 // run a single-row tx mirroring the Lua semantic.
2358
2359 /// Lease-expiry scanner hook — mark an expired lease as reclaimable
2360 /// so another worker can redeem the execution. Atomic per call.
2361 ///
2362 /// Valkey: `FCALL ff_mark_lease_expired_if_due`.
2363 /// Postgres: single-row tx on `ff_attempt` + `ff_exec_core` (see
2364 /// `ff_backend_postgres::reconcilers::lease_expiry`).
2365 async fn mark_lease_expired_if_due(
2366 &self,
2367 _partition: crate::partition::Partition,
2368 _execution_id: &ExecutionId,
2369 ) -> Result<(), EngineError> {
2370 Err(EngineError::Unavailable {
2371 op: "mark_lease_expired_if_due",
2372 })
2373 }
2374
2375 /// Delayed-promoter scanner hook — promote a delayed execution to
2376 /// `eligible_now` once its `delay_until` has passed.
2377 ///
2378 /// Valkey: `FCALL ff_promote_delayed`.
2379 /// Postgres: Wave 9 schema scope (no current impl).
2380 async fn promote_delayed(
2381 &self,
2382 _partition: crate::partition::Partition,
2383 _lane: &LaneId,
2384 _execution_id: &ExecutionId,
2385 _now_ms: TimestampMs,
2386 ) -> Result<(), EngineError> {
2387 Err(EngineError::Unavailable {
2388 op: "promote_delayed",
2389 })
2390 }
2391
2392 /// Pending-waitpoint-expiry scanner hook — close a pending
2393 /// waitpoint whose deadline has passed (wake the suspended
2394 /// execution with a timeout signal).
2395 ///
2396 /// Valkey: `FCALL ff_close_waitpoint`.
2397 /// Postgres: Wave 9 schema scope (no current impl).
2398 async fn close_waitpoint(
2399 &self,
2400 _partition: crate::partition::Partition,
2401 _execution_id: &ExecutionId,
2402 _waitpoint_id: &str,
2403 _now_ms: TimestampMs,
2404 ) -> Result<(), EngineError> {
2405 Err(EngineError::Unavailable {
2406 op: "close_waitpoint",
2407 })
2408 }
2409
2410 /// Shared hook for the attempt-timeout and execution-deadline
2411 /// scanners — terminate or retry an execution whose wall-clock
2412 /// budget has elapsed. `phase` discriminates which of the two
2413 /// scanner paths is calling so the backend can preserve diagnostic
2414 /// breadcrumbs without forking the surface.
2415 ///
2416 /// Valkey: `FCALL ff_expire_execution` (with `phase` passed through
2417 /// as an ARGV discriminator).
2418 /// Postgres: single-row tx mirror of the Lua semantic for
2419 /// `AttemptTimeout`; `ExecutionDeadline` is Wave 9 schema scope.
2420 async fn expire_execution(
2421 &self,
2422 _partition: crate::partition::Partition,
2423 _execution_id: &ExecutionId,
2424 _phase: ExpirePhase,
2425 _now_ms: TimestampMs,
2426 ) -> Result<(), EngineError> {
2427 Err(EngineError::Unavailable {
2428 op: "expire_execution",
2429 })
2430 }
2431
2432 /// Suspension-timeout scanner hook — expire a suspended execution
2433 /// whose suspension deadline has passed (wake with timeout).
2434 ///
2435 /// Valkey: `FCALL ff_expire_suspension`.
2436 /// Postgres: single-row tx on `ff_suspend` + `ff_exec_core`.
2437 async fn expire_suspension(
2438 &self,
2439 _partition: crate::partition::Partition,
2440 _execution_id: &ExecutionId,
2441 _now_ms: TimestampMs,
2442 ) -> Result<(), EngineError> {
2443 Err(EngineError::Unavailable {
2444 op: "expire_suspension",
2445 })
2446 }
2447
2448 // ── PR-7b Cluster 2 — Reconciler scanner operations ─────────
2449 //
2450 // Per-execution write hooks invoked by the `unblock` scanner. The
2451 // other two cluster-2 scanners (`budget_reset`, `dependency_reconciler`)
2452 // route through `reset_budget` + `resolve_dependency`, which are
2453 // already part of the RFC-017 service-layer surface and live above.
2454
2455 /// Unblock-scanner hook — move an execution from a blocked ZSET back
2456 /// to `eligible_now` once its blocking condition has cleared (budget
2457 /// under limit, quota window drained, or a capable worker has come
2458 /// online). `expected_blocking_reason` discriminates which of the
2459 /// `blocked:{budget,quota,route}` sets the execution is leaving and
2460 /// also fences against a stale unblock (Lua rejects if the core's
2461 /// `blocking_reason` no longer matches).
2462 ///
2463 /// # Backend status
2464 ///
2465 /// - **Valkey:** wraps `ff_unblock_execution` (RFC-010 §6). Atomic
2466 /// single-slot FCALL on `{p:N}`.
2467 /// - **Postgres:** `Unavailable` (structural). PG does not persist a
2468 /// per-reason `blocked:{budget,quota,route}` index — scheduler
2469 /// eligibility is re-evaluated live via SQL predicates on
2470 /// `ff_exec_core` + budget / quota tables (see
2471 /// `ff_backend_postgres::scheduler`). Nothing to reconcile, so the
2472 /// engine's PG scanner loop does not run this path; callers on
2473 /// a PG deployment receive the typed `Unavailable` per PR-7b/final
2474 /// contract.
2475 /// - **SQLite:** `Unavailable` for the same reason as Postgres.
2476 async fn unblock_execution(
2477 &self,
2478 _partition: crate::partition::Partition,
2479 _lane_id: &LaneId,
2480 _execution_id: &ExecutionId,
2481 _expected_blocking_reason: &str,
2482 _now_ms: TimestampMs,
2483 ) -> Result<(), EngineError> {
2484 Err(EngineError::Unavailable {
2485 op: "unblock_execution",
2486 })
2487 }
2488
2489 /// RFC-016 Stage C — sibling-cancel group drain.
2490 ///
2491 /// After `edge_cancel_dispatcher` has issued
2492 /// [`EngineBackend::cancel_execution`] against every listed
2493 /// sibling of a `(flow_id, downstream_eid)` group, this trait
2494 /// method atomically removes the tuple from the partition-local
2495 /// pending-cancel-groups index and clears the per-group flag +
2496 /// members state.
2497 ///
2498 /// Valkey: `FCALL ff_drain_sibling_cancel_group` (SREM +
2499 /// HDEL in one Lua unit).
2500 /// Postgres: `Unavailable` — PG's Wave-6b
2501 /// `reconcilers::edge_cancel_dispatcher::dispatcher_tick`
2502 /// owns the equivalent row drain inside its own per-group
2503 /// transaction; the Valkey-shaped per-tuple call is not used on
2504 /// the PG scanner path.
2505 /// SQLite: `Unavailable` (mirrors PG; RFC-023 scope).
2506 #[cfg(feature = "core")]
2507 async fn drain_sibling_cancel_group(
2508 &self,
2509 _flow_partition: crate::partition::Partition,
2510 _flow_id: &FlowId,
2511 _downstream_eid: &ExecutionId,
2512 ) -> Result<(), EngineError> {
2513 Err(EngineError::Unavailable {
2514 op: "drain_sibling_cancel_group",
2515 })
2516 }
2517
2518 /// RFC-016 Stage D — sibling-cancel group reconcile (Invariant
2519 /// Q6 safety net).
2520 ///
2521 /// Re-examines a `(flow_id, downstream_eid)` tuple in the
2522 /// partition-local pending-cancel-groups index and returns one
2523 /// of three atomic dispositions — see
2524 /// [`SiblingCancelReconcileAction`] — so the crash-recovery
2525 /// reconciler can drain stale or completed tuples without
2526 /// fighting the Stage-C dispatcher.
2527 ///
2528 /// Valkey: `FCALL ff_reconcile_sibling_cancel_group`.
2529 /// Postgres: `Unavailable` — PG's
2530 /// `reconcilers::edge_cancel_reconciler::reconciler_tick`
2531 /// owns the row-level reconcile inside its own batched tx.
2532 /// SQLite: `Unavailable` (mirrors PG).
2533 #[cfg(feature = "core")]
2534 async fn reconcile_sibling_cancel_group(
2535 &self,
2536 _flow_partition: crate::partition::Partition,
2537 _flow_id: &FlowId,
2538 _downstream_eid: &ExecutionId,
2539 ) -> Result<SiblingCancelReconcileAction, EngineError> {
2540 Err(EngineError::Unavailable {
2541 op: "reconcile_sibling_cancel_group",
2542 })
2543 }
2544
2545 // ── PR-7b Cluster 2b-A — Tally-recompute reconciler scanners ─────
2546 //
2547 // Coarse trait methods: each runs a full scan-and-fix pass over
2548 // one partition. Unlike cluster-2's per-candidate write hooks,
2549 // these scanners do multi-round-trip discovery (SSCAN / HMGET /
2550 // HGETALL / ZSCORE / ZREMRANGEBYSCORE) interleaved with conditional
2551 // writes. Moving the whole pass into the trait impl achieves the
2552 // PR-7b/final contract ("scanner trait leaks no `ferriskey::Client`")
2553 // without atomising operations that are inherently not atomic.
2554 //
2555 // Backend status (all three methods):
2556 //
2557 // - **Valkey:** real scan-loop in the trait impl; identical command
2558 // shape to the pre-routing scanner path.
2559 // - **Postgres:** default `Unavailable`. PG mutates budget / quota
2560 // counters and scheduling state inside SERIALIZABLE transactions,
2561 // and the scheduler evaluates eligibility via live SQL predicates
2562 // on `ff_exec_core` + budget / quota tables — no persistent index
2563 // or counter hash that can drift, hence nothing to reconcile. The
2564 // engine's PG scanner supervisor does not run these paths.
2565 // - **SQLite:** same as Postgres (RFC-023: SQLite runs its own
2566 // scanner supervisor; these FF-owned tally-recompute scanners
2567 // are not registered).
2568
2569 /// Index-reconciler pass — walks `ff:idx:{p:N}:all_executions`
2570 /// and verifies each execution appears in the correct scheduling
2571 /// sorted set (`eligible` / `delayed` / `blocked:*` / `active` /
2572 /// `suspended` / `terminal`) for its current `(lifecycle_phase,
2573 /// eligibility_state, ownership_state)` triple. Phase 1 is
2574 /// log-only; auto-fix is deferred to a later phase (RFC-010 §6.14).
2575 async fn reconcile_execution_index(
2576 &self,
2577 _partition: crate::partition::Partition,
2578 _lanes: &[LaneId],
2579 _filter: &crate::backend::ScannerFilter,
2580 ) -> Result<ReconcileCounts, EngineError> {
2581 Err(EngineError::Unavailable {
2582 op: "reconcile_execution_index",
2583 })
2584 }
2585
2586 /// Budget-reconciler pass — walks `ff:budget:{b:M}:policies_idx`,
2587 /// reads each budget's definition / usage / limits, and reconciles
2588 /// the `breached_at` marker against hard limits. Resetting budgets
2589 /// (non-zero `reset_interval_ms`) are skipped — they are handled
2590 /// by the `budget_reset` scanner (cluster 2). Drops index entries
2591 /// for budgets whose definition hash has been deleted (retention
2592 /// purge / manual). RFC-008 §Budget Reconciliation, RFC-010 §6.5.
2593 async fn reconcile_budget_counters(
2594 &self,
2595 _partition: crate::partition::Partition,
2596 _now_ms: TimestampMs,
2597 ) -> Result<ReconcileCounts, EngineError> {
2598 Err(EngineError::Unavailable {
2599 op: "reconcile_budget_counters",
2600 })
2601 }
2602
2603 /// Quota-reconciler pass — walks `ff:quota:{q:M}:policies_idx`,
2604 /// trims expired entries from rate-limit sliding windows, and
2605 /// recomputes each policy's concurrency counter by walking its
2606 /// `admitted_set` and pruning entries whose admission guard key
2607 /// has TTLed out. RFC-008 §Quota Reconciliation, RFC-010 §6.6.
2608 async fn reconcile_quota_counters(
2609 &self,
2610 _partition: crate::partition::Partition,
2611 _now_ms: TimestampMs,
2612 ) -> Result<ReconcileCounts, EngineError> {
2613 Err(EngineError::Unavailable {
2614 op: "reconcile_quota_counters",
2615 })
2616 }
2617
2618 // ── PR-7b Cluster 2b-B — Projection + retention scanners ──────
2619 //
2620 // Two additional non-FCALL scanners routed through the trait so
2621 // engine-side scanners stop touching `ferriskey::Client` directly.
2622 // Both ride on aggregate reads + derived writes; neither is a thin
2623 // single-FCALL passthrough.
2624
2625 /// Flow-projector scanner hook — sample flow members, derive an
2626 /// aggregate `public_flow_state` + per-state counts, and write them
2627 /// to the flow summary projection. Returns `Ok(true)` when the
2628 /// summary was updated, `Ok(false)` when the flow had no members
2629 /// or the index entry was defensively pruned (core missing).
2630 ///
2631 /// The derived `public_flow_state` written here is distinct from
2632 /// `ff_flow_core.public_flow_state` — the former is a rollup
2633 /// dashboard field, the latter is the authoritative mutation-guard
2634 /// state owned by `create_flow` / `cancel_flow`. See
2635 /// `ff_engine::scanner::flow_projector` module doc for the
2636 /// two-sources contract.
2637 ///
2638 /// # Backend status
2639 ///
2640 /// - **Valkey:** lifts the pre-PR-7b Rust-composed
2641 /// SCARD + SRANDMEMBER + per-member HGET + HSET summary pattern.
2642 /// No new Lua function; the aggregation is inherently
2643 /// multi-round-trip (cross-partition member reads) and atomicity
2644 /// is neither required nor achievable against 256 partitions.
2645 /// - **Postgres:** SELECT aggregates from `ff_exec_core` grouped by
2646 /// `public_state` for the flow's members, INSERT ... ON CONFLICT
2647 /// DO UPDATE into `ff_flow_summary` (migration 0019). One query
2648 /// per flow; partition-local aggregation.
2649 /// - **SQLite:** `Unavailable` per RFC-023 Phase 3.5 (flow summary
2650 /// projection is a shared-deployment dashboard feature; local
2651 /// single-tenant SQLite deployments don't need it).
2652 #[cfg(feature = "core")]
2653 async fn project_flow_summary(
2654 &self,
2655 _partition: crate::partition::Partition,
2656 _flow_id: &FlowId,
2657 _now_ms: TimestampMs,
2658 ) -> Result<bool, EngineError> {
2659 Err(EngineError::Unavailable {
2660 op: "project_flow_summary",
2661 })
2662 }
2663
2664 /// Retention-trimmer scanner hook — delete terminal executions and
2665 /// all their subordinate keys/rows once they are older than the
2666 /// configured retention window. Returns the number of executions
2667 /// actually purged in this call (so the scanner can loop when it
2668 /// hits `batch_size`).
2669 ///
2670 /// Retention trimming is inherently a scan-and-delete loop over
2671 /// time; the trait surface exists to remove engine-side Valkey
2672 /// coupling, **not** to atomise the operation into a single
2673 /// round-trip. Implementations may issue multiple round-trips
2674 /// (e.g. per-execution cascade across sibling tables); the trait
2675 /// contract is "make progress, bounded by batch_size".
2676 ///
2677 /// # Backend status
2678 ///
2679 /// - **Valkey:** lifts the pre-PR-7b ZRANGEBYSCORE + per-exec
2680 /// cascade-delete loop verbatim. Cluster-safe (all keys for one
2681 /// execution live on the same `{p:N}` slot).
2682 /// - **Postgres:** `DELETE FROM ff_exec_core` for terminal rows
2683 /// past the cutoff plus explicit cascade DELETEs on every
2684 /// execution-scoped sibling table (no FK CASCADE in the schema).
2685 /// Single transaction per batch.
2686 /// - **SQLite:** `Unavailable` per RFC-023 Phase 3.5. Single-tenant
2687 /// local deployments typically manage their own DB lifecycle and
2688 /// do not need a retention scanner.
2689 #[cfg(feature = "core")]
2690 async fn trim_retention(
2691 &self,
2692 _partition: crate::partition::Partition,
2693 _lane_id: &LaneId,
2694 _retention_ms: u64,
2695 _now_ms: TimestampMs,
2696 _batch_size: u32,
2697 _filter: &crate::backend::ScannerFilter,
2698 ) -> Result<u32, EngineError> {
2699 Err(EngineError::Unavailable {
2700 op: "trim_retention",
2701 })
2702 }
2703
2704 // ── PR-7b Wave 0a: exec_core field read primitive ──
2705
2706 /// Point-read N fields from the `exec_core` hash for a given
2707 /// execution. Returns a map of field-name → Option<String>
2708 /// (None for fields absent or stored as NULL). Scanner call
2709 /// sites formerly issuing raw `HGET`/`HMGET` on `ExecKeyContext::core()`
2710 /// route through this trait method (cairn #436 / PR-7b Wave 0a).
2711 ///
2712 /// Field values are coerced to String at the trait boundary for
2713 /// wire compatibility with the Valkey HGET shape. Consumers parse
2714 /// specific fields (`lane_id`, `current_attempt_index`, etc.) from
2715 /// the returned strings as needed.
2716 ///
2717 /// - **Valkey:** `HMGET exec_core_key f1 f2 ...`, zipping names to values.
2718 /// - **Postgres:** `SELECT` against `ff_exec_core` with dynamic column
2719 /// extraction; fields in `raw_fields` JSONB are extracted via `->>`.
2720 /// - **SQLite:** equivalent with `json_extract(raw_fields, '$.field')`.
2721 ///
2722 /// Default body returns `Unavailable` so non-v0.13 backends remain
2723 /// compile-compatible.
2724 #[cfg(feature = "core")]
2725 async fn read_exec_core_fields(
2726 &self,
2727 _partition: crate::partition::Partition,
2728 _execution_id: &crate::types::ExecutionId,
2729 _fields: &[&str],
2730 ) -> Result<std::collections::HashMap<String, Option<String>>, EngineError> {
2731 Err(EngineError::Unavailable {
2732 op: "read_exec_core_fields",
2733 })
2734 }
2735
2736 // ── PR-7b Wave 0a: backend clock primitive ──
2737
2738 /// Returns the backend's current wall-clock epoch milliseconds.
2739 ///
2740 /// Used by 15 scanners to compute "due" thresholds before issuing
2741 /// per-partition due-scans. Previously a Valkey-only helper in
2742 /// `ff-engine`'s `scanner::lease_expiry` issuing `TIME`; this
2743 /// trait method is the backend-agnostic replacement (cairn #436 /
2744 /// PR-7b Wave 0a).
2745 ///
2746 /// Every in-tree backend overrides. The default falls back to
2747 /// `SystemTime::now()` so out-of-tree `EngineBackend` impls (e.g.
2748 /// cairn mocks, test doubles) stay source-compatible across v0.12
2749 /// → v0.13.
2750 ///
2751 /// - **Valkey:** `TIME` command.
2752 /// - **Postgres:** `SELECT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint`
2753 /// (not `now()` — `now()` is the transaction start timestamp,
2754 /// which is stale under any long-running tx; scanners need the
2755 /// true wall-clock read).
2756 /// - **SQLite:** `SELECT CAST((julianday('now') - 2440587.5) * 86400000 AS INTEGER)`.
2757 #[cfg(feature = "core")]
2758 async fn server_time_ms(&self) -> Result<u64, EngineError> {
2759 use std::time::{SystemTime, UNIX_EPOCH};
2760 let d = SystemTime::now()
2761 .duration_since(UNIX_EPOCH)
2762 .map_err(|e| EngineError::Transport {
2763 backend: "system-clock",
2764 source: format!("server_time_ms default: {e}").into(),
2765 })?;
2766 Ok(d.as_millis() as u64)
2767 }
2768}
2769
/// Disposition reported by a single
/// [`EngineBackend::reconcile_sibling_cancel_group`] call (RFC-016
/// Stage D).
///
/// The variant set is deliberately capped at three so that the
/// `action` label on the `ff_sibling_cancel_reconcile_total` metric
/// remains a closed set.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SiblingCancelReconcileAction {
    /// The pending tuple was stale — its flag had been cleared or the
    /// edgegroup was absent — so it was SREM'd / DELETE'd and the
    /// group itself was not touched.
    SremmedStale,
    /// The flag was still set although every listed sibling was
    /// already terminal, i.e. a drain was interrupted. The flag was
    /// cleared and the tuple drained.
    CompletedDrain,
    /// The flag is set and at least one sibling is non-terminal; the
    /// dispatcher owns the tuple, so it was left as-is.
    NoOp,
}

impl SiblingCancelReconcileAction {
    /// Observability label for this action. The strings are exactly
    /// the action strings used on the Lua and PG sides.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::NoOp => "no_op",
            Self::CompletedDrain => "completed_drain",
            Self::SremmedStale => "sremmed_stale",
        }
    }
}
2800
/// Tally produced by one reconciler scan pass over a single partition.
///
/// `processed` is the number of candidates for which the reconciler
/// found a correctable condition (drift, breach flip, stale guard);
/// `errors` is the number of transport / parse failures. Scanners add
/// both into the engine's per-pass metrics, exactly as they did
/// before PR-7b/2b-A.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ReconcileCounts {
    /// Number of items the scanner corrected (drift fix, breach flip,
    /// guard eviction). Stays at zero for a healthy partition.
    pub processed: u32,
    /// Number of items that could not be processed because of a
    /// transport or parse error. A non-zero value shows up in scanner
    /// metrics and should be investigated.
    pub errors: u32,
}
2817
/// Identifies the scanner that called
/// [`EngineBackend::expire_execution`].
///
/// Both the attempt-timeout and the execution-deadline scanner go
/// through that one trait method because the state flip is the same
/// (terminate or retry, per retry policy). The phase is diagnostic
/// only — it records which deadline elapsed — and is forwarded to the
/// backend so the shared Lua / SQL path can log or tag accordingly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExpirePhase {
    /// Called from the `attempt_timeout` scanner: the per-attempt
    /// wall-clock budget ran out.
    AttemptTimeout,
    /// Called from the `execution_deadline` scanner: the
    /// whole-execution wall-clock deadline passed.
    ExecutionDeadline,
}

impl ExpirePhase {
    /// Short string tag for this phase, usable as a Lua ARGV value or
    /// a Postgres breadcrumb.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::ExecutionDeadline => "execution_deadline",
            Self::AttemptTimeout => "attempt_timeout",
        }
    }
}
2844
2845/// Object-safety assertion: `dyn EngineBackend` compiles iff every
2846/// method is dyn-compatible. Kept as a compile-time guard so a future
2847/// trait change that accidentally breaks dyn-safety fails the build
2848/// at this site rather than at every downstream `Arc<dyn
2849/// EngineBackend>` use.
2850#[allow(dead_code)]
2851fn _assert_dyn_compatible(_: &dyn EngineBackend) {}
2852
/// How often [`wait_for_flow_cancellation`] polls. Short enough that
/// a local single-node cancel cascade is seen as `cancelled` within
/// one or two polls, yet long enough that a `WaitIndefinite` caller
/// does not hammer `describe_flow` on a live cluster.
const CANCEL_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(100);

/// Upper bound applied to [`CancelFlowWait::WaitIndefinite`]. If the
/// reconciler cascade has not converged after five minutes something
/// is wedged, and returning `Timeout` is strictly more useful than
/// blocking forever. RFC-012 §3.1.1 expects real-world cascades to
/// finish within `reconciler_interval + grace`, orders of magnitude
/// below this ceiling.
const CANCEL_WAIT_INDEFINITE_CEILING: Duration = Duration::from_secs(5 * 60);
2866
2867/// Poll `backend.describe_flow(flow_id)` until `public_flow_state` is
2868/// `"cancelled"` or `deadline` elapses.
2869///
2870/// Shared by every backend's `cancel_flow` trait impl that honours
2871/// [`CancelFlowWait::WaitTimeout`] / [`CancelFlowWait::WaitIndefinite`].
2872/// The underlying `cancel_flow` FCALL / SQL transaction flips the
2873/// flow-level state synchronously; member cancellations dispatch
2874/// asynchronously via the reconciler, which also flips
2875/// `public_flow_state` to `cancelled` once the cascade completes. This
2876/// helper waits for that terminal flip.
2877///
2878/// Returns:
2879/// * `Ok(())` once `public_flow_state = "cancelled"` is observed.
2880/// * `Err(EngineError::Timeout { op: "cancel_flow", elapsed })` when
2881/// `deadline` elapses first. `elapsed` is the wait budget (the
2882/// requested timeout), not wall-clock precision.
2883/// * `Err(e)` if `describe_flow` itself errors (propagated).
2884pub async fn wait_for_flow_cancellation<B: EngineBackend + ?Sized>(
2885 backend: &B,
2886 flow_id: &crate::types::FlowId,
2887 deadline: Duration,
2888) -> Result<(), EngineError> {
2889 let start = std::time::Instant::now();
2890 loop {
2891 match backend.describe_flow(flow_id).await? {
2892 Some(snap) if snap.public_flow_state == "cancelled" => return Ok(()),
2893 // `None` (flow removed) is also terminal from the caller's
2894 // perspective — nothing left to wait on.
2895 None => return Ok(()),
2896 Some(_) => {}
2897 }
2898 if start.elapsed() >= deadline {
2899 return Err(EngineError::Timeout {
2900 op: "cancel_flow",
2901 elapsed: deadline,
2902 });
2903 }
2904 tokio::time::sleep(CANCEL_WAIT_POLL_INTERVAL).await;
2905 }
2906}
2907
2908/// Convert a [`CancelFlowWait`] into the deadline passed to
2909/// [`wait_for_flow_cancellation`]. `NoWait` returns `None` — the caller
2910/// must skip the wait entirely.
2911pub fn cancel_flow_wait_deadline(wait: CancelFlowWait) -> Option<Duration> {
2912 // `CancelFlowWait` is `#[non_exhaustive]`; this match lives in the
2913 // defining crate so the exhaustiveness check keeps the compiler
2914 // honest. Future variants must be wired here explicitly.
2915 match wait {
2916 CancelFlowWait::NoWait => None,
2917 CancelFlowWait::WaitTimeout(d) => Some(d),
2918 CancelFlowWait::WaitIndefinite => Some(CANCEL_WAIT_INDEFINITE_CEILING),
2919 }
2920}
2921
2922/// Validate a caller-namespaced tag key against the regex
2923/// `^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$`.
2924///
2925/// The Rust trait-side check is **stricter than the Valkey Lua
2926/// contracts** (`ff_set_execution_tags` / `ff_set_flow_tags`), which
2927/// only check `^[a-z][a-z0-9_]*%.[^.]` — namespace prefix + first
2928/// suffix char, with the rest of the suffix unvalidated. Every
2929/// backend-side impl (`ff-backend-{valkey,postgres,sqlite}`) calls
2930/// this helper **before** the wire hop so the effective parity
2931/// contract is this full-key regex; Valkey's Lua is an additional,
2932/// more permissive server-side guard, not the parity-of-record.
2933///
2934/// A key passes iff:
2935///
2936/// * it begins with an ASCII lowercase letter;
2937/// * all characters up to the first `.` are lowercase alnum or `_`;
/// * the first `.` is immediately followed by a `[a-z0-9_]` character
///   (so `cairn.` and `cairn..x` fail — the `<field>` must be non-empty);
/// * every character after that is one of `[a-z0-9_.]`.
2940///
2941/// Shared entry point for [`EngineBackend::set_execution_tag`] /
2942/// [`EngineBackend::set_flow_tag`] / [`EngineBackend::get_execution_tag`] /
2943/// [`EngineBackend::get_flow_tag`] so every backend rejects the same
2944/// keyspace. On rejection, returns
2945/// [`EngineError::Validation { kind: ValidationKind::InvalidInput, .. }`](crate::engine_error::EngineError::Validation)
2946/// with the offending key in `detail`.
2947///
2948/// `#[allow(clippy::result_large_err)]` — `EngineError` is the uniform
2949/// error across the whole `EngineBackend` trait surface (see trait method
2950/// signatures above); boxing it here alone would introduce a
2951/// gratuitous signature deviation. Clippy 1.95 flags free functions but
2952/// not trait methods; this function mirrors the trait-method convention.
2953#[allow(clippy::result_large_err)]
2954pub fn validate_tag_key(key: &str) -> Result<(), EngineError> {
2955 use crate::engine_error::ValidationKind;
2956
2957 let bad = || EngineError::Validation {
2958 kind: ValidationKind::InvalidInput,
2959 detail: format!(
2960 "invalid tag key: {key:?} (must match ^[a-z][a-z0-9_]*\\.[a-z0-9_][a-z0-9_.]*$)"
2961 ),
2962 };
2963
2964 let mut chars = key.chars();
2965 let first = chars.next().ok_or_else(bad)?;
2966 if !first.is_ascii_lowercase() {
2967 return Err(bad());
2968 }
2969 // Phase 1: consume the namespace prefix up to (but not including) the first `.`.
2970 // Chars must be `[a-z0-9_]`.
2971 let mut saw_dot = false;
2972 for c in chars.by_ref() {
2973 if c == '.' {
2974 saw_dot = true;
2975 break;
2976 }
2977 if !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') {
2978 return Err(bad());
2979 }
2980 }
2981 if !saw_dot {
2982 return Err(bad());
2983 }
2984 // Phase 2: the first char after the first `.` must exist and be a
2985 // non-dot member char `[a-z0-9_]` (matches Valkey's Lua regex
2986 // `^[a-z][a-z0-9_]*%.[^.]` — a second consecutive dot is rejected).
2987 let second = chars.next().ok_or_else(bad)?;
2988 if !(second.is_ascii_lowercase() || second.is_ascii_digit() || second == '_') {
2989 return Err(bad());
2990 }
2991 // Phase 3 (Finding 2 tightening): every remaining char MUST be
2992 // `[a-z0-9_.]`. Before this, the suffix was unvalidated beyond
2993 // its first char — so `cairn.foo bar`, `cairn.Foo`,
2994 // `cairn.foo"bar`, `cairn.foo-bar` all passed trait-side validation
2995 // even though they would break SQLite JSON-path quoting, the Lua
2996 // HSET field-name conventions, or consumer grep patterns. Valkey's
2997 // Lua prefix regex is identically permissive on the suffix; this
2998 // tightens both layers from the Rust side. Dots in the suffix
2999 // remain legal (`app.sub.field` is valid) to preserve the existing
3000 // accepted shape.
3001 for c in chars {
3002 if !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_' || c == '.') {
3003 return Err(bad());
3004 }
3005 }
3006 Ok(())
3007}
3008
3009#[cfg(test)]
3010mod tests {
3011 use super::*;
3012
    /// A zero-state backend stub used to exercise the trait's default
    /// method impls (`capabilities()` and the `Unavailable`-returning
    /// defaults tested below) without pulling in a real transport.
    ///
    /// Only the required trait methods are implemented here. Most of
    /// them are `unreachable!()` and will panic if called. The
    /// exceptions are `read_execution_context`,
    /// `read_current_attempt_index` and `read_total_attempt_count`,
    /// which return fixed placeholder values instead of panicking.
    struct DefaultBackend;

    #[async_trait]
    impl EngineBackend for DefaultBackend {
        // ── Stubs that panic if reached ──
        async fn claim(
            &self,
            _lane: &LaneId,
            _capabilities: &CapabilitySet,
            _policy: ClaimPolicy,
        ) -> Result<Option<Handle>, EngineError> {
            unreachable!()
        }
        async fn renew(&self, _handle: &Handle) -> Result<LeaseRenewal, EngineError> {
            unreachable!()
        }
        async fn progress(
            &self,
            _handle: &Handle,
            _percent: Option<u8>,
            _message: Option<String>,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn append_frame(
            &self,
            _handle: &Handle,
            _frame: Frame,
        ) -> Result<AppendFrameOutcome, EngineError> {
            unreachable!()
        }
        async fn complete(
            &self,
            _handle: &Handle,
            _payload: Option<Vec<u8>>,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn fail(
            &self,
            _handle: &Handle,
            _reason: FailureReason,
            _classification: FailureClass,
        ) -> Result<FailOutcome, EngineError> {
            unreachable!()
        }
        async fn cancel(&self, _handle: &Handle, _reason: &str) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn suspend(
            &self,
            _handle: &Handle,
            _args: SuspendArgs,
        ) -> Result<SuspendOutcome, EngineError> {
            unreachable!()
        }
        async fn create_waitpoint(
            &self,
            _handle: &Handle,
            _waitpoint_key: &str,
            _expires_in: Duration,
        ) -> Result<PendingWaitpoint, EngineError> {
            unreachable!()
        }
        async fn observe_signals(
            &self,
            _handle: &Handle,
        ) -> Result<Vec<ResumeSignal>, EngineError> {
            unreachable!()
        }
        async fn claim_from_resume_grant(
            &self,
            _token: ResumeToken,
        ) -> Result<Option<Handle>, EngineError> {
            unreachable!()
        }
        async fn delay(
            &self,
            _handle: &Handle,
            _delay_until: TimestampMs,
        ) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn wait_children(&self, _handle: &Handle) -> Result<(), EngineError> {
            unreachable!()
        }
        async fn describe_execution(
            &self,
            _id: &ExecutionId,
        ) -> Result<Option<ExecutionSnapshot>, EngineError> {
            unreachable!()
        }
        // ── Stubs that return a placeholder instead of panicking ──
        // Returns a context built from an empty `Vec`, an empty `String`
        // and an empty `HashMap`; the execution id is ignored.
        async fn read_execution_context(
            &self,
            _execution_id: &ExecutionId,
        ) -> Result<ExecutionContext, EngineError> {
            Ok(ExecutionContext::new(
                Vec::new(),
                String::new(),
                std::collections::HashMap::new(),
            ))
        }
        // Always reports attempt index 0; the execution id is ignored.
        async fn read_current_attempt_index(
            &self,
            _execution_id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError> {
            Ok(AttemptIndex::new(0))
        }
        // Always reports `AttemptIndex::new(0)`; the execution id is ignored.
        async fn read_total_attempt_count(
            &self,
            _execution_id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError> {
            Ok(AttemptIndex::new(0))
        }
        // ── Stubs that panic if reached (continued) ──
        async fn describe_flow(
            &self,
            _id: &FlowId,
        ) -> Result<Option<FlowSnapshot>, EngineError> {
            unreachable!()
        }
        // The following stubs are compiled only when the `core` feature
        // is enabled.
        #[cfg(feature = "core")]
        async fn list_edges(
            &self,
            _flow_id: &FlowId,
            _direction: EdgeDirection,
        ) -> Result<Vec<EdgeSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn describe_edge(
            &self,
            _flow_id: &FlowId,
            _edge_id: &EdgeId,
        ) -> Result<Option<EdgeSnapshot>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn resolve_execution_flow_id(
            &self,
            _eid: &ExecutionId,
        ) -> Result<Option<FlowId>, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_flows(
            &self,
            _partition: PartitionKey,
            _cursor: Option<FlowId>,
            _limit: usize,
        ) -> Result<ListFlowsPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_lanes(
            &self,
            _cursor: Option<LaneId>,
            _limit: usize,
        ) -> Result<ListLanesPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_suspended(
            &self,
            _partition: PartitionKey,
            _cursor: Option<ExecutionId>,
            _limit: usize,
        ) -> Result<ListSuspendedPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn list_executions(
            &self,
            _partition: PartitionKey,
            _cursor: Option<ExecutionId>,
            _limit: usize,
        ) -> Result<ListExecutionsPage, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn deliver_signal(
            &self,
            _args: DeliverSignalArgs,
        ) -> Result<DeliverSignalResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn claim_resumed_execution(
            &self,
            _args: ClaimResumedExecutionArgs,
        ) -> Result<ClaimResumedExecutionResult, EngineError> {
            unreachable!()
        }
        async fn cancel_flow(
            &self,
            _id: &FlowId,
            _policy: CancelFlowPolicy,
            _wait: CancelFlowWait,
        ) -> Result<CancelFlowResult, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "core")]
        async fn set_edge_group_policy(
            &self,
            _flow_id: &FlowId,
            _downstream_execution_id: &ExecutionId,
            _policy: crate::contracts::EdgeDependencyPolicy,
        ) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
            unreachable!()
        }
        async fn report_usage(
            &self,
            _handle: &Handle,
            _budget: &BudgetId,
            _dimensions: crate::backend::UsageDimensions,
        ) -> Result<ReportUsageResult, EngineError> {
            unreachable!()
        }
        // The following stubs are compiled only when the `streaming`
        // feature is enabled.
        #[cfg(feature = "streaming")]
        async fn read_stream(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
            _from: StreamCursor,
            _to: StreamCursor,
            _count_limit: u64,
        ) -> Result<StreamFrames, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn tail_stream(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
            _after: StreamCursor,
            _block_ms: u64,
            _count_limit: u64,
            _visibility: TailVisibility,
        ) -> Result<StreamFrames, EngineError> {
            unreachable!()
        }
        #[cfg(feature = "streaming")]
        async fn read_summary(
            &self,
            _execution_id: &ExecutionId,
            _attempt_index: AttemptIndex,
        ) -> Result<Option<SummaryDocument>, EngineError> {
            unreachable!()
        }
    }
3265
3266 /// The default `capabilities()` impl returns a value tagged
3267 /// `family = "unknown"` with every `supports.*` bool false, so
3268 /// pre-RFC-018 out-of-tree backends keep compiling and consumers
3269 /// can distinguish "backend predates RFC-018" from "backend
3270 /// reports concrete bools." Every concrete in-tree backend
3271 /// overrides.
3272 #[test]
3273 fn default_capabilities_is_unknown_family_all_false() {
3274 let b = DefaultBackend;
3275 let caps = b.capabilities();
3276 assert_eq!(caps.identity.family, "unknown");
3277 assert_eq!(
3278 caps.identity.version,
3279 crate::capability::Version::new(0, 0, 0)
3280 );
3281 assert_eq!(caps.identity.rfc017_stage, "unknown");
3282 // Every field false on the default (matches `Supports::none()`).
3283 assert_eq!(caps.supports, crate::capability::Supports::none());
3284 }
3285
3286 // ── resolve_dependency (PR-7b Step 0) ──
3287
3288 /// Default impl returns `Unavailable { op: "resolve_dependency" }`
3289 /// so pre-migration backends surface a typed Unsupported-grade
3290 /// error rather than panic. Both cluster 2's dependency_reconciler
3291 /// and cluster 4's completion_listener route through this method;
3292 /// the default is the safety net for non-Valkey deployments.
3293 #[cfg(feature = "core")]
3294 #[tokio::test]
3295 async fn default_resolve_dependency_is_unavailable() {
3296 use crate::contracts::ResolveDependencyArgs;
3297 use crate::partition::{Partition, PartitionFamily};
3298 use crate::types::{AttemptIndex, EdgeId, FlowId, LaneId};
3299
3300 let b = DefaultBackend;
3301 let partition = Partition {
3302 family: PartitionFamily::Flow,
3303 index: 0,
3304 };
3305 let args = ResolveDependencyArgs::new(
3306 partition,
3307 FlowId::parse("11111111-1111-1111-1111-111111111111").unwrap(),
3308 ExecutionId::parse("{fp:0}:22222222-2222-2222-2222-222222222222").unwrap(),
3309 ExecutionId::parse("{fp:0}:33333333-3333-3333-3333-333333333333").unwrap(),
3310 EdgeId::parse("44444444-4444-4444-4444-444444444444").unwrap(),
3311 LaneId::new("default"),
3312 AttemptIndex::new(0),
3313 "success".to_owned(),
3314 TimestampMs::now(),
3315 );
3316 match b.resolve_dependency(args).await {
3317 Err(EngineError::Unavailable { op }) => {
3318 assert_eq!(op, "resolve_dependency");
3319 }
3320 other => panic!("expected Unavailable, got {other:?}"),
3321 }
3322 }
3323
3324 // ── cascade_completion (PR-7b Cluster 4) ──
3325
3326 /// Default impl returns `Unavailable { op: "cascade_completion" }`
3327 /// so pre-migration / non-core builds surface a typed error rather
3328 /// than panic. The push-based completion listener routes through
3329 /// this method; the default is the safety net for deployments
3330 /// whose backend doesn't cascade (e.g. SQLite in the current
3331 /// Wave 9 scope).
3332 #[cfg(feature = "core")]
3333 #[tokio::test]
3334 async fn default_cascade_completion_is_unavailable() {
3335 use crate::backend::CompletionPayload;
3336
3337 let b = DefaultBackend;
3338 let eid = ExecutionId::parse(
3339 "{fp:0}:66666666-6666-6666-6666-666666666666",
3340 )
3341 .unwrap();
3342 let payload = CompletionPayload::new(eid, "success", None, TimestampMs::now());
3343 match b.cascade_completion(&payload).await {
3344 Err(EngineError::Unavailable { op }) => {
3345 assert_eq!(op, "cascade_completion");
3346 }
3347 other => panic!("expected Unavailable, got {other:?}"),
3348 }
3349 }
3350
3351 // ── unblock_execution (PR-7b Cluster 2) ──
3352
3353 /// Default impl returns `Unavailable { op: "unblock_execution" }`
3354 /// so non-Valkey deployments get a typed `Unavailable` rather than
3355 /// a panic. The engine scanner loop on PG/SQLite skips this path
3356 /// entirely (scheduler eligibility is re-evaluated live via SQL
3357 /// predicates), but a stray direct call must still fail gracefully.
3358 #[cfg(feature = "core")]
3359 #[tokio::test]
3360 async fn default_unblock_execution_is_unavailable() {
3361 use crate::partition::{Partition, PartitionFamily};
3362
3363 let b = DefaultBackend;
3364 let partition = Partition {
3365 family: PartitionFamily::Execution,
3366 index: 0,
3367 };
3368 let eid = ExecutionId::parse(
3369 "{fp:0}:55555555-5555-5555-5555-555555555555",
3370 )
3371 .unwrap();
3372 let lane = LaneId::new("default");
3373 match b
3374 .unblock_execution(
3375 partition,
3376 &lane,
3377 &eid,
3378 "waiting_for_budget",
3379 TimestampMs::from_millis(0),
3380 )
3381 .await
3382 {
3383 Err(EngineError::Unavailable { op }) => {
3384 assert_eq!(op, "unblock_execution");
3385 }
3386 other => panic!("expected Unavailable, got {other:?}"),
3387 }
3388 }
3389
3390 // ── project_flow_summary (PR-7b Cluster 2b-B) ──
3391
3392 /// Default impl returns `Unavailable { op: "project_flow_summary" }`
3393 /// so non-Valkey deployments get a typed `Unavailable` rather than
3394 /// a panic. SQLite rides the default per RFC-023 Phase 3.5.
3395 #[cfg(feature = "core")]
3396 #[tokio::test]
3397 async fn default_project_flow_summary_is_unavailable() {
3398 use crate::partition::{Partition, PartitionFamily};
3399 use crate::types::FlowId;
3400
3401 let b = DefaultBackend;
3402 let partition = Partition {
3403 family: PartitionFamily::Flow,
3404 index: 0,
3405 };
3406 let fid = FlowId::parse("11111111-1111-1111-1111-111111111111").unwrap();
3407 match b
3408 .project_flow_summary(partition, &fid, TimestampMs::from_millis(0))
3409 .await
3410 {
3411 Err(EngineError::Unavailable { op }) => {
3412 assert_eq!(op, "project_flow_summary");
3413 }
3414 other => panic!("expected Unavailable, got {other:?}"),
3415 }
3416 }
3417
3418 // ── trim_retention (PR-7b Cluster 2b-B) ──
3419
3420 /// Default impl returns `Unavailable { op: "trim_retention" }` so
3421 /// non-Valkey deployments get a typed `Unavailable` rather than a
3422 /// panic. SQLite rides the default per RFC-023 Phase 3.5.
3423 #[cfg(feature = "core")]
3424 #[tokio::test]
3425 async fn default_trim_retention_is_unavailable() {
3426 use crate::partition::{Partition, PartitionFamily};
3427
3428 let b = DefaultBackend;
3429 let partition = Partition {
3430 family: PartitionFamily::Execution,
3431 index: 0,
3432 };
3433 let lane = LaneId::new("default");
3434 match b
3435 .trim_retention(
3436 partition,
3437 &lane,
3438 60_000,
3439 TimestampMs::from_millis(0),
3440 20,
3441 &crate::backend::ScannerFilter::NOOP,
3442 )
3443 .await
3444 {
3445 Err(EngineError::Unavailable { op }) => {
3446 assert_eq!(op, "trim_retention");
3447 }
3448 other => panic!("expected Unavailable, got {other:?}"),
3449 }
3450 }
3451
3452 // ── read_exec_core_fields (PR-7b Wave 0a) ──
3453
3454 /// Default impl returns `Unavailable { op: "read_exec_core_fields" }`
3455 /// so out-of-tree backends that pre-date v0.13 keep compiling while
3456 /// surfacing a typed error on call. Empty-field input short-circuits
3457 /// to `Ok(empty map)` in all in-tree impls; the default is triggered
3458 /// only when the trait method itself is un-overridden.
3459 #[cfg(feature = "core")]
3460 #[tokio::test]
3461 async fn default_read_exec_core_fields_is_unavailable() {
3462 use crate::partition::{Partition, PartitionFamily};
3463
3464 let b = DefaultBackend;
3465 let partition = Partition {
3466 family: PartitionFamily::Execution,
3467 index: 0,
3468 };
3469 let eid = ExecutionId::parse(
3470 "{fp:0}:66666666-6666-6666-6666-666666666666",
3471 )
3472 .unwrap();
3473 match b
3474 .read_exec_core_fields(partition, &eid, &["lane_id"])
3475 .await
3476 {
3477 Err(EngineError::Unavailable { op }) => {
3478 assert_eq!(op, "read_exec_core_fields");
3479 }
3480 other => panic!("expected Unavailable, got {other:?}"),
3481 }
3482 }
3483
3484 // ── validate_tag_key (issue #433) ──
3485
3486 #[test]
3487 fn validate_tag_key_accepts_valid() {
3488 for k in [
3489 "cairn.session_id",
3490 "cairn.project",
3491 "a.b",
3492 "a1_2.x",
3493 "app.sub.field",
3494 "x.y_z",
3495 ] {
3496 validate_tag_key(k).unwrap_or_else(|e| panic!("{k:?} should pass: {e:?}"));
3497 }
3498 }
3499
3500 #[test]
3501 fn validate_tag_key_rejects_invalid() {
3502 for k in [
3503 "", // empty
3504 "Cairn.x", // uppercase first
3505 "1cairn.x", // leading digit
3506 "cairn", // no dot
3507 "cairn.", // empty suffix
3508 "cairn..x", // dot immediately after first dot
3509 ".cairn", // leading dot
3510 "cair n.x", // space before dot
3511 "ca-irn.x", // hyphen in prefix
3512 // Finding 2 tightening — suffix now fully validated.
3513 "cairn.Foo", // uppercase in suffix
3514 "cairn.foo bar", // space in suffix
3515 "cairn.foo\"bar", // double-quote in suffix (would break SQLite JSON-path quoting)
3516 "cairn.foo-bar", // hyphen in suffix
3517 "cairn.foo\\bar", // backslash in suffix
3518 ] {
3519 let err = validate_tag_key(k)
3520 .err()
3521 .unwrap_or_else(|| panic!("{k:?} should fail"));
3522 match err {
3523 EngineError::Validation {
3524 kind: crate::engine_error::ValidationKind::InvalidInput,
3525 ..
3526 } => {}
3527 other => panic!("{k:?}: unexpected err {other:?}"),
3528 }
3529 }
3530 }
3531
3532 // ── cairn #389: service-layer FCALL trait-method defaults ──
3533 //
3534 // Each method MUST return `EngineError::Unavailable { op: "<name>" }`
3535 // on backends that haven't overridden it, so out-of-tree backends
3536 // keep compiling and consumers get a terminal-classified error
3537 // rather than a panic. Mirrors the precedent used by
3538 // `issue_reclaim_grant` / `reclaim_execution` / `suspend_by_triple`.
3539 //
3540 // Feature-gated on `core` because the methods under test only
3541 // exist on the trait under that feature — matches the precedent
3542 // of `default_resolve_dependency_is_unavailable` above.
3543
3544 #[cfg(feature = "core")]
3545 #[tokio::test]
3546 async fn default_complete_execution_is_unavailable() {
3547 use crate::contracts::CompleteExecutionArgs;
3548 use crate::types::{ExecutionId, FlowId};
3549 let b = DefaultBackend;
3550 let config = crate::partition::PartitionConfig::default();
3551 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3552 let args = CompleteExecutionArgs {
3553 execution_id: eid,
3554 fence: None,
3555 attempt_index: AttemptIndex::new(0),
3556 result_payload: None,
3557 result_encoding: None,
3558 source: crate::types::CancelSource::default(),
3559 now: TimestampMs::from_millis(0),
3560 };
3561 match b.complete_execution(args).await.unwrap_err() {
3562 EngineError::Unavailable { op } => assert_eq!(op, "complete_execution"),
3563 other => panic!("expected Unavailable, got {other:?}"),
3564 }
3565 }
3566
3567 #[cfg(feature = "core")]
3568 #[tokio::test]
3569 async fn default_fail_execution_is_unavailable() {
3570 use crate::contracts::FailExecutionArgs;
3571 use crate::types::{ExecutionId, FlowId};
3572 let b = DefaultBackend;
3573 let config = crate::partition::PartitionConfig::default();
3574 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3575 let args = FailExecutionArgs {
3576 execution_id: eid,
3577 fence: None,
3578 attempt_index: AttemptIndex::new(0),
3579 failure_reason: String::new(),
3580 failure_category: String::new(),
3581 retry_policy_json: String::new(),
3582 next_attempt_policy_json: String::new(),
3583 source: crate::types::CancelSource::default(),
3584 };
3585 match b.fail_execution(args).await.unwrap_err() {
3586 EngineError::Unavailable { op } => assert_eq!(op, "fail_execution"),
3587 other => panic!("expected Unavailable, got {other:?}"),
3588 }
3589 }
3590
3591 #[cfg(feature = "core")]
3592 #[tokio::test]
3593 async fn default_renew_lease_is_unavailable() {
3594 use crate::contracts::RenewLeaseArgs;
3595 use crate::types::{ExecutionId, FlowId};
3596 let b = DefaultBackend;
3597 let config = crate::partition::PartitionConfig::default();
3598 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3599 let args = RenewLeaseArgs {
3600 execution_id: eid,
3601 attempt_index: AttemptIndex::new(0),
3602 fence: None,
3603 lease_ttl_ms: 1_000,
3604 lease_history_grace_ms: 60_000,
3605 };
3606 match b.renew_lease(args).await.unwrap_err() {
3607 EngineError::Unavailable { op } => assert_eq!(op, "renew_lease"),
3608 other => panic!("expected Unavailable, got {other:?}"),
3609 }
3610 }
3611
3612 #[cfg(feature = "core")]
3613 #[tokio::test]
3614 async fn default_resume_execution_is_unavailable() {
3615 use crate::contracts::ResumeExecutionArgs;
3616 use crate::types::{ExecutionId, FlowId};
3617 let b = DefaultBackend;
3618 let config = crate::partition::PartitionConfig::default();
3619 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3620 let args = ResumeExecutionArgs {
3621 execution_id: eid,
3622 trigger_type: "signal".to_owned(),
3623 resume_delay_ms: 0,
3624 };
3625 match b.resume_execution(args).await.unwrap_err() {
3626 EngineError::Unavailable { op } => assert_eq!(op, "resume_execution"),
3627 other => panic!("expected Unavailable, got {other:?}"),
3628 }
3629 }
3630
3631 #[cfg(feature = "core")]
3632 #[tokio::test]
3633 async fn default_check_admission_is_unavailable() {
3634 use crate::contracts::CheckAdmissionArgs;
3635 use crate::types::{ExecutionId, FlowId, QuotaPolicyId};
3636 let b = DefaultBackend;
3637 let config = crate::partition::PartitionConfig::default();
3638 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3639 let args = CheckAdmissionArgs {
3640 execution_id: eid,
3641 now: TimestampMs::from_millis(0),
3642 window_seconds: 60,
3643 rate_limit: 10,
3644 concurrency_cap: 1,
3645 jitter_ms: None,
3646 };
3647 let qid = QuotaPolicyId::new();
3648 match b
3649 .check_admission(&qid, "default", args)
3650 .await
3651 .unwrap_err()
3652 {
3653 EngineError::Unavailable { op } => assert_eq!(op, "check_admission"),
3654 other => panic!("expected Unavailable, got {other:?}"),
3655 }
3656 }
3657
3658 #[cfg(feature = "core")]
3659 #[tokio::test]
3660 async fn default_evaluate_flow_eligibility_is_unavailable() {
3661 use crate::contracts::EvaluateFlowEligibilityArgs;
3662 use crate::types::{ExecutionId, FlowId};
3663 let b = DefaultBackend;
3664 let config = crate::partition::PartitionConfig::default();
3665 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3666 let args = EvaluateFlowEligibilityArgs { execution_id: eid };
3667 match b.evaluate_flow_eligibility(args).await.unwrap_err() {
3668 EngineError::Unavailable { op } => assert_eq!(op, "evaluate_flow_eligibility"),
3669 other => panic!("expected Unavailable, got {other:?}"),
3670 }
3671 }
3672
3673 // ── Cluster 2b-A tally-recompute reconcilers ──
3674
3675 #[cfg(feature = "core")]
3676 #[tokio::test]
3677 async fn default_reconcile_execution_index_is_unavailable() {
3678 use crate::backend::ScannerFilter;
3679 use crate::partition::{Partition, PartitionFamily};
3680 let b = DefaultBackend;
3681 let partition = Partition { family: PartitionFamily::Execution, index: 0 };
3682 let lanes = [LaneId::new("default")];
3683 let filter = ScannerFilter::default();
3684 match b
3685 .reconcile_execution_index(partition, &lanes, &filter)
3686 .await
3687 .unwrap_err()
3688 {
3689 EngineError::Unavailable { op } => assert_eq!(op, "reconcile_execution_index"),
3690 other => panic!("expected Unavailable, got {other:?}"),
3691 }
3692 }
3693
3694 #[cfg(feature = "core")]
3695 #[tokio::test]
3696 async fn default_reconcile_budget_counters_is_unavailable() {
3697 use crate::partition::{Partition, PartitionFamily};
3698 let b = DefaultBackend;
3699 let partition = Partition { family: PartitionFamily::Budget, index: 0 };
3700 match b
3701 .reconcile_budget_counters(partition, TimestampMs::from_millis(0))
3702 .await
3703 .unwrap_err()
3704 {
3705 EngineError::Unavailable { op } => assert_eq!(op, "reconcile_budget_counters"),
3706 other => panic!("expected Unavailable, got {other:?}"),
3707 }
3708 }
3709
3710 #[cfg(feature = "core")]
3711 #[tokio::test]
3712 async fn default_reconcile_quota_counters_is_unavailable() {
3713 use crate::partition::{Partition, PartitionFamily};
3714 let b = DefaultBackend;
3715 let partition = Partition { family: PartitionFamily::Quota, index: 0 };
3716 match b
3717 .reconcile_quota_counters(partition, TimestampMs::from_millis(0))
3718 .await
3719 .unwrap_err()
3720 {
3721 EngineError::Unavailable { op } => assert_eq!(op, "reconcile_quota_counters"),
3722 other => panic!("expected Unavailable, got {other:?}"),
3723 }
3724 }
3725
3726 // ── #454 trait-additions default-impl tests (4) ────────────
3727
3728 #[cfg(feature = "core")]
3729 #[tokio::test]
3730 async fn default_record_spend_is_unavailable() {
3731 use crate::contracts::RecordSpendArgs;
3732 use crate::types::{BudgetId, ExecutionId, FlowId};
3733 let b = DefaultBackend;
3734 let config = crate::partition::PartitionConfig::default();
3735 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3736 let args = RecordSpendArgs::new(
3737 BudgetId::new(),
3738 eid,
3739 std::collections::BTreeMap::new(),
3740 "k",
3741 );
3742 match b.record_spend(args).await.unwrap_err() {
3743 EngineError::Unavailable { op } => assert_eq!(op, "record_spend"),
3744 other => panic!("expected Unavailable, got {other:?}"),
3745 }
3746 }
3747
3748 #[cfg(feature = "core")]
3749 #[tokio::test]
3750 async fn default_release_budget_is_unavailable() {
3751 use crate::contracts::ReleaseBudgetArgs;
3752 use crate::types::{BudgetId, ExecutionId, FlowId};
3753 let b = DefaultBackend;
3754 let config = crate::partition::PartitionConfig::default();
3755 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3756 let args = ReleaseBudgetArgs::new(BudgetId::new(), eid);
3757 match b.release_budget(args).await.unwrap_err() {
3758 EngineError::Unavailable { op } => assert_eq!(op, "release_budget"),
3759 other => panic!("expected Unavailable, got {other:?}"),
3760 }
3761 }
3762
3763 #[cfg(feature = "core")]
3764 #[tokio::test]
3765 async fn default_deliver_approval_signal_is_unavailable() {
3766 use crate::contracts::DeliverApprovalSignalArgs;
3767 use crate::types::{ExecutionId, FlowId, LaneId, WaitpointId};
3768 let b = DefaultBackend;
3769 let config = crate::partition::PartitionConfig::default();
3770 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3771 let args = DeliverApprovalSignalArgs::new(
3772 eid,
3773 LaneId::new("default"),
3774 WaitpointId::new(),
3775 "approved",
3776 "sfx",
3777 1_000,
3778 Some(100),
3779 Some(10),
3780 );
3781 match b.deliver_approval_signal(args).await.unwrap_err() {
3782 EngineError::Unavailable { op } => assert_eq!(op, "deliver_approval_signal"),
3783 other => panic!("expected Unavailable, got {other:?}"),
3784 }
3785 }
3786
3787 #[cfg(feature = "core")]
3788 #[tokio::test]
3789 async fn default_issue_grant_and_claim_is_unavailable() {
3790 use crate::contracts::IssueGrantAndClaimArgs;
3791 use crate::types::{ExecutionId, FlowId, LaneId};
3792 let b = DefaultBackend;
3793 let config = crate::partition::PartitionConfig::default();
3794 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
3795 let args = IssueGrantAndClaimArgs::new(eid, LaneId::new("default"), 30_000);
3796 match b.issue_grant_and_claim(args).await.unwrap_err() {
3797 EngineError::Unavailable { op } => assert_eq!(op, "issue_grant_and_claim"),
3798 other => panic!("expected Unavailable, got {other:?}"),
3799 }
3800 }
3801}