Skip to main content

ff_core/contracts/
mod.rs

1//! Phase 1 function contracts — Args + Result types for each FCALL.
2//!
3//! Each Args struct defines the typed inputs to a Valkey Function.
4//! Each Result enum defines the possible outcomes (success variants + error codes).
5
6pub mod decode;
7
8use crate::policy::ExecutionPolicy;
9use crate::state::{AttemptType, PublicState, StateVector};
10use crate::types::{
11    AttemptId, AttemptIndex, BudgetId, CancelSource, EdgeId, ExecutionId, FlowId, LaneId, LeaseEpoch,
12    LeaseFence, LeaseId, Namespace, SignalId, SuspensionId, TimestampMs, WaitpointId,
13    WaitpointToken, WorkerId, WorkerInstanceId,
14};
15use serde::{Deserialize, Serialize};
16use std::collections::{BTreeMap, BTreeSet, HashMap};
17
18// ─── create_execution ───
19
/// Typed inputs for the `create_execution` function call.
///
/// Fields tagged `#[serde(default)]` may be absent from the serialized
/// form; they then deserialize to `None` (or an empty map for `tags`).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateExecutionArgs {
    /// Id of the execution to create.
    pub execution_id: ExecutionId,
    /// Namespace supplied for the execution.
    pub namespace: Namespace,
    /// Lane supplied for the execution.
    pub lane_id: LaneId,
    /// Free-form kind label for the execution.
    pub execution_kind: String,
    /// Raw input payload bytes.
    pub input_payload: Vec<u8>,
    /// Optional encoding label; presumably describes `input_payload` —
    /// TODO confirm against the consuming function.
    #[serde(default)]
    pub payload_encoding: Option<String>,
    /// Priority value for the execution.
    pub priority: i32,
    /// Identity string of whoever is creating the execution.
    pub creator_identity: String,
    /// Optional idempotency key. NOTE(review): presumably what yields
    /// `CreateExecutionResult::Duplicate` on a repeat call — confirm.
    #[serde(default)]
    pub idempotency_key: Option<String>,
    /// String key/value tags; empty when omitted.
    #[serde(default)]
    pub tags: HashMap<String, String>,
    /// Execution policy (retry, timeout, suspension, routing, etc.).
    #[serde(default)]
    pub policy: Option<ExecutionPolicy>,
    /// If set and in the future, execution starts delayed.
    #[serde(default)]
    pub delay_until: Option<TimestampMs>,
    /// Absolute deadline timestamp (ms). Execution expires if not complete by this time.
    #[serde(default)]
    pub execution_deadline_at: Option<TimestampMs>,
    /// Partition ID (pre-computed).
    pub partition_id: u16,
    /// Timestamp supplied by the caller as the current time.
    pub now: TimestampMs,
}
48
/// Success outcomes of the `create_execution` function call.
///
/// Both variants carry the id of the execution the call resolved to.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateExecutionResult {
    /// Execution created successfully.
    Created {
        /// Id of the created execution.
        execution_id: ExecutionId,
        /// Public state returned alongside the created execution.
        public_state: PublicState,
    },
    /// Idempotent duplicate — existing execution returned.
    Duplicate { execution_id: ExecutionId },
}
59
60// ─── issue_claim_grant ───
61
/// Inputs to [`crate::engine_backend::EngineBackend::issue_claim_grant`]
/// — the trait-level entry point v0.12 PR-5 lifted out of the SDK-side
/// `FlowFabricWorker::claim_next` inline helper.
///
/// `#[non_exhaustive]` + `::new` per
/// `feedback_non_exhaustive_needs_constructor`: future fields may be
/// added in minor releases; consumers MUST construct via
/// [`Self::new`] and populate optional fields (`capability_hash`,
/// `route_snapshot_json`, `admission_summary`) by direct field
/// assignment on the returned value. Struct-literal construction is
/// blocked by `#[non_exhaustive]`; `..Default::default()` is not
/// available for the same reason.
///
/// Carries the execution's [`crate::partition::Partition`] so the
/// Valkey backend can derive `exec_core` / `claim_grant` / the lane's
/// `eligible_zset` KEYS without a second round-trip.
///
/// Does NOT derive `Serialize` / `Deserialize` — this is a
/// trait-boundary args struct, not a wire-format type; the
/// `#[non_exhaustive]` marker already blocks cross-crate struct-
/// literal construction, which matters more than JSON round-trip
/// for a scanner hot-path primitive.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct IssueClaimGrantArgs {
    /// Execution the grant is requested for.
    pub execution_id: ExecutionId,
    /// Lane of the execution (used for the lane's `eligible_zset` key).
    pub lane_id: LaneId,
    /// Id of the worker requesting the grant.
    pub worker_id: WorkerId,
    /// Id of the specific worker instance requesting the grant.
    pub worker_instance_id: WorkerInstanceId,
    /// Partition context for KEYS derivation. v0.12 PR-5.
    pub partition: crate::partition::Partition,
    /// Optional; `None` unless assigned after [`Self::new`].
    pub capability_hash: Option<String>,
    /// Optional; `None` unless assigned after [`Self::new`].
    /// NOTE(review): the name suggests a JSON string — confirm the expected shape.
    pub route_snapshot_json: Option<String>,
    /// Optional; `None` unless assigned after [`Self::new`].
    pub admission_summary: Option<String>,
    /// Capabilities this worker advertises. Serialized as a sorted,
    /// comma-separated string to the Lua FCALL (see scheduling.lua
    /// ff_issue_claim_grant). An empty set matches only executions whose
    /// `required_capabilities` is also empty.
    pub worker_capabilities: BTreeSet<String>,
    /// Grant time-to-live in milliseconds.
    pub grant_ttl_ms: u64,
    /// Caller-side timestamp for bookkeeping. NOT passed to the Lua FCALL —
    /// ff_issue_claim_grant uses `redis.call("TIME")` for grant_expires_at.
    pub now: TimestampMs,
}
106
107impl IssueClaimGrantArgs {
108    /// Construct an `IssueClaimGrantArgs`. Added alongside
109    /// `#[non_exhaustive]` per `feedback_non_exhaustive_needs_constructor`
110    /// so the SDK worker (and any future caller) can build the args
111    /// without the struct literal becoming a cross-crate breaking
112    /// change on every minor release.
113    #[allow(clippy::too_many_arguments)]
114    pub fn new(
115        execution_id: ExecutionId,
116        lane_id: LaneId,
117        worker_id: WorkerId,
118        worker_instance_id: WorkerInstanceId,
119        partition: crate::partition::Partition,
120        worker_capabilities: BTreeSet<String>,
121        grant_ttl_ms: u64,
122        now: TimestampMs,
123    ) -> Self {
124        Self {
125            execution_id,
126            lane_id,
127            worker_id,
128            worker_instance_id,
129            partition,
130            capability_hash: None,
131            route_snapshot_json: None,
132            admission_summary: None,
133            worker_capabilities,
134            grant_ttl_ms,
135            now,
136        }
137    }
138}
139
/// Typed outcome of [`crate::engine_backend::EngineBackend::issue_claim_grant`].
///
/// Single-variant today — the Valkey FCALL either writes the grant
/// and returns `Granted`, or the Lua reject (capability mismatch,
/// already-granted, etc.) surfaces as a typed [`crate::engine_error::EngineError`]
/// on the outer `Result`. `#[non_exhaustive]` reserves room for
/// future additive variants without a breaking match-arm churn on
/// consumers.
///
/// Unlike the legacy [`IssueClaimGrantResult`], this type does not
/// derive `Serialize` / `Deserialize`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum IssueClaimGrantOutcome {
    /// Grant issued; carries the id of the granted execution.
    Granted { execution_id: ExecutionId },
}
154
/// Legacy name for `IssueClaimGrantOutcome` — retained for
/// `ff-script`'s `FromFcallResult` plumbing. Prefer
/// [`IssueClaimGrantOutcome`] in trait-level code.
///
/// Has the same single `Granted` variant, but additionally derives
/// `Serialize` / `Deserialize` and is not `#[non_exhaustive]`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum IssueClaimGrantResult {
    /// Grant issued; carries the id of the granted execution.
    Granted { execution_id: ExecutionId },
}
163
164// ─── scan_eligible_executions + block_route (v0.12 PR-5) ───
165
/// Inputs to [`crate::engine_backend::EngineBackend::scan_eligible_executions`].
///
/// Lifted from the SDK-side `ZRANGEBYSCORE` inline on the
/// `claim_next` scanner (v0.12 PR-5). The backend reads the lane's
/// eligible ZSET on the given partition and returns up to `limit`
/// execution ids in priority order (Valkey: score = `-(priority *
/// 1e12) + created_at_ms`, so `+inf`-bounded ZRANGEBYSCORE with
/// `LIMIT 0 <limit>` yields highest-priority-first).
///
/// `#[non_exhaustive]`: construct via [`Self::new`].
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ScanEligibleArgs {
    /// Lane whose eligible ZSET is read.
    pub lane_id: LaneId,
    /// Partition on which the lane's eligible ZSET is read.
    pub partition: crate::partition::Partition,
    /// Maximum number of execution ids to return. Backends MAY
    /// return fewer when the partition has less work.
    pub limit: u32,
}
183
184impl ScanEligibleArgs {
185    /// Construct a `ScanEligibleArgs`. Added alongside
186    /// `#[non_exhaustive]` per
187    /// `feedback_non_exhaustive_needs_constructor`.
188    pub fn new(
189        lane_id: LaneId,
190        partition: crate::partition::Partition,
191        limit: u32,
192    ) -> Self {
193        Self {
194            lane_id,
195            partition,
196            limit,
197        }
198    }
199}
200
/// Inputs to [`crate::engine_backend::EngineBackend::block_route`].
///
/// Lifted from the SDK-side `ff_block_execution_for_admission`
/// inline helper on the `claim_next` scanner (v0.12 PR-5). Moves an
/// execution from the lane's eligible ZSET into its blocked_route
/// ZSET after a `CapabilityMismatch` reject — the engine's unblock
/// scanner promotes blocked_route back to eligible once a worker
/// with matching caps registers.
///
/// `#[non_exhaustive]`: construct via [`Self::new`].
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct BlockRouteArgs {
    /// Execution to move from eligible to blocked_route.
    pub execution_id: ExecutionId,
    /// Lane whose eligible / blocked_route ZSETs are involved.
    pub lane_id: LaneId,
    /// Partition of the execution.
    pub partition: crate::partition::Partition,
    /// Free-form block reason code (e.g. `"waiting_for_capable_worker"`).
    pub reason_code: String,
    /// Human-readable reason detail for operator logs.
    pub reason_detail: String,
    /// Timestamp supplied by the caller as the current time.
    pub now: TimestampMs,
}
221
222impl BlockRouteArgs {
223    /// Construct a `BlockRouteArgs`.
224    pub fn new(
225        execution_id: ExecutionId,
226        lane_id: LaneId,
227        partition: crate::partition::Partition,
228        reason_code: String,
229        reason_detail: String,
230        now: TimestampMs,
231    ) -> Self {
232        Self {
233            execution_id,
234            lane_id,
235            partition,
236            reason_code,
237            reason_detail,
238            now,
239        }
240    }
241}
242
/// Typed outcome of [`crate::engine_backend::EngineBackend::block_route`].
///
/// `LuaRejected` captures the logical-reject case (e.g. the execution
/// went terminal between pick and block — eligible ZSET is left
/// unchanged and the caller should simply `continue` to the next
/// partition). Transport faults surface on the outer `Result` as
/// [`crate::engine_error::EngineError::Transport`]; callers that
/// want the pre-PR-5 "best-effort, log-and-continue" semantic wrap
/// the call in a `match` and swallow non-success variants.
///
/// The enum is `#[non_exhaustive]`, so a `match` written in another
/// crate needs a wildcard arm.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum BlockRouteOutcome {
    /// Execution moved from eligible → blocked_route successfully.
    Blocked { execution_id: ExecutionId },
    /// Lua returned a non-success result (e.g. execution went
    /// terminal between pick and block). `message` carries the Lua
    /// reject code for operator visibility.
    LuaRejected { message: String },
}
262
/// A claim grant issued by the scheduler for a specific execution.
///
/// The worker uses this to call `ff_claim_execution` (or
/// `ff_acquire_lease`), which atomically consumes the grant and
/// creates the lease.
///
/// Shared wire-level type between `ff-scheduler` (issuer) and
/// `ff-sdk` (consumer, via `FlowFabricWorker::claim_from_grant`).
/// Lives in `ff-core` so neither crate needs a dep on the other.
///
/// **Lane asymmetry with [`ResumeGrant`]:** `ClaimGrant` does NOT
/// carry `lane_id`. The issuing scheduler's caller already picked
/// a lane (that's how admission reached this grant) and passes it
/// through to `claim_from_grant` as a separate argument. The grant
/// handle stays narrow to what uniquely identifies the admission
/// decision. The matching field on [`ResumeGrant`] is an
/// intentional divergence — see the note on that type.
///
/// `#[non_exhaustive]`: construct via [`Self::new`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ClaimGrant {
    /// The execution that was granted.
    pub execution_id: ExecutionId,
    /// Opaque partition handle for this execution's hash-tag slot.
    ///
    /// Public wire type: consumers pass it back to FlowFabric but
    /// must not parse the interior hash tag for routing decisions.
    /// Internal consumers that need the typed
    /// [`crate::partition::Partition`] call [`Self::partition`].
    pub partition_key: crate::partition::PartitionKey,
    /// The Valkey key holding the grant hash (for the worker to
    /// reference).
    pub grant_key: String,
    /// When the grant expires if not consumed, in milliseconds.
    /// NOTE(review): the clock basis is not stated here — confirm
    /// against the issuer.
    pub expires_at_ms: u64,
}
298
299impl ClaimGrant {
300    /// Construct a fresh-claim grant. Added alongside `#[non_exhaustive]`
301    /// per RFC-024 §3.1 + `feedback_non_exhaustive_needs_constructor`.
302    pub fn new(
303        execution_id: ExecutionId,
304        partition_key: crate::partition::PartitionKey,
305        grant_key: String,
306        expires_at_ms: u64,
307    ) -> Self {
308        Self {
309            execution_id,
310            partition_key,
311            grant_key,
312            expires_at_ms,
313        }
314    }
315
316    /// Parse `partition_key` into a typed
317    /// [`crate::partition::Partition`]. Intended for internal
318    /// consumers (scheduler emitter, SDK worker claim path) that
319    /// need the family/index pair. Fails only on malformed keys
320    /// (which indicates a producer bug).
321    ///
322    /// Alias collapse applies: a grant issued against `Execution`
323    /// family round-trips to `Flow` (see [`crate::partition::PartitionKey`]
324    /// for the rationale — routing is preserved, only the metadata
325    /// family label normalises).
326    pub fn partition(
327        &self,
328    ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
329        self.partition_key.parse()
330    }
331}
332
/// A resume grant issued for a resumed (attempt_interrupted) execution.
///
/// Issued by a producer (typically `ff-scheduler` once a Batch-C
/// reclaim scanner is in place; test fixtures in the interim — no
/// production Rust caller exists in-tree today). Consumed by
/// [`FlowFabricWorker::claim_from_resume_grant`], which calls
/// `ff_claim_resumed_execution` atomically: that FCALL validates the
/// grant, consumes it, and transitions `attempt_interrupted` →
/// `started` while preserving the existing `attempt_index` +
/// `attempt_id` (a resumed execution re-uses its attempt; it does
/// not start a new one).
///
/// **Naming history (RFC-024).** This type was historically called
/// `ReclaimGrant`, but its semantic has always been resume-after-
/// suspend (the routing FCALL is `ff_claim_resumed_execution`, not
/// `ff_reclaim_execution`). RFC-024 PR-A renamed the type to
/// `ResumeGrant` — the name now matches the semantic. RFC-024 PR-B+C
/// dropped the transitional `ReclaimGrant = ResumeGrant` alias and
/// introduced a distinct new [`ReclaimGrant`] for the lease-reclaim
/// path (`reclaim_execution` / `ff_reclaim_execution`).
///
/// Mirrors [`ClaimGrant`] for the resume path. Differences:
///
///   * [`ClaimGrant`] is issued against a freshly-eligible
///     execution and `ff_claim_execution` creates a new attempt.
///   * `ResumeGrant` is issued against an `attempt_interrupted`
///     execution; `ff_claim_resumed_execution` re-uses the existing
///     attempt and bumps the lease epoch.
///
/// The grant itself is written to the same `claim_grant` Valkey key
/// that [`ClaimGrant`] uses; the distinction is which Lua FCALL
/// consumes it (`ff_claim_execution` for new attempts,
/// `ff_claim_resumed_execution` for resumes).
///
/// **Lane asymmetry with [`ClaimGrant`]:** `ResumeGrant` CARRIES
/// `lane_id` as a field. The issuing path already knows the lane
/// (it's read from `exec_core` at grant time); carrying it here
/// spares the consumer a `HGET exec_core lane_id` round trip on
/// the hot claim path. The asymmetry is intentional — prefer
/// one-fewer-HGET on a type that already lives with the resumer's
/// lifecycle over strict handle symmetry with `ClaimGrant`.
///
/// Shared wire-level type between the eventual `ff-scheduler`
/// producer (Batch-C reclaim scanner — not yet in-tree; test
/// fixtures construct this type today) and `ff-sdk` (consumer, via
/// `FlowFabricWorker::claim_from_resume_grant`). Lives in
/// `ff-core` so neither crate needs a dep on the other.
///
/// `#[non_exhaustive]`: construct via [`Self::new`].
///
/// [`FlowFabricWorker::claim_from_resume_grant`]: https://docs.rs/ff-sdk
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ResumeGrant {
    /// The execution granted for resumption.
    pub execution_id: ExecutionId,
    /// Opaque partition handle for this execution's hash-tag slot.
    ///
    /// Same wire-opacity contract as [`ClaimGrant::partition_key`].
    /// Internal consumers call [`Self::partition`] for the parsed
    /// form.
    pub partition_key: crate::partition::PartitionKey,
    /// Valkey key of the grant hash — same key shape as
    /// [`ClaimGrant`].
    pub grant_key: String,
    /// Monotonic ms when the grant expires; unconsumed grants
    /// vanish.
    pub expires_at_ms: u64,
    /// Lane the execution belongs to. Needed by
    /// `ff_claim_resumed_execution` for `KEYS[3]` (eligible_zset)
    /// and `KEYS[9]` (active_index).
    pub lane_id: LaneId,
}
404
405impl ResumeGrant {
406    /// Construct a resume grant. Added alongside `#[non_exhaustive]`
407    /// per RFC-024 §3.1 + `feedback_non_exhaustive_needs_constructor`.
408    pub fn new(
409        execution_id: ExecutionId,
410        partition_key: crate::partition::PartitionKey,
411        grant_key: String,
412        expires_at_ms: u64,
413        lane_id: LaneId,
414    ) -> Self {
415        Self {
416            execution_id,
417            partition_key,
418            grant_key,
419            expires_at_ms,
420            lane_id,
421        }
422    }
423
424    /// Parse `partition_key` into a typed
425    /// [`crate::partition::Partition`]. See [`ClaimGrant::partition`]
426    /// for the alias-collapse note.
427    pub fn partition(
428        &self,
429    ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
430        self.partition_key.parse()
431    }
432}
433
/// A lease-reclaim grant issued for an execution in
/// `lease_expired_reclaimable` or `lease_revoked` state (RFC-024 §3.1).
///
/// Distinct from [`ResumeGrant`]: the reclaim grant routes to
/// `ff_reclaim_execution` (Valkey) / the new-attempt reclaim impl
/// (PG/SQLite), which creates a NEW attempt row and bumps the
/// execution's `lease_reclaim_count`. The resume grant, by contrast,
/// re-uses the existing attempt under `ff_claim_resumed_execution`.
///
/// Carries `lane_id` for symmetry with [`ResumeGrant`] — the Lua
/// `ff_reclaim_execution` needs the lane for key construction, and
/// the consuming worker would otherwise pay a round-trip to recover
/// it from `exec_core`.
///
/// Backend impl bodies ship under PR-D (PG) / PR-E (SQLite) / PR-F
/// (Valkey). This PR lands only the type + trait surface; default
/// [`crate::engine_backend::EngineBackend::issue_reclaim_grant`] and
/// [`crate::engine_backend::EngineBackend::reclaim_execution`] return
/// [`crate::engine_error::EngineError::Unavailable`] until each
/// backend PR wires its real body.
///
/// `#[non_exhaustive]`: construct via [`Self::new`]; the typed
/// partition is available through [`Self::partition`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ReclaimGrant {
    /// The execution granted for lease-reclaim.
    pub execution_id: ExecutionId,
    /// Opaque partition handle for this execution's hash-tag slot.
    pub partition_key: crate::partition::PartitionKey,
    /// Backend-scoped grant key (Valkey key / PG+SQLite
    /// `ff_claim_grant.grant_id`).
    pub grant_key: String,
    /// Monotonic ms when the grant expires; unconsumed grants vanish.
    pub expires_at_ms: u64,
    /// Lane the execution belongs to — needed by
    /// `ff_reclaim_execution` for `KEYS[*]` construction.
    pub lane_id: LaneId,
}
470
471impl ReclaimGrant {
472    /// Construct a reclaim grant. Added alongside `#[non_exhaustive]`
473    /// per RFC-024 §3.1 + `feedback_non_exhaustive_needs_constructor`.
474    pub fn new(
475        execution_id: ExecutionId,
476        partition_key: crate::partition::PartitionKey,
477        grant_key: String,
478        expires_at_ms: u64,
479        lane_id: LaneId,
480    ) -> Self {
481        Self {
482            execution_id,
483            partition_key,
484            grant_key,
485            expires_at_ms,
486            lane_id,
487        }
488    }
489
490    /// Parse `partition_key` into a typed
491    /// [`crate::partition::Partition`].
492    pub fn partition(
493        &self,
494    ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
495        self.partition_key.parse()
496    }
497}
498
499// ─── claim_execution ───
500
/// Typed inputs for the `claim_execution` function call.
///
/// `#[non_exhaustive]`: construct via [`Self::new`]. Fields tagged
/// `#[serde(default)]` may be absent from the serialized form.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct ClaimExecutionArgs {
    /// Execution being claimed.
    pub execution_id: ExecutionId,
    /// Id of the claiming worker.
    pub worker_id: WorkerId,
    /// Id of the claiming worker instance.
    pub worker_instance_id: WorkerInstanceId,
    /// Lane of the execution.
    pub lane_id: LaneId,
    /// Lease id supplied by the caller for this claim.
    pub lease_id: LeaseId,
    /// Lease time-to-live in milliseconds.
    pub lease_ttl_ms: u64,
    /// Attempt id supplied by the caller for this claim.
    pub attempt_id: AttemptId,
    /// Expected attempt index (pre-read from exec_core.total_attempt_count).
    /// Used for KEYS construction — must match what the Lua computes.
    pub expected_attempt_index: AttemptIndex,
    /// JSON-encoded attempt policy snapshot.
    #[serde(default)]
    pub attempt_policy_json: String,
    /// Per-attempt timeout in ms.
    #[serde(default)]
    pub attempt_timeout_ms: Option<u64>,
    /// Total execution deadline (absolute timestamp ms).
    ///
    /// Note: typed as a raw `i64` here, whereas
    /// `CreateExecutionArgs::execution_deadline_at` uses `TimestampMs`.
    #[serde(default)]
    pub execution_deadline_at: Option<i64>,
    /// Timestamp supplied by the caller as the current time.
    pub now: TimestampMs,
}
525
526impl ClaimExecutionArgs {
527    /// Construct a `ClaimExecutionArgs`. Added alongside
528    /// `#[non_exhaustive]` per `feedback_non_exhaustive_needs_constructor`
529    /// so consumers (SDK worker, backend impls) can still build the
530    /// args after the struct was sealed for forward-compat.
531    #[allow(clippy::too_many_arguments)]
532    pub fn new(
533        execution_id: ExecutionId,
534        worker_id: WorkerId,
535        worker_instance_id: WorkerInstanceId,
536        lane_id: LaneId,
537        lease_id: LeaseId,
538        lease_ttl_ms: u64,
539        attempt_id: AttemptId,
540        expected_attempt_index: AttemptIndex,
541        attempt_policy_json: String,
542        attempt_timeout_ms: Option<u64>,
543        execution_deadline_at: Option<i64>,
544        now: TimestampMs,
545    ) -> Self {
546        Self {
547            execution_id,
548            worker_id,
549            worker_instance_id,
550            lane_id,
551            lease_id,
552            lease_ttl_ms,
553            attempt_id,
554            expected_attempt_index,
555            attempt_policy_json,
556            attempt_timeout_ms,
557            execution_deadline_at,
558            now,
559        }
560    }
561}
562
/// Details of a successful claim; the payload of
/// [`ClaimExecutionResult::Claimed`].
///
/// `#[non_exhaustive]`: construct via [`Self::new`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct ClaimedExecution {
    /// Execution that was claimed.
    pub execution_id: ExecutionId,
    /// Lease id of the claim.
    pub lease_id: LeaseId,
    /// Lease epoch of the claim.
    pub lease_epoch: LeaseEpoch,
    /// Index of the attempt associated with the claim.
    pub attempt_index: AttemptIndex,
    /// Id of the attempt associated with the claim.
    pub attempt_id: AttemptId,
    /// Type of the attempt associated with the claim.
    pub attempt_type: AttemptType,
    /// Timestamp at which the lease expires.
    pub lease_expires_at: TimestampMs,
    /// Backend-populated attempt handle for this claim (v0.12 PR-5.5).
    /// Valkey fills in an encoded `HandleKind::Fresh`; PG/SQLite are
    /// `Unavailable` on `claim_execution` at runtime per
    /// `project_claim_from_grant_pg_sqlite_gap.md`, so the field stays
    /// a stub on those paths.
    ///
    /// When the field is missing from serialized input, serde fills it
    /// by calling `crate::backend::stub_handle_fresh`.
    #[serde(default = "crate::backend::stub_handle_fresh")]
    pub handle: crate::backend::Handle,
}
581
582impl ClaimedExecution {
583    /// Construct a `ClaimedExecution`. Added alongside
584    /// `#[non_exhaustive]` per `feedback_non_exhaustive_needs_constructor`
585    /// so consumers (backend impls building a claim outcome) can still
586    /// build the struct after it was sealed for forward-compat.
587    #[allow(clippy::too_many_arguments)]
588    pub fn new(
589        execution_id: ExecutionId,
590        lease_id: LeaseId,
591        lease_epoch: LeaseEpoch,
592        attempt_index: AttemptIndex,
593        attempt_id: AttemptId,
594        attempt_type: AttemptType,
595        lease_expires_at: TimestampMs,
596        handle: crate::backend::Handle,
597    ) -> Self {
598        Self {
599            execution_id,
600            lease_id,
601            lease_epoch,
602            attempt_index,
603            attempt_id,
604            attempt_type,
605            lease_expires_at,
606            handle,
607        }
608    }
609}
610
/// Typed outcome of [`crate::engine_backend::EngineBackend::claim_execution`].
///
/// Single-variant today; `#[non_exhaustive]` reserves room for future
/// outcomes (e.g. an explicit `NoGrant` variant if RFC-024 splits it
/// out of `InvalidClaimGrant`) without a breaking match-arm churn on
/// consumers.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum ClaimExecutionResult {
    /// Successfully claimed; the payload holds the lease, attempt and
    /// handle details (see [`ClaimedExecution`]).
    Claimed(ClaimedExecution),
}
623
624// ─── complete_execution ───
625
/// Typed inputs for the `complete_execution` function call.
///
/// Fields tagged `#[serde(default)]` may be absent from the serialized
/// form and then take their type's `Default` value.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CompleteExecutionArgs {
    /// Execution to complete.
    pub execution_id: ExecutionId,
    /// RFC #58.5 — fence triple. `Some` for SDK worker paths (standard
    /// stale-lease fence). `None` for operator overrides, in which case
    /// `source` must be `CancelSource::OperatorOverride` or the Lua
    /// returns `fence_required`.
    #[serde(default)]
    pub fence: Option<LeaseFence>,
    /// Index of the attempt being completed.
    pub attempt_index: AttemptIndex,
    /// Optional raw result payload bytes.
    #[serde(default)]
    pub result_payload: Option<Vec<u8>>,
    /// Optional encoding label; presumably describes `result_payload` —
    /// TODO confirm against the consuming function.
    #[serde(default)]
    pub result_encoding: Option<String>,
    /// RFC #58.5 — unfenced-call gate. Ignored when `fence` is `Some`.
    #[serde(default)]
    pub source: CancelSource,
    /// Timestamp supplied by the caller as the current time.
    pub now: TimestampMs,
}
645
/// Success outcome of the `complete_execution` function call.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CompleteExecutionResult {
    /// Execution completed successfully.
    Completed {
        /// Id of the completed execution.
        execution_id: ExecutionId,
        /// Public state returned alongside the completion.
        public_state: PublicState,
    },
}
654
655// ─── renew_lease ───
656
/// Typed inputs for the `renew_lease` function call.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RenewLeaseArgs {
    /// Execution whose lease is renewed.
    pub execution_id: ExecutionId,
    /// Index of the attempt the lease belongs to.
    pub attempt_index: AttemptIndex,
    /// RFC #58.5 — fence triple. Required (no operator override path for
    /// renew). `None` returns `fence_required`.
    pub fence: Option<LeaseFence>,
    /// How long to extend the lease (milliseconds).
    pub lease_ttl_ms: u64,
    /// Grace period after lease_expires_at before the lease_current key is auto-deleted.
    ///
    /// When absent from serialized input, serde fills it by calling
    /// `default_lease_history_grace_ms` (60_000).
    #[serde(default = "default_lease_history_grace_ms")]
    pub lease_history_grace_ms: u64,
}
670
/// Serde default for `RenewLeaseArgs::lease_history_grace_ms`:
/// 60 seconds, expressed in milliseconds.
fn default_lease_history_grace_ms() -> u64 {
    const GRACE_MS: u64 = 60 * 1_000;
    GRACE_MS
}
674
/// Success outcome of the `renew_lease` function call.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum RenewLeaseResult {
    /// Lease renewed; `expires_at` is the expiry timestamp returned
    /// with the renewal.
    Renewed { expires_at: TimestampMs },
}
680
681// ─── mark_lease_expired_if_due ───
682
/// Typed inputs for the `mark_lease_expired_if_due` function call.
///
/// The execution id is the only input; no timestamp is carried here.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MarkLeaseExpiredArgs {
    /// Execution whose lease should be checked.
    pub execution_id: ExecutionId,
}
687
/// Outcomes of the `mark_lease_expired_if_due` function call.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum MarkLeaseExpiredResult {
    /// Lease was marked as expired.
    MarkedExpired,
    /// No action needed (already expired, not yet due, not active, etc.).
    /// `reason` is a free-form string naming which case applied.
    AlreadySatisfied { reason: String },
}
695
696// ─── cancel_execution ───
697
/// Typed inputs for the `cancel_execution` function call.
///
/// Unlike the other fenced args structs in this module, the lease
/// identity is passed as three separate optional fields
/// (`lease_id`, `lease_epoch`, `attempt_id`) rather than a single
/// `Option<LeaseFence>`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CancelExecutionArgs {
    /// Execution to cancel.
    pub execution_id: ExecutionId,
    /// Free-form cancellation reason.
    pub reason: String,
    /// Who is cancelling; `CancelSource::default()` when omitted.
    #[serde(default)]
    pub source: CancelSource,
    /// Required if not operator_override and execution is active.
    #[serde(default)]
    pub lease_id: Option<LeaseId>,
    /// NOTE(review): presumably required under the same condition as
    /// `lease_id` / `attempt_id` — confirm against the Lua function.
    #[serde(default)]
    pub lease_epoch: Option<LeaseEpoch>,
    /// Required if not operator_override and execution is active.
    #[serde(default)]
    pub attempt_id: Option<AttemptId>,
    /// Timestamp supplied by the caller as the current time.
    pub now: TimestampMs,
}
714
/// Success outcome of the `cancel_execution` function call.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CancelExecutionResult {
    /// Execution cancelled.
    Cancelled {
        /// Id of the cancelled execution.
        execution_id: ExecutionId,
        /// Public state returned alongside the cancellation.
        public_state: PublicState,
    },
}
723
724// ─── revoke_lease ───
725
/// Typed inputs for the `revoke_lease` function call.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RevokeLeaseArgs {
    /// Execution whose lease is revoked.
    pub execution_id: ExecutionId,
    /// If set, only revoke if this matches the current lease. Empty string skips check.
    ///
    /// Note: carried as a plain `String`, not a `LeaseId`.
    #[serde(default)]
    pub expected_lease_id: Option<String>,
    /// Worker instance whose lease set to clean up. Read from exec_core before calling.
    pub worker_instance_id: WorkerInstanceId,
    /// Free-form revocation reason.
    pub reason: String,
}
736
/// Outcomes of the `revoke_lease` function call.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum RevokeLeaseResult {
    /// Lease revoked.
    Revoked {
        /// Id of the revoked lease, as a plain string (not `LeaseId`).
        lease_id: String,
        /// Epoch of the revoked lease, as a plain string (not `LeaseEpoch`).
        lease_epoch: String,
    },
    /// Already revoked or expired — no action needed.
    AlreadySatisfied { reason: String },
}
747
748// ─── delay_execution ───
749
/// Typed inputs for the `delay_execution` function call.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DelayExecutionArgs {
    /// Execution to delay.
    pub execution_id: ExecutionId,
    /// RFC #58.5 — fence triple. `None` requires `source ==
    /// CancelSource::OperatorOverride`.
    #[serde(default)]
    pub fence: Option<LeaseFence>,
    /// Index of the attempt the call applies to.
    pub attempt_index: AttemptIndex,
    /// Timestamp until which the execution is delayed.
    pub delay_until: TimestampMs,
    /// Caller source used for the unfenced gate described on `fence`;
    /// `CancelSource::default()` when omitted.
    #[serde(default)]
    pub source: CancelSource,
    /// Timestamp supplied by the caller as the current time.
    pub now: TimestampMs,
}
763
/// Success outcome of the `delay_execution` function call.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum DelayExecutionResult {
    /// Execution delayed.
    Delayed {
        /// Id of the delayed execution.
        execution_id: ExecutionId,
        /// Public state returned alongside the delay.
        public_state: PublicState,
    },
}
772
773// ─── move_to_waiting_children ───
774
/// Typed inputs for the `move_to_waiting_children` function call.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MoveToWaitingChildrenArgs {
    /// Execution to move.
    pub execution_id: ExecutionId,
    /// RFC #58.5 — fence triple. `None` requires `source ==
    /// CancelSource::OperatorOverride`.
    #[serde(default)]
    pub fence: Option<LeaseFence>,
    /// Index of the attempt the call applies to.
    pub attempt_index: AttemptIndex,
    /// Caller source used for the unfenced gate described on `fence`;
    /// `CancelSource::default()` when omitted.
    #[serde(default)]
    pub source: CancelSource,
    /// Timestamp supplied by the caller as the current time.
    pub now: TimestampMs,
}
787
/// Success outcome of the `move_to_waiting_children` function call.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum MoveToWaitingChildrenResult {
    /// Moved to waiting children.
    Moved {
        /// Id of the execution that was moved.
        execution_id: ExecutionId,
        /// Public state returned alongside the move.
        public_state: PublicState,
    },
}
796
797// ─── change_priority ───
798
/// Typed inputs for the `change_priority` function call.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChangePriorityArgs {
    /// Execution whose priority changes.
    pub execution_id: ExecutionId,
    /// Priority value to set.
    pub new_priority: i32,
    /// Lane of the execution.
    pub lane_id: LaneId,
    /// Timestamp supplied by the caller as the current time.
    pub now: TimestampMs,
}
806
/// Success outcome of the `change_priority` function call.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangePriorityResult {
    /// Priority changed and re-scored; carries the execution's id.
    Changed { execution_id: ExecutionId },
}
812
813// ─── update_progress ───
814
/// Typed inputs for the `update_progress` function call.
///
/// The lease identity (`lease_id`, `lease_epoch`, `attempt_id`) is
/// mandatory here — none of the three fields is optional.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UpdateProgressArgs {
    /// Execution whose progress is updated.
    pub execution_id: ExecutionId,
    /// Lease id held by the caller.
    pub lease_id: LeaseId,
    /// Lease epoch held by the caller.
    pub lease_epoch: LeaseEpoch,
    /// Attempt id held by the caller.
    pub attempt_id: AttemptId,
    /// Optional progress percentage. NOTE(review): the `u8` type admits
    /// values above 100; whether those are rejected is not visible here.
    #[serde(default)]
    pub progress_pct: Option<u8>,
    /// Optional free-form progress message.
    #[serde(default)]
    pub progress_message: Option<String>,
    /// Timestamp supplied by the caller as the current time.
    pub now: TimestampMs,
}
827
/// Success outcome of the `update_progress` function call.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum UpdateProgressResult {
    /// Progress updated. Carries no data.
    Updated,
}
833
834// ═══════════════════════════════════════════════════════════════════════
835// Phase 2 contracts: fail, reclaim, expire
836// ═══════════════════════════════════════════════════════════════════════
837
838// ─── fail_execution ───
839
/// Typed inputs for the `fail_execution` function call.
840#[derive(Clone, Debug, Serialize, Deserialize)]
841pub struct FailExecutionArgs {
    /// Execution being reported as failed.
842    pub execution_id: ExecutionId,
843    /// RFC #58.5 — fence triple. `None` requires `source ==
844    /// CancelSource::OperatorOverride`.
845    #[serde(default)]
846    pub fence: Option<LeaseFence>,
    /// Attempt index supplied by the caller.
    /// NOTE(review): presumably the attempt that failed — confirm.
847    pub attempt_index: AttemptIndex,
    /// Free-form failure reason.
848    pub failure_reason: String,
    /// Failure category as a string.
    /// NOTE(review): the set of accepted categories is not visible here.
849    pub failure_category: String,
850    /// JSON-encoded retry policy (from execution policy). Empty = no retries.
851    #[serde(default)]
852    pub retry_policy_json: String,
853    /// JSON-encoded attempt policy for the next retry attempt.
854    #[serde(default)]
855    pub next_attempt_policy_json: String,
    /// Origin of the request; falls back to `CancelSource::default()` when
    /// the field is absent from the serialized input.
856    #[serde(default)]
857    pub source: CancelSource,
858}
859
860/// Outcome of a fail_execution call.
861#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
862pub enum FailExecutionResult {
863    /// Retry was scheduled — execution is delayed with backoff.
864    RetryScheduled {
        /// Timestamp until which the execution stays delayed.
865        delay_until: TimestampMs,
        /// Attempt index that the scheduled retry will use.
866        next_attempt_index: AttemptIndex,
867    },
868    /// No retries left — execution is terminal failed.
869    TerminalFailed,
870}
871
872// ─── issue_reclaim_grant ───
873
/// Typed inputs for the `issue_reclaim_grant` function call.
///
/// Marked `#[non_exhaustive]`, so code outside this crate builds it through
/// the `new` constructor rather than a struct literal.
874#[derive(Clone, Debug, Serialize, Deserialize)]
875#[non_exhaustive]
876pub struct IssueReclaimGrantArgs {
    /// Execution the reclaim grant is requested for.
877    pub execution_id: ExecutionId,
    /// Worker the grant is issued to.
878    pub worker_id: WorkerId,
    /// Instance of that worker.
879    pub worker_instance_id: WorkerInstanceId,
    /// Lane supplied by the caller.
880    pub lane_id: LaneId,
    /// Optional capability hash; `None` when omitted from the input.
881    #[serde(default)]
882    pub capability_hash: Option<String>,
    /// Grant time-to-live in milliseconds.
883    pub grant_ttl_ms: u64,
    /// Optional route snapshot, JSON-encoded per the field name.
884    #[serde(default)]
885    pub route_snapshot_json: Option<String>,
    /// Optional admission summary string.
886    #[serde(default)]
887    pub admission_summary: Option<String>,
888    /// Worker capabilities (parity with `IssueClaimGrantArgs`). The
889    /// Lua primitive `ff_issue_reclaim_grant` reads these at ARGV[9].
890    /// Populated by the SDK admin path from the registered worker's
891    /// `WorkerRegistration::capabilities` per RFC-024 §3.2 (B-2).
892    #[serde(default)]
893    pub worker_capabilities: BTreeSet<String>,
894    /// Caller-side timestamp for bookkeeping. NOT passed to the Lua FCALL —
895    /// ff_issue_reclaim_grant uses `redis.call("TIME")` for grant_expires_at
896    /// (same as ff_issue_claim_grant). Kept for contract symmetry with
897    /// IssueClaimGrantArgs and scheduler audit logging.
898    pub now: TimestampMs,
899}
900
901impl IssueReclaimGrantArgs {
902    /// Construct an `IssueReclaimGrantArgs`. Added alongside
903    /// `#[non_exhaustive]` per RFC-024 §3.2 +
904    /// `feedback_non_exhaustive_needs_constructor`.
905    #[allow(clippy::too_many_arguments)]
906    pub fn new(
907        execution_id: ExecutionId,
908        worker_id: WorkerId,
909        worker_instance_id: WorkerInstanceId,
910        lane_id: LaneId,
911        capability_hash: Option<String>,
912        grant_ttl_ms: u64,
913        route_snapshot_json: Option<String>,
914        admission_summary: Option<String>,
915        worker_capabilities: BTreeSet<String>,
916        now: TimestampMs,
917    ) -> Self {
918        Self {
919            execution_id,
920            worker_id,
921            worker_instance_id,
922            lane_id,
923            capability_hash,
924            grant_ttl_ms,
925            route_snapshot_json,
926            admission_summary,
927            worker_capabilities,
928            now,
929        }
930    }
931}
932
/// Serializable outcome of the `issue_reclaim_grant` function call.
933#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
934pub enum IssueReclaimGrantResult {
935    /// Reclaim grant issued.
    /// `expires_at_ms` is the grant's expiry timestamp.
936    Granted { expires_at_ms: TimestampMs },
937}
938
939/// Typed outcome of [`crate::engine_backend::EngineBackend::issue_reclaim_grant`]
940/// (RFC-024 §3.2).
941///
942/// Construction surface: backends produce variants; consumers match
943/// on variants. No `::new()` — variants ARE the surface.
///
/// Being `#[non_exhaustive]`, matches written outside this crate need a
/// wildcard arm.
944#[derive(Clone, Debug, PartialEq, Eq)]
945#[non_exhaustive]
946pub enum IssueReclaimGrantOutcome {
947    /// Grant issued — hand the carried [`ReclaimGrant`] to
948    /// [`crate::engine_backend::EngineBackend::reclaim_execution`].
949    Granted(ReclaimGrant),
950    /// Execution is not in a reclaimable state (not
951    /// `lease_expired_reclaimable` / `lease_revoked`).
952    NotReclaimable {
        /// Execution that could not be reclaimed.
953        execution_id: ExecutionId,
        /// Free-form explanation supplied by the backend.
954        detail: String,
955    },
956    /// `max_reclaim_count` exceeded; execution transitioned to
957    /// terminal_failed by the backend primitive.
958    ReclaimCapExceeded {
        /// Execution that hit the cap.
959        execution_id: ExecutionId,
        /// Reclaim count reported by the backend.
960        reclaim_count: u32,
961    },
962}
963
964// ─── reclaim_execution ───
965
/// Typed inputs for the `reclaim_execution` function call.
///
/// Marked `#[non_exhaustive]`, so code outside this crate builds it through
/// the `new` constructor rather than a struct literal.
966#[derive(Clone, Debug, Serialize, Deserialize)]
967#[non_exhaustive]
968pub struct ReclaimExecutionArgs {
    /// Execution being reclaimed.
969    pub execution_id: ExecutionId,
    /// Worker performing the reclaim.
970    pub worker_id: WorkerId,
    /// Instance of that worker.
971    pub worker_instance_id: WorkerInstanceId,
    /// Lane supplied by the caller.
972    pub lane_id: LaneId,
    /// Optional capability hash; `None` when omitted from the input.
973    #[serde(default)]
974    pub capability_hash: Option<String>,
    /// Lease id supplied by the caller.
    /// NOTE(review): presumably the id for the new lease — confirm.
975    pub lease_id: LeaseId,
    /// Lease time-to-live in milliseconds.
976    pub lease_ttl_ms: u64,
    /// Attempt id supplied by the caller.
    /// NOTE(review): presumably the id for the new attempt — confirm.
977    pub attempt_id: AttemptId,
978    /// JSON-encoded attempt policy for the reclaim attempt.
979    #[serde(default)]
980    pub attempt_policy_json: String,
981    /// Maximum reclaim count before terminal failure. `None` ⇒ backend
982    /// applies the Rust-surface default of 1000 per RFC-024 §4.6. The
983    /// Lua fallback remains 100 for pre-RFC ARGV-omitted call sites;
984    /// the two-default coexistence is explicit by design.
985    #[serde(default)]
986    pub max_reclaim_count: Option<u32>,
987    /// Old worker instance (for old_worker_leases key construction).
988    pub old_worker_instance_id: WorkerInstanceId,
989    /// Current attempt index (for old_attempt/old_stream_meta key construction).
990    pub current_attempt_index: AttemptIndex,
991}
992
993impl ReclaimExecutionArgs {
994    /// Construct a `ReclaimExecutionArgs`. Added alongside
995    /// `#[non_exhaustive]` per RFC-024 §3.2 +
996    /// `feedback_non_exhaustive_needs_constructor`.
997    #[allow(clippy::too_many_arguments)]
998    pub fn new(
999        execution_id: ExecutionId,
1000        worker_id: WorkerId,
1001        worker_instance_id: WorkerInstanceId,
1002        lane_id: LaneId,
1003        capability_hash: Option<String>,
1004        lease_id: LeaseId,
1005        lease_ttl_ms: u64,
1006        attempt_id: AttemptId,
1007        attempt_policy_json: String,
1008        max_reclaim_count: Option<u32>,
1009        old_worker_instance_id: WorkerInstanceId,
1010        current_attempt_index: AttemptIndex,
1011    ) -> Self {
1012        Self {
1013            execution_id,
1014            worker_id,
1015            worker_instance_id,
1016            lane_id,
1017            capability_hash,
1018            lease_id,
1019            lease_ttl_ms,
1020            attempt_id,
1021            attempt_policy_json,
1022            max_reclaim_count,
1023            old_worker_instance_id,
1024            current_attempt_index,
1025        }
1026    }
1027}
1028
1029/// Typed outcome of [`crate::engine_backend::EngineBackend::reclaim_execution`]
1030/// (RFC-024 §3.2).
1031///
1032/// Distinct from the wire-level [`ReclaimExecutionResult`]; this enum
1033/// is the trait-surface shape consumers match on.
///
/// Being `#[non_exhaustive]`, matches written outside this crate need a
/// wildcard arm.
1034#[derive(Clone, Debug, PartialEq, Eq)]
1035#[non_exhaustive]
1036pub enum ReclaimExecutionOutcome {
1037    /// Execution reclaimed — carries the new-attempt
1038    /// [`crate::backend::Handle`] (kind = `Reclaimed`).
1039    Claimed(crate::backend::Handle),
1040    /// Execution is not in a reclaimable state.
1041    NotReclaimable {
        /// Execution that could not be reclaimed.
1042        execution_id: ExecutionId,
        /// Free-form explanation supplied by the backend.
1043        detail: String,
1044    },
1045    /// `max_reclaim_count` exceeded; execution transitioned to
1046    /// terminal_failed.
1047    ReclaimCapExceeded {
        /// Execution that hit the cap.
1048        execution_id: ExecutionId,
        /// Reclaim count reported by the backend.
1049        reclaim_count: u32,
1050    },
1051    /// The supplied grant was not found / already consumed / expired.
1052    GrantNotFound { execution_id: ExecutionId },
1053}
1054
/// Serializable outcome of the `reclaim_execution` function call.
1055#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1056pub enum ReclaimExecutionResult {
1057    /// Execution reclaimed — new attempt + new lease.
1058    Reclaimed {
        /// Index of the new attempt.
1059        new_attempt_index: AttemptIndex,
        /// Id of the new attempt.
1060        new_attempt_id: AttemptId,
        /// Id of the new lease.
1061        new_lease_id: LeaseId,
        /// Epoch of the new lease.
1062        new_lease_epoch: LeaseEpoch,
        /// Timestamp at which the new lease expires.
1063        lease_expires_at: TimestampMs,
1064    },
1065    /// Max reclaims exceeded — execution moved to terminal.
1066    MaxReclaimsExceeded,
1067}
1068
1069// ─── expire_execution ───
1070
/// Typed inputs for the `expire_execution` function call.
1071#[derive(Clone, Debug, Serialize, Deserialize)]
1072pub struct ExpireExecutionArgs {
    /// Execution to expire.
1073    pub execution_id: ExecutionId,
1074    /// "attempt_timeout" or "execution_deadline"
    /// NOTE(review): carried as a free-form `String`; this type does not
    /// restrict it to those two values.
1075    pub expire_reason: String,
1076}
1077
/// Outcome returned by `expire_execution`.
1078#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1079pub enum ExpireExecutionResult {
1080    /// Execution expired.
1081    Expired { execution_id: ExecutionId },
1082    /// Already terminal — no-op.
1083    AlreadyTerminal,
1084}
1085
1086// ═══════════════════════════════════════════════════════════════════════
1087// Phase 3 contracts: suspend, signal, resume, waitpoint
1088// ═══════════════════════════════════════════════════════════════════════
1089
1090// ─── suspend_execution ───
1091
/// Typed inputs for the `suspend_execution` function call.
1092#[derive(Clone, Debug, Serialize, Deserialize)]
1093pub struct SuspendExecutionArgs {
    /// Execution to suspend.
1094    pub execution_id: ExecutionId,
1095    /// RFC #58.5 — fence triple. Required (no operator override path for
1096    /// suspend). `None` returns `fence_required`.
1097    pub fence: Option<LeaseFence>,
    /// Attempt index supplied by the caller.
1098    pub attempt_index: AttemptIndex,
    /// Id of the suspension.
1099    pub suspension_id: SuspensionId,
    /// Id of the waitpoint tied to the suspension.
1100    pub waitpoint_id: WaitpointId,
    /// Key of that waitpoint.
1101    pub waitpoint_key: String,
    /// Reason code as a free-form string.
1102    pub reason_code: String,
    /// Identity of the requester as a free-form string.
1103    pub requested_by: String,
    /// Resume condition, JSON-encoded per the field name.
1104    pub resume_condition_json: String,
    /// Resume policy, JSON-encoded per the field name.
1105    pub resume_policy_json: String,
    /// Optional pointer to continuation metadata; `None` when omitted.
1106    #[serde(default)]
1107    pub continuation_metadata_pointer: Option<String>,
    /// Optional timeout timestamp; `None` when omitted.
1108    #[serde(default)]
1109    pub timeout_at: Option<TimestampMs>,
1110    /// true to activate a pending waitpoint, false to create new.
1111    #[serde(default)]
1112    pub use_pending_waitpoint: bool,
1113    /// Timeout behavior: "fail", "cancel", "expire", "auto_resume", "escalate".
    /// Falls back to `default_timeout_behavior()` (`"fail"`) when the field
    /// is absent from the serialized input.
1114    #[serde(default = "default_timeout_behavior")]
1115    pub timeout_behavior: String,
1116}
1117
/// Serde fallback for `SuspendExecutionArgs::timeout_behavior`
/// (wired in through `#[serde(default = "default_timeout_behavior")]`).
///
/// Returns the owned string `"fail"`.
fn default_timeout_behavior() -> String {
    String::from("fail")
}
1121
/// Outcome returned by `suspend_execution`. Both variants carry the same
/// four fields.
1122#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1123pub enum SuspendExecutionResult {
1124    /// Execution suspended, waitpoint active.
1125    Suspended {
        /// Id of the suspension.
1126        suspension_id: SuspensionId,
        /// Id of the waitpoint.
1127        waitpoint_id: WaitpointId,
        /// Key of the waitpoint.
1128        waitpoint_key: String,
1129        /// HMAC-SHA1 token bound to (waitpoint_id, waitpoint_key, created_at).
1130        /// Required by signal-delivery callers to authenticate against this
1131        /// waitpoint (RFC-004 §Waitpoint Security).
1132        waitpoint_token: WaitpointToken,
1133    },
1134    /// Buffered signals already satisfied the condition — suspension skipped.
1135    /// Lease is still held. Token comes from the pending waitpoint record.
1136    AlreadySatisfied {
        /// Id of the suspension.
1137        suspension_id: SuspensionId,
        /// Id of the waitpoint.
1138        waitpoint_id: WaitpointId,
        /// Key of the waitpoint.
1139        waitpoint_key: String,
        /// Token taken from the pending waitpoint record (see variant doc).
1140        waitpoint_token: WaitpointToken,
1141    },
1142}
1143
1144// ─── resume_execution ───
1145
/// Typed inputs for the `resume_execution` function call.
1146#[derive(Clone, Debug, Serialize, Deserialize)]
1147pub struct ResumeExecutionArgs {
    /// Execution to resume.
1148    pub execution_id: ExecutionId,
1149    /// "signal", "operator", "auto_resume"
    /// Falls back to `default_trigger_type()` (`"signal"`) when the field
    /// is absent from the serialized input.
1150    #[serde(default = "default_trigger_type")]
1151    pub trigger_type: String,
1152    /// Optional delay before becoming eligible (ms).
    /// Deserializes to `0` when omitted (`u64` default).
1153    #[serde(default)]
1154    pub resume_delay_ms: u64,
1155}
1156
/// Serde fallback for `ResumeExecutionArgs::trigger_type`
/// (wired in through `#[serde(default = "default_trigger_type")]`).
///
/// Returns the owned string `"signal"`.
fn default_trigger_type() -> String {
    String::from("signal")
}
1160
/// Outcome returned by `resume_execution`.
1161#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1162pub enum ResumeExecutionResult {
1163    /// Execution resumed to runnable.
    /// `public_state` is the public state carried back with the result.
1164    Resumed { public_state: PublicState },
1165}
1166
1167// ─── create_pending_waitpoint ───
1168
/// Typed inputs for the `create_pending_waitpoint` function call.
1169#[derive(Clone, Debug, Serialize, Deserialize)]
1170pub struct CreatePendingWaitpointArgs {
    /// Execution the waitpoint belongs to.
1171    pub execution_id: ExecutionId,
    /// Lease id supplied by the caller.
    /// NOTE(review): the lease/attempt fields presumably identify the
    /// caller's current lease — confirm.
1172    pub lease_id: LeaseId,
    /// Lease epoch supplied by the caller.
1173    pub lease_epoch: LeaseEpoch,
    /// Attempt index supplied by the caller.
1174    pub attempt_index: AttemptIndex,
    /// Attempt id supplied by the caller.
1175    pub attempt_id: AttemptId,
    /// Id for the pending waitpoint.
1176    pub waitpoint_id: WaitpointId,
    /// Key for the pending waitpoint.
1177    pub waitpoint_key: String,
1178    /// Short expiry for the pending waitpoint (ms).
1179    pub expires_in_ms: u64,
1180}
1181
/// Outcome returned by `create_pending_waitpoint`.
1182#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1183pub enum CreatePendingWaitpointResult {
1184    /// Pending waitpoint created.
1185    Created {
        /// Id of the created waitpoint.
1186        waitpoint_id: WaitpointId,
        /// Key of the created waitpoint.
1187        waitpoint_key: String,
1188        /// HMAC-SHA1 token bound to the pending waitpoint. Required for
1189        /// `buffer_signal_for_pending_waitpoint` and carried forward when
1190        /// the waitpoint is activated by `suspend_execution`.
1191        waitpoint_token: WaitpointToken,
1192    },
1193}
1194
1195// ─── close_waitpoint ───
1196
/// Typed inputs for the `close_waitpoint` function call.
1197#[derive(Clone, Debug, Serialize, Deserialize)]
1198pub struct CloseWaitpointArgs {
    /// Waitpoint to close.
1199    pub waitpoint_id: WaitpointId,
    /// Free-form reason for closing.
1200    pub reason: String,
1201}
1202
/// Outcome returned by `close_waitpoint`. The single variant carries no data.
1203#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1204pub enum CloseWaitpointResult {
1205    /// Waitpoint closed.
1206    Closed,
1207}
1208
1209// ─── deliver_signal ───
1210
/// Typed inputs for the `deliver_signal` function call.
///
/// Every `Option` field is `#[serde(default)]` and deserializes to `None`
/// when absent from the input.
1211#[derive(Clone, Debug, Serialize, Deserialize)]
1212pub struct DeliverSignalArgs {
    /// Execution the signal targets.
1213    pub execution_id: ExecutionId,
    /// Waitpoint the signal targets.
1214    pub waitpoint_id: WaitpointId,
    /// Id of this signal.
1215    pub signal_id: SignalId,
    /// Signal name.
1216    pub signal_name: String,
    /// Signal category as a free-form string.
1217    pub signal_category: String,
    /// Kind of source that produced the signal, as a free-form string.
1218    pub source_type: String,
    /// Identity of that source.
1219    pub source_identity: String,
    /// Optional raw payload bytes.
1220    #[serde(default)]
1221    pub payload: Option<Vec<u8>>,
    /// Optional encoding label for `payload`.
1222    #[serde(default)]
1223    pub payload_encoding: Option<String>,
    /// Optional correlation id.
1224    #[serde(default)]
1225    pub correlation_id: Option<String>,
    /// Optional idempotency key (see `dedup_ttl_ms`).
1226    #[serde(default)]
1227    pub idempotency_key: Option<String>,
    /// Target scope as a free-form string.
    /// NOTE(review): accepted values are not visible here.
1228    pub target_scope: String,
    /// Optional creation timestamp for the signal.
1229    #[serde(default)]
1230    pub created_at: Option<TimestampMs>,
1231    /// Dedup TTL for idempotency key (ms).
1232    #[serde(default)]
1233    pub dedup_ttl_ms: Option<u64>,
1234    /// Resume delay after signal satisfaction (ms).
1235    #[serde(default)]
1236    pub resume_delay_ms: Option<u64>,
1237    /// Max signals per execution (default 10000).
1238    #[serde(default)]
1239    pub max_signals_per_execution: Option<u64>,
1240    /// MAXLEN for the waitpoint signal stream.
1241    #[serde(default)]
1242    pub signal_maxlen: Option<u64>,
1243    /// HMAC-SHA1 token issued when the waitpoint was created. Required for
1244    /// signal delivery; missing/tampered/rotated-past-grace tokens are
1245    /// rejected with `invalid_token` or `token_expired` (RFC-004).
1246    ///
1247    /// Defense-in-depth: `WaitpointToken` is a transparent string newtype,
1248    /// so an empty string deserializes successfully from JSON. The
1249    /// validation boundary is in Lua (`validate_waitpoint_token` returns
1250    /// `missing_token` on empty input); this type intentionally does NOT
1251    /// pre-reject at the Rust layer so callers get a consistent typed
1252    /// error regardless of how they constructed the args.
1253    pub waitpoint_token: WaitpointToken,
    /// Caller-supplied timestamp.
1254    pub now: TimestampMs,
1255}
1256
/// Outcome returned by `deliver_signal`.
1257#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1258pub enum DeliverSignalResult {
1259    /// Signal accepted with the given effect.
    /// NOTE(review): `effect` is a free-form string; its possible values
    /// are not visible here.
1260    Accepted { signal_id: SignalId, effect: String },
1261    /// Duplicate signal (idempotency key matched).
1262    Duplicate { existing_signal_id: SignalId },
1263}
1264
1265// ─── buffer_signal_for_pending_waitpoint ───
1266
/// Typed inputs for the `buffer_signal_for_pending_waitpoint` function call.
///
/// Every `Option` field is `#[serde(default)]` and deserializes to `None`
/// when absent from the input.
1267#[derive(Clone, Debug, Serialize, Deserialize)]
1268pub struct BufferSignalArgs {
    /// Execution the signal targets.
1269    pub execution_id: ExecutionId,
    /// Pending waitpoint the signal targets.
1270    pub waitpoint_id: WaitpointId,
    /// Id of this signal.
1271    pub signal_id: SignalId,
    /// Signal name.
1272    pub signal_name: String,
    /// Signal category as a free-form string.
1273    pub signal_category: String,
    /// Kind of source that produced the signal, as a free-form string.
1274    pub source_type: String,
    /// Identity of that source.
1275    pub source_identity: String,
    /// Optional raw payload bytes.
1276    #[serde(default)]
1277    pub payload: Option<Vec<u8>>,
    /// Optional encoding label for `payload`.
1278    #[serde(default)]
1279    pub payload_encoding: Option<String>,
    /// Optional idempotency key.
1280    #[serde(default)]
1281    pub idempotency_key: Option<String>,
    /// Target scope as a free-form string.
    /// NOTE(review): accepted values are not visible here.
1282    pub target_scope: String,
1283    /// HMAC-SHA1 token issued when `create_pending_waitpoint` ran. Required
1284    /// to authenticate early signals targeting the pending waitpoint.
1285    pub waitpoint_token: WaitpointToken,
    /// Caller-supplied timestamp.
1286    pub now: TimestampMs,
1287}
1288
/// Outcome returned by `buffer_signal_for_pending_waitpoint`.
1289#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1290pub enum BufferSignalResult {
1291    /// Signal buffered for pending waitpoint.
1292    Buffered { signal_id: SignalId },
1293    /// Duplicate signal.
    /// Carries the id of the signal that was already recorded.
1294    Duplicate { existing_signal_id: SignalId },
1295}
1296
1297// ─── list_pending_waitpoints ───
1298
1299/// One entry in the read-only view of an execution's active waitpoints.
1300///
1301/// Returned by `EngineBackend::list_pending_waitpoints` (and the
1302/// `GET /v1/executions/{id}/pending-waitpoints` REST endpoint).
1303///
1304/// **RFC-017 §8 schema rewrite (Stage D1).** This struct no longer
1305/// carries the raw HMAC `waitpoint_token` at the trait boundary — the
1306/// backend emits only the sanitised `(token_kid, token_fingerprint)`
1307/// pair. The HTTP handler (see `ff-server::api::list_pending_waitpoints`)
1308/// wraps the trait response and re-injects the real token on the
1309/// v0.7.x wire for one-release deprecation warning; the wire field is
1310/// removed entirely at v0.8.0.
///
/// The struct is `#[non_exhaustive]`; build it with `new` plus the
/// `with_*` builder methods.
1311#[non_exhaustive]
1312#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1313pub struct PendingWaitpointInfo {
    /// Id of the waitpoint.
1314    pub waitpoint_id: WaitpointId,
    /// Key of the waitpoint.
1315    pub waitpoint_key: String,
1316    /// Current waitpoint state: `pending`, `active`, `closed`. Callers
1317    /// typically filter to `pending` or `active`.
1318    pub state: String,
1319    /// Signal names the resume condition is waiting for. Reviewers that
1320    /// need to drive a specific waitpoint — particularly when multiple
1321    /// concurrent waitpoints exist on one execution — filter on this to
1322    /// pick the right target.
1323    ///
1324    /// An EMPTY vec means the condition matches any signal (wildcard, per
1325    /// `lua/helpers.lua` `initialize_condition`). Callers must not infer
1326    /// "no waitpoint" from empty; check `state` / length of the outer
1327    /// list for that.
1328    #[serde(default)]
1329    pub required_signal_names: Vec<String>,
1330    /// Timestamp when the waitpoint record was first written.
1331    pub created_at: TimestampMs,
1332    /// Timestamp when the waitpoint was activated (suspension landed).
1333    /// `None` while the waitpoint is still `pending`.
    /// Omitted from serialized output when `None`.
1334    #[serde(default, skip_serializing_if = "Option::is_none")]
1335    pub activated_at: Option<TimestampMs>,
1336    /// Scheduled expiration timestamp. `None` if no timeout configured.
    /// Omitted from serialized output when `None`.
1337    #[serde(default, skip_serializing_if = "Option::is_none")]
1338    pub expires_at: Option<TimestampMs>,
1339    /// Owning execution — surfaces without a separate lookup.
1340    pub execution_id: ExecutionId,
1341    /// HMAC key identifier (the `<kid>` prefix of the stored
1342    /// `waitpoint_token`). Safe to expose — identifies which signing
1343    /// key minted the token without revealing the key material.
1344    pub token_kid: String,
1345    /// 16-hex-char (8-byte) fingerprint of the HMAC digest. Audit-friendly
1346    /// handle that correlates across logs without being replayable.
1347    pub token_fingerprint: String,
1348}
1349
1350impl PendingWaitpointInfo {
1351    /// Construct a `PendingWaitpointInfo` with the 7 required fields.
1352    /// Optional fields (`activated_at`, `expires_at`) default to
1353    /// `None`; use [`Self::with_activated_at`] / [`Self::with_expires_at`]
1354    /// to populate them. `required_signal_names` defaults to empty
1355    /// (wildcard condition); use [`Self::with_required_signal_names`]
1356    /// to set it.
1357    pub fn new(
1358        waitpoint_id: WaitpointId,
1359        waitpoint_key: String,
1360        state: String,
1361        created_at: TimestampMs,
1362        execution_id: ExecutionId,
1363        token_kid: String,
1364        token_fingerprint: String,
1365    ) -> Self {
1366        Self {
1367            waitpoint_id,
1368            waitpoint_key,
1369            state,
1370            required_signal_names: Vec::new(),
1371            created_at,
1372            activated_at: None,
1373            expires_at: None,
1374            execution_id,
1375            token_kid,
1376            token_fingerprint,
1377        }
1378    }
1379
1380    pub fn with_activated_at(mut self, activated_at: TimestampMs) -> Self {
1381        self.activated_at = Some(activated_at);
1382        self
1383    }
1384
1385    pub fn with_expires_at(mut self, expires_at: TimestampMs) -> Self {
1386        self.expires_at = Some(expires_at);
1387        self
1388    }
1389
1390    pub fn with_required_signal_names(mut self, names: Vec<String>) -> Self {
1391        self.required_signal_names = names;
1392        self
1393    }
1394}
1395
1396// ─── expire_suspension ───
1397
/// Typed inputs for the `expire_suspension` function call.
1398#[derive(Clone, Debug, Serialize, Deserialize)]
1399pub struct ExpireSuspensionArgs {
    /// Execution whose suspension is to be expired.
1400    pub execution_id: ExecutionId,
1401}
1402
/// Outcome returned by `expire_suspension`.
1403#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1404pub enum ExpireSuspensionResult {
1405    /// Suspension expired with the given behavior applied.
    /// NOTE(review): `behavior_applied` is presumably one of the
    /// `timeout_behavior` strings from `SuspendExecutionArgs` — confirm.
1406    Expired { behavior_applied: String },
1407    /// Already resolved — no action needed.
1408    AlreadySatisfied { reason: String },
1409}
1410
1411// ─── claim_resumed_execution ───
1412
/// Typed inputs for the `claim_resumed_execution` function call.
1413#[derive(Clone, Debug, Serialize, Deserialize)]
1414pub struct ClaimResumedExecutionArgs {
    /// Execution being claimed.
1415    pub execution_id: ExecutionId,
    /// Worker making the claim.
1416    pub worker_id: WorkerId,
    /// Instance of that worker.
1417    pub worker_instance_id: WorkerInstanceId,
    /// Lane supplied by the caller.
1418    pub lane_id: LaneId,
    /// Lease id supplied by the caller.
1419    pub lease_id: LeaseId,
    /// Lease time-to-live in milliseconds.
1420    pub lease_ttl_ms: u64,
1421    /// Current attempt index (for KEYS construction — from exec_core).
1422    pub current_attempt_index: AttemptIndex,
1423    /// Remaining attempt timeout from before suspension (ms). 0 = no timeout.
    /// NOTE(review): the field is `Option<u64>` and deserializes to `None`
    /// when omitted; whether `None` and `Some(0)` are treated the same is
    /// not visible here — confirm.
1424    #[serde(default)]
1425    pub remaining_attempt_timeout_ms: Option<u64>,
    /// Caller-supplied timestamp.
1426    pub now: TimestampMs,
1427}
1428
/// Payload of `ClaimResumedExecutionResult::Claimed`: the lease and attempt
/// identity of a claimed resumed execution.
///
/// The struct is `#[non_exhaustive]`; build it with `new`.
1429#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1430#[non_exhaustive]
1431pub struct ClaimedResumedExecution {
    /// Execution that was claimed.
1432    pub execution_id: ExecutionId,
    /// Id of the lease.
1433    pub lease_id: LeaseId,
    /// Epoch of the lease.
1434    pub lease_epoch: LeaseEpoch,
    /// Index of the attempt.
1435    pub attempt_index: AttemptIndex,
    /// Id of the attempt.
1436    pub attempt_id: AttemptId,
    /// Timestamp at which the lease expires.
1437    pub lease_expires_at: TimestampMs,
1438    /// Backend-populated attempt handle for this resumed claim
1439    /// (v0.12 PR-5.5). Valkey fills in `HandleKind::Resumed`; PG/SQLite
1440    /// populate a backend-tagged real handle via
1441    /// `ff_core::handle_codec::encode`.
    /// When absent from the serialized input, the value comes from
    /// `crate::backend::stub_handle_resumed`.
1442    #[serde(default = "crate::backend::stub_handle_resumed")]
1443    pub handle: crate::backend::Handle,
1444}
1445
1446impl ClaimedResumedExecution {
1447    /// Construct a `ClaimedResumedExecution`. Added alongside
1448    /// `#[non_exhaustive]` per `feedback_non_exhaustive_needs_constructor`
1449    /// so consumers (backend impls building a resumed-claim outcome)
1450    /// can still build the struct after it was sealed for forward-compat.
1451    #[allow(clippy::too_many_arguments)]
1452    pub fn new(
1453        execution_id: ExecutionId,
1454        lease_id: LeaseId,
1455        lease_epoch: LeaseEpoch,
1456        attempt_index: AttemptIndex,
1457        attempt_id: AttemptId,
1458        lease_expires_at: TimestampMs,
1459        handle: crate::backend::Handle,
1460    ) -> Self {
1461        Self {
1462            execution_id,
1463            lease_id,
1464            lease_epoch,
1465            attempt_index,
1466            attempt_id,
1467            lease_expires_at,
1468            handle,
1469        }
1470    }
1471}
1472
/// Outcome returned by `claim_resumed_execution`.
1473#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1474pub enum ClaimResumedExecutionResult {
1475    /// Successfully claimed resumed execution (same attempt continues).
1476    Claimed(ClaimedResumedExecution),
1477}
1478
1479// ═══════════════════════════════════════════════════════════════════════
1480// Phase 4 contracts: stream
1481// ═══════════════════════════════════════════════════════════════════════
1482
1483// ─── append_frame ───
1484
/// Typed inputs for the `append_frame` function call.
///
/// Every `Option` field is `#[serde(default)]` and deserializes to `None`
/// when absent from the input.
1485#[derive(Clone, Debug, Serialize, Deserialize)]
1486pub struct AppendFrameArgs {
    /// Execution whose stream receives the frame.
1487    pub execution_id: ExecutionId,
    /// Attempt index supplied by the caller.
1488    pub attempt_index: AttemptIndex,
    /// Lease id supplied by the caller.
1489    pub lease_id: LeaseId,
    /// Lease epoch supplied by the caller.
1490    pub lease_epoch: LeaseEpoch,
    /// Attempt id supplied by the caller.
1491    pub attempt_id: AttemptId,
    /// Frame type as a free-form string.
1492    pub frame_type: String,
    /// Timestamp attached to the frame.
1493    pub timestamp: TimestampMs,
    /// Raw frame payload bytes.
1494    pub payload: Vec<u8>,
    /// Optional encoding label for `payload`.
1495    #[serde(default)]
1496    pub encoding: Option<String>,
1497    /// Optional structured metadata for the frame (JSON blob).
1498    #[serde(default)]
1499    pub metadata_json: Option<String>,
    /// Optional correlation id.
1500    #[serde(default)]
1501    pub correlation_id: Option<String>,
    /// Optional source label.
1502    #[serde(default)]
1503    pub source: Option<String>,
1504    /// MAXLEN for the stream. 0 = no trim.
    /// NOTE(review): how `None` is treated relative to `Some(0)` is not
    /// visible here — confirm.
1505    #[serde(default)]
1506    pub retention_maxlen: Option<u32>,
1507    /// Max payload bytes per frame. Default: 65536.
    /// NOTE(review): the 65536 default is not applied by this type (the
    /// field deserializes to `None`); presumably the callee substitutes
    /// it — confirm.
1508    #[serde(default)]
1509    pub max_payload_bytes: Option<u32>,
1510}
1511
/// Outcome returned by `append_frame`.
1512#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1513pub enum AppendFrameResult {
1514    /// Frame appended successfully.
1515    Appended {
1516        /// Valkey Stream entry ID (e.g. "1713100800150-0").
1517        entry_id: String,
1518        /// Total frame count after this append.
1519        frame_count: u64,
1520    },
1521}
1522
1523// ─── StreamCursor (issue #92) ───
1524
/// Opaque position marker for reading or tailing an attempt stream.
///
/// Supersedes the bare `&str` / `String` stream-id parameters that
/// `read_stream` / `tail_stream` / `ReadStreamParams` /
/// `TailStreamParams` used to carry. On the wire it is still a flat
/// string — serde is transparent via `try_from`/`into` — so REST queries
/// such as `?from=start&to=end` and `?after=123-0` keep working.
///
/// # Public wire grammar
///
/// Exactly three token shapes are accepted:
///
/// * `"start"` — the first entry in the stream (equivalent to XRANGE
///   `-`). Valid in `read_stream` / `ReadStreamParams`.
/// * `"end"` — the latest entry in the stream (equivalent to XRANGE
///   `+`). Valid in `read_stream` / `ReadStreamParams`.
/// * `"<ms>"` or `"<ms>-<seq>"` — a concrete Valkey Stream entry id.
///   Valid everywhere.
///
/// The raw XRANGE/XREAD markers `"-"` and `"+"` are **NOT** part of the
/// wire grammar. They are an internal detail that only appears inside
/// the Lua-adjacent [`ReadFramesArgs`] / `xread_block` path, produced by
/// [`StreamCursor::to_wire`].
///
/// For XREAD (tail) the "from the beginning" convention is
/// `StreamCursor::At("0-0".into())`; [`StreamCursor::from_beginning`]
/// returns exactly that value. The SDK's `tail_stream` boundary rejects
/// `Start` / `End` because XREAD does not take `-` / `+` as cursors, and
/// [`StreamCursor::is_concrete`] is the shared Start/End-vs-At test for
/// such boundary checks.
///
/// # Why an enum instead of a string
///
/// With a plain string, a malformed id travels all the way to the
/// Lua/Valkey layer and comes back as a script error and HTTP 500. An
/// enum parsed through fallible `FromStr` / `TryFrom<String>` rejects
/// malformed input at the wire boundary with a structured error, and
/// keeps bare `-` / `+` from creeping into consumer code as unofficial
/// extensions of the public API.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum StreamCursor {
    /// First entry in the stream (XRANGE start marker).
    Start,
    /// Latest entry in the stream (XRANGE end marker).
    End,
    /// A concrete Valkey Stream entry id (`<ms>` or `<ms>-<seq>`).
    ///
    /// XREAD-style tails use `At("0-0".to_owned())` for "from the
    /// beginning" — see [`StreamCursor::from_beginning`].
    At(String),
}

impl StreamCursor {
    /// Returns the XREAD-from-beginning cursor, `At("0-0")`.
    ///
    /// XREAD's `last_id` is exclusive, so using this as the `after`
    /// cursor yields every entry in the stream.
    pub fn from_beginning() -> Self {
        Self::At(String::from("0-0"))
    }

    /// Serde default helper returning `StreamCursor::Start`; intended
    /// for `#[serde(default = "StreamCursor::start")]` on REST query
    /// structs.
    pub fn start() -> Self {
        StreamCursor::Start
    }

    /// Serde default helper returning `StreamCursor::End`.
    pub fn end() -> Self {
        StreamCursor::End
    }

    /// Serde default helper returning [`StreamCursor::from_beginning`];
    /// the default for `TailStreamParams::after`.
    pub fn beginning() -> Self {
        Self::from_beginning()
    }

    /// Internal-only: borrows the marker string Valkey's XRANGE/XREAD
    /// expect — `Start → "-"`, `End → "+"`, `At(s) → s`.
    ///
    /// Called at the ff-script adapter edge, just before building
    /// `ReadFramesArgs` or invoking `xread_block`, to lower the opaque
    /// wire grammar to the Lua-ABI form. The raw `-` / `+` characters
    /// are NOT public wire and must not be surfaced to consumers, which
    /// is why the method is hidden from generated docs.
    #[doc(hidden)]
    pub fn to_wire(&self) -> &str {
        match self {
            Self::At(id) => id.as_str(),
            Self::End => "+",
            Self::Start => "-",
        }
    }

    /// Internal-only owning counterpart of [`Self::to_wire`].
    ///
    /// Consumes the cursor and hands back the inner `String` of `At(s)`
    /// without cloning it. Meant for adapter edges that need an owned
    /// wire string (e.g. `ReadFramesArgs.from_id`) from a cursor that
    /// would be dropped anyway.
    #[doc(hidden)]
    pub fn into_wire_string(self) -> String {
        match self {
            Self::At(id) => id,
            Self::End => String::from("+"),
            Self::Start => String::from("-"),
        }
    }

    /// Returns `true` only for a concrete entry id (`"<ms>"` /
    /// `"<ms>-<seq>"`, i.e. the `At` variant) and `false` for the open
    /// markers `Start` / `End`.
    ///
    /// Boundaries such as XREAD (tailing) cannot take open markers, so
    /// they reject any cursor where this is `false`. Keeping the test
    /// here keeps the SDK and REST guards in lock-step.
    pub fn is_concrete(&self) -> bool {
        match self {
            Self::At(_) => true,
            Self::Start | Self::End => false,
        }
    }
}

impl std::fmt::Display for StreamCursor {
    /// Writes the public wire token: `"start"`, `"end"`, or the concrete
    /// entry id held by `At`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let token = match self {
            Self::At(id) => id.as_str(),
            Self::End => "end",
            Self::Start => "start",
        };
        f.write_str(token)
    }
}
1662
/// Reason a string could not be parsed into a [`StreamCursor`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StreamCursorParseError {
    /// The input was empty.
    Empty,
    /// The input was one of the rejected bare-marker aliases (`"-"`,
    /// `"+"`). Those raw Valkey markers are internal-only; the public
    /// wire spells them `"start"` / `"end"`. Carries the offending input.
    BareMarkerRejected(String),
    /// The input was neither a recognized keyword nor a well-formed
    /// Stream entry id (`^\d+(?:-\d+)?$`). Carries the offending input.
    Malformed(String),
}

impl std::fmt::Display for StreamCursorParseError {
    /// Writes a human-readable message; the two data-carrying variants
    /// quote the rejected input.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Malformed(s) => f.write_fmt(format_args!(
                "invalid stream cursor '{s}' (expected 'start', 'end', '<ms>', or '<ms>-<seq>')"
            )),
            Self::BareMarkerRejected(s) => f.write_fmt(format_args!(
                "bare marker '{s}' is not a valid stream cursor; use 'start' or 'end'"
            )),
            Self::Empty => f.write_str("stream cursor must not be empty"),
        }
    }
}

// Marker impl only: no source error is carried, so the trait defaults apply.
impl std::error::Error for StreamCursorParseError {}
1694
/// Allocation-free verdict of the shared cursor grammar.
///
/// Both parsing entry points ([`StreamCursor::from_str`] for borrowed
/// input, [`StreamCursor::try_from`] for owned input) turn this
/// classification into a cursor or an error. Splitting the check from
/// the construction lets the owned path move its `String` straight
/// into `At` instead of copying it.
enum StreamCursorClass {
    Start,
    End,
    Concrete,
    BareMarker,
    Empty,
    Malformed,
}

/// Classifies `s` against the cursor grammar without allocating.
///
/// - `""` is `Empty`
/// - `"-"` / `"+"` are `BareMarker`
/// - `"start"` / `"end"` are `Start` / `End`
/// - `<digits>` or `<digits>-<digits>` (ASCII digits, each part
///   non-empty) is `Concrete`
/// - everything else is `Malformed`
fn classify_stream_cursor(s: &str) -> StreamCursorClass {
    match s {
        "" => return StreamCursorClass::Empty,
        "-" | "+" => return StreamCursorClass::BareMarker,
        "start" => return StreamCursorClass::Start,
        "end" => return StreamCursorClass::End,
        _ => {}
    }

    // A part is valid when it is non-empty and made only of ASCII
    // digits. Every byte of a non-ASCII character is >= 0x80, so such
    // input fails this test without a separate ASCII check.
    let digits_only = |part: &str| !part.is_empty() && part.bytes().all(|b| b.is_ascii_digit());

    // Only the first '-' separates ms from seq; any further '-' stays
    // in the seq part and makes it fail the digit test.
    let well_formed = match s.split_once('-') {
        Some((ms, seq)) => digits_only(ms) && digits_only(seq),
        None => digits_only(s),
    };

    if well_formed {
        StreamCursorClass::Concrete
    } else {
        StreamCursorClass::Malformed
    }
}
1741
1742impl std::str::FromStr for StreamCursor {
1743    type Err = StreamCursorParseError;
1744
1745    fn from_str(s: &str) -> Result<Self, Self::Err> {
1746        match classify_stream_cursor(s) {
1747            StreamCursorClass::Start => Ok(Self::Start),
1748            StreamCursorClass::End => Ok(Self::End),
1749            StreamCursorClass::Concrete => Ok(Self::At(s.to_owned())),
1750            StreamCursorClass::BareMarker => {
1751                Err(StreamCursorParseError::BareMarkerRejected(s.to_owned()))
1752            }
1753            StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
1754            StreamCursorClass::Malformed => Err(StreamCursorParseError::Malformed(s.to_owned())),
1755        }
1756    }
1757}
1758
1759impl TryFrom<String> for StreamCursor {
1760    type Error = StreamCursorParseError;
1761
1762    fn try_from(s: String) -> Result<Self, Self::Error> {
1763        // Owned parsing path — the `At` variant moves `s` in directly,
1764        // avoiding the `&str → String::to_owned` re-allocation that a
1765        // blind forward to `FromStr::from_str(&s)` would force. Error
1766        // paths still pay one allocation to describe the offending
1767        // input.
1768        match classify_stream_cursor(&s) {
1769            StreamCursorClass::Start => Ok(Self::Start),
1770            StreamCursorClass::End => Ok(Self::End),
1771            StreamCursorClass::Concrete => Ok(Self::At(s)),
1772            StreamCursorClass::BareMarker => Err(StreamCursorParseError::BareMarkerRejected(s)),
1773            StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
1774            StreamCursorClass::Malformed => Err(StreamCursorParseError::Malformed(s)),
1775        }
1776    }
1777}
1778
1779impl From<StreamCursor> for String {
1780    fn from(c: StreamCursor) -> Self {
1781        c.to_string()
1782    }
1783}
1784
1785impl Serialize for StreamCursor {
1786    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1787        serializer.collect_str(self)
1788    }
1789}
1790
1791impl<'de> Deserialize<'de> for StreamCursor {
1792    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
1793        let s = String::deserialize(deserializer)?;
1794        Self::try_from(s).map_err(serde::de::Error::custom)
1795    }
1796}
1797
1798// ─── read_attempt_stream / tail_attempt_stream ───
1799
/// Hard cap on the number of frames returned by a single read/tail call.
///
/// Single source of truth across the Rust layer (ff-script, ff-server,
/// ff-sdk). The Lua side in `lua/stream.lua` keeps a matching literal with
/// an inline reference back here; bump both together if you ever need to
/// lift the cap.
///
/// `ReadFramesArgs::count_limit` documents this value as its upper bound.
pub const STREAM_READ_HARD_CAP: u64 = 10_000;
1807
1808/// A single frame read from an attempt-scoped stream.
1809///
1810/// Field set mirrors what `ff_append_frame` writes: `frame_type`, `ts`,
1811/// `payload`, `encoding`, `source`, and optionally `correlation_id`. Stored
1812/// as an ordered map so field order is deterministic across read calls.
1813#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1814pub struct StreamFrame {
1815    /// Valkey Stream entry ID, e.g. "1713100800150-0".
1816    pub id: String,
1817    /// Frame fields in sorted order.
1818    pub fields: std::collections::BTreeMap<String, String>,
1819}
1820
/// Inputs to `ff_read_attempt_stream` (XRANGE wrapper).
///
/// `from_id` / `to_id` hold the raw XRANGE form (`"-"`, `"+"`, or a
/// concrete entry id), not the public `start` / `end` keywords.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReadFramesArgs {
    /// Execution that owns the stream. NOTE(review): presumably the
    /// stream is addressed by this id together with `attempt_index` —
    /// confirm against the FCALL wrapper.
    pub execution_id: ExecutionId,
    /// Attempt whose stream is read (see the note on `execution_id`).
    pub attempt_index: AttemptIndex,
    /// XRANGE start ID. Use "-" for earliest.
    pub from_id: String,
    /// XRANGE end ID. Use "+" for latest.
    pub to_id: String,
    /// XRANGE COUNT limit. MUST be `>= 1`. The REST and SDK layers reject
    /// `0` at the boundary; the Lua side rejects it too. `STREAM_READ_HARD_CAP`
    /// is the upper bound.
    pub count_limit: u64,
}
1835
/// Result of reading frames from an attempt stream — frames plus terminal
/// signal so consumers can stop polling without a timeout fallback.
///
/// Both `closed_*` fields are left out of the serialized form when they
/// are `None`, and default to `None` when absent on deserialization.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamFrames {
    /// Entries in the requested range (possibly empty).
    pub frames: Vec<StreamFrame>,
    /// Timestamp when the upstream writer closed the stream. `None` if the
    /// stream is still open (or has never been written).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub closed_at: Option<TimestampMs>,
    /// Reason from the closing writer. Current values:
    /// `attempt_success`, `attempt_failure`, `attempt_cancelled`,
    /// `attempt_interrupted`. `None` iff the stream is still open.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub closed_reason: Option<String>,
}
1852
1853impl StreamFrames {
1854    /// Construct an empty open-stream result (no frames, no terminal
1855    /// markers). Useful for fast-path peek helpers.
1856    pub fn empty_open() -> Self {
1857        Self {
1858            frames: Vec::new(),
1859            closed_at: None,
1860            closed_reason: None,
1861        }
1862    }
1863
1864    /// True iff the producer has closed this stream. Consumers should stop
1865    /// polling and drain once this returns true.
1866    pub fn is_closed(&self) -> bool {
1867        self.closed_at.is_some()
1868    }
1869}
1870
/// Outcome of a `read_attempt_stream` / `tail_attempt_stream` call.
/// Currently has a single success variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReadFramesResult {
    /// Frames returned (possibly empty) plus optional closed markers.
    Frames(StreamFrames),
}
1876
1877// ═══════════════════════════════════════════════════════════════════════
1878// Phase 5 contracts: budget, quota, block/unblock
1879// ═══════════════════════════════════════════════════════════════════════
1880
1881// ─── create_budget ───
1882
/// Typed inputs to the `create_budget` function.
///
/// `dimensions`, `hard_limits` and `soft_limits` are parallel vectors:
/// the limit at index `i` belongs to `dimensions[i]`. Nothing in this
/// type enforces that the three have equal length.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateBudgetArgs {
    /// Id of the budget to create.
    pub budget_id: crate::types::BudgetId,
    // NOTE(review): the accepted values of the four string fields below
    // are not visible in this file — confirm against the Lua function.
    pub scope_type: String,
    pub scope_id: String,
    pub enforcement_mode: String,
    pub on_hard_limit: String,
    pub on_soft_limit: String,
    /// Reset interval, in milliseconds per the field name. NOTE(review):
    /// how a value of `0` is treated is not shown here.
    pub reset_interval_ms: u64,
    /// Dimension names.
    pub dimensions: Vec<String>,
    /// Hard limits per dimension (parallel with dimensions).
    pub hard_limits: Vec<u64>,
    /// Soft limits per dimension (parallel with dimensions).
    pub soft_limits: Vec<u64>,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
1900
/// Possible outcomes of `create_budget`. Both variants echo the budget id.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateBudgetResult {
    /// Budget created.
    Created { budget_id: crate::types::BudgetId },
    /// Already exists (idempotent).
    AlreadySatisfied { budget_id: crate::types::BudgetId },
}
1908
1909// ─── create_quota_policy ───
1910
/// Typed inputs to the `create_quota_policy` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateQuotaPolicyArgs {
    /// Id of the quota policy to create.
    pub quota_policy_id: crate::types::QuotaPolicyId,
    /// Length of the rate window, in seconds per the field name.
    pub window_seconds: u64,
    /// Request ceiling per window. NOTE(review): presumably `0` means
    /// "no limit", as documented on `QuotaPolicyLimits` — confirm.
    pub max_requests_per_window: u64,
    /// Concurrency ceiling. NOTE(review): same `0` caveat as above.
    pub max_concurrent: u64,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
1919
/// Possible outcomes of `create_quota_policy`. Both variants echo the
/// quota policy id.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateQuotaPolicyResult {
    /// Quota policy created.
    Created {
        quota_policy_id: crate::types::QuotaPolicyId,
    },
    /// Already exists (idempotent).
    AlreadySatisfied {
        quota_policy_id: crate::types::QuotaPolicyId,
    },
}
1931
1932// ─── budget_status (read-only) ───
1933
/// Operator-facing budget status snapshot (not an FCALL — direct HGETALL reads).
///
/// Unlike the FCALL contracts, identifiers and timestamps are plain
/// strings here: `budget_id` is a `String` rather than a `BudgetId`, and
/// the four `*_at` / `*_dim` fields are `Option<String>`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BudgetStatus {
    pub budget_id: String,
    pub scope_type: String,
    pub scope_id: String,
    pub enforcement_mode: String,
    /// Current usage per dimension: {dimension_name: current_value}.
    pub usage: HashMap<String, u64>,
    /// Hard limits per dimension: {dimension_name: limit}.
    pub hard_limits: HashMap<String, u64>,
    /// Soft limits per dimension: {dimension_name: limit}.
    pub soft_limits: HashMap<String, u64>,
    /// Breach counter. NOTE(review): presumably counts hard breaches,
    /// given the separate `soft_breach_count` — confirm with the writer.
    pub breach_count: u64,
    pub soft_breach_count: u64,
    // NOTE(review): the string format of the optional fields below
    // (e.g. whether timestamps are epoch milliseconds) is not visible here.
    pub last_breach_at: Option<String>,
    pub last_breach_dim: Option<String>,
    pub next_reset_at: Option<String>,
    pub created_at: Option<String>,
}
1954
1955// ─── report_usage_and_check ───
1956
/// Typed inputs to the `report_usage_and_check` function.
///
/// `dimensions` and `deltas` are parallel vectors: `deltas[i]` is the
/// increment for `dimensions[i]`. Nothing in this type enforces that
/// they have equal length.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReportUsageArgs {
    /// Dimension names to increment.
    pub dimensions: Vec<String>,
    /// Increment values (parallel with dimensions).
    pub deltas: Vec<u64>,
    /// Caller-supplied current time.
    pub now: TimestampMs,
    /// Optional idempotency key to prevent double-counting on retries.
    /// Pass the raw dedup id (e.g. `"retry-42"`); the typed FCALL wrapper
    /// wraps it into `ff:usagededup:{b:M}:<id>` using the budget
    /// partition's hash tag so it co-locates with the other budget keys
    /// (#108).
    #[serde(default)]
    pub dedup_key: Option<String>,
}
1972
/// Possible outcomes of `report_usage_and_check`.
///
/// Marked `#[non_exhaustive]`: matches outside this crate need a
/// wildcard arm.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum ReportUsageResult {
    /// All increments applied, no breach.
    Ok,
    /// Soft limit breached on a dimension (advisory, increments applied).
    SoftBreach {
        dimension: String,
        current_usage: u64,
        soft_limit: u64,
    },
    /// Hard limit breached (increments NOT applied).
    HardBreach {
        dimension: String,
        current_usage: u64,
        hard_limit: u64,
    },
    /// Dedup key matched — usage already applied in a prior call.
    AlreadyApplied,
}
1993
1994// ─── reset_budget ───
1995
/// Typed inputs to the `reset_budget` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResetBudgetArgs {
    /// Budget to reset.
    pub budget_id: crate::types::BudgetId,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
2001
/// Possible outcomes of `reset_budget`. Currently has a single success
/// variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResetBudgetResult {
    /// Budget reset successfully. `next_reset_at` is carried back to the
    /// caller as a timestamp.
    Reset { next_reset_at: TimestampMs },
}
2007
2008// ─── check_admission_and_record ───
2009
/// Typed inputs to the `check_admission_and_record` function.
///
/// NOTE(review): `window_seconds`, `rate_limit`, `concurrency_cap` and
/// `jitter_ms` look like the per-call counterparts of the fields on
/// `QuotaPolicyLimits` — confirm the mapping at the call site.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CheckAdmissionArgs {
    /// Execution asking to be admitted.
    pub execution_id: ExecutionId,
    /// Caller-supplied current time.
    pub now: TimestampMs,
    pub window_seconds: u64,
    pub rate_limit: u64,
    pub concurrency_cap: u64,
    /// Optional jitter in milliseconds; `None` when absent from the input.
    #[serde(default)]
    pub jitter_ms: Option<u64>,
}
2020
/// Possible outcomes of `check_admission_and_record`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CheckAdmissionResult {
    /// Admitted — execution may proceed.
    Admitted,
    /// Already admitted in this window (idempotent).
    AlreadyAdmitted,
    /// Rate limit exceeded. `retry_after_ms` is a wait hint in
    /// milliseconds, per the field name.
    RateExceeded { retry_after_ms: u64 },
    /// Concurrency cap hit.
    ConcurrencyExceeded,
}
2032
2033// ─── release_admission ───
2034
/// Args for [`crate::engine_backend::EngineBackend::release_admission`].
///
/// Called when `issue_claim_grant` fails after admission was recorded,
/// to prevent leaked concurrency slots. Semantics are idempotent: a
/// release against an already-released slot is a no-op (matches the
/// Valkey Lua body's DEL-is-idempotent + DECR-if-positive discipline).
///
/// FF #511 Phase 1 extended this with `quota_policy_id` so PG/SQLite
/// bodies can locate the admission row without reverse-mapping from
/// the execution. Valkey still derives its quota key from the
/// `{p:N}` partition tag + quota id.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate builds
/// it through [`ReleaseAdmissionArgs::new`] rather than a struct literal.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct ReleaseAdmissionArgs {
    /// The quota policy whose admission slot should be released.
    pub quota_policy_id: crate::types::QuotaPolicyId,
    /// The execution whose slot was admitted.
    pub execution_id: ExecutionId,
}

impl ReleaseAdmissionArgs {
    /// Builds the args from the quota policy and the execution whose
    /// admission slot is to be released.
    pub fn new(
        quota_policy_id: crate::types::QuotaPolicyId,
        execution_id: ExecutionId,
    ) -> Self {
        Self {
            quota_policy_id,
            execution_id,
        }
    }
}
2066
/// Outcome of `release_admission`. Currently has a single variant; the
/// enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum ReleaseAdmissionResult {
    /// The release was carried out. The args type documents a release
    /// against an already-released slot as a no-op.
    Released,
}
2072
2073// ─── read_budget_usage_and_limits (FF #511 Phase 3) ───
2074
2075/// Typed snapshot of one budget's usage + limits map. Returned by
2076/// [`crate::engine_backend::EngineBackend::read_budget_usage_and_limits`]
2077/// so the scheduler's `BudgetChecker` can evaluate hard-limit
2078/// breaches without reaching at Valkey-shaped HGETs.
2079///
2080/// `limits` keys are full field names written by the control-plane
2081/// (`hard:<dimension>`, `soft:<dimension>`, etc.); the scheduler
2082/// filters on the `hard:` prefix itself. `usage` keys are raw
2083/// dimension names (no prefix) — one entry per dimension currently
2084/// counting.
2085///
2086/// Missing limits hash ⇒ both maps empty (return `Ok(Self::empty())`).
2087#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
2088#[non_exhaustive]
2089pub struct BudgetUsageAndLimits {
2090    pub limits: std::collections::BTreeMap<String, String>,
2091    pub usage: std::collections::BTreeMap<String, u64>,
2092}
2093
2094impl BudgetUsageAndLimits {
2095    pub fn new(
2096        limits: std::collections::BTreeMap<String, String>,
2097        usage: std::collections::BTreeMap<String, u64>,
2098    ) -> Self {
2099        Self { limits, usage }
2100    }
2101
2102    /// Empty snapshot — returned when the budget's limits hash is
2103    /// absent. Scheduler treats this as "no limits configured".
2104    pub fn empty() -> Self {
2105        Self::default()
2106    }
2107
2108    /// Iterator over `hard:<dimension>` limits paired with the
2109    /// current usage for that dimension. Usage defaults to 0 for
2110    /// dimensions present in `limits` but missing from `usage`.
2111    pub fn hard_limits_with_usage(&self) -> impl Iterator<Item = (&str, u64, u64)> {
2112        self.limits.iter().filter_map(|(field, v)| {
2113            let dim = field.strip_prefix("hard:")?;
2114            let limit: u64 = v.parse().ok().filter(|&n: &u64| n > 0)?;
2115            let used = *self.usage.get(dim).unwrap_or(&0);
2116            Some((dim, used, limit))
2117        })
2118    }
2119}
2120
2121// ─── read_quota_policy_limits (FF #511 Phase 2a) ───
2122
2123/// Typed snapshot of the admission-relevant fields on a quota policy.
2124/// Returned by [`crate::engine_backend::EngineBackend::read_quota_policy_limits`]
2125/// so the scheduler can make admission decisions without reaching
2126/// directly at Valkey-shaped HGETs on `ff:quota:{K}:def`.
2127///
2128/// Fields match the Lua contract for `ff_check_admission_and_record`.
2129/// Zero values are the well-defined "no limit" signal (the scheduler
2130/// skips admission when both `max_requests_per_window` and
2131/// `active_concurrency_cap` are zero).
2132#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
2133#[non_exhaustive]
2134pub struct QuotaPolicyLimits {
2135    pub max_requests_per_window: u64,
2136    pub requests_per_window_seconds: u64,
2137    pub active_concurrency_cap: u64,
2138    pub jitter_ms: u64,
2139}
2140
2141impl QuotaPolicyLimits {
2142    pub fn new(
2143        max_requests_per_window: u64,
2144        requests_per_window_seconds: u64,
2145        active_concurrency_cap: u64,
2146        jitter_ms: u64,
2147    ) -> Self {
2148        Self {
2149            max_requests_per_window,
2150            requests_per_window_seconds,
2151            active_concurrency_cap,
2152            jitter_ms,
2153        }
2154    }
2155
2156    /// Policy declares no admission limits — caller skips admission.
2157    pub fn is_unlimited(&self) -> bool {
2158        self.max_requests_per_window == 0 && self.active_concurrency_cap == 0
2159    }
2160}
2161
2162// ─── block_execution_for_admission ───
2163
/// Inputs to [`crate::engine_backend::EngineBackend::block_execution_for_admission`]
/// (FF #511 Phase 2b). Generalises `block_route` (capability-mismatch
/// only) to cover every admission-time block target: budget denial,
/// quota denial, capability mismatch, operator pause, lane pause.
///
/// `BlockingReason` picks both the eligibility state written onto
/// `exec_core` and the `blocked_<reason>` index the execution lands
/// on, matching `ff_block_execution_for_admission` Lua's
/// `REASON_TO_ELIGIBILITY` table.
///
/// The struct is `#[non_exhaustive]` and does not derive `Serialize` /
/// `Deserialize`; code outside this crate builds it through
/// [`BlockExecutionForAdmissionArgs::new`].
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct BlockExecutionForAdmissionArgs {
    /// Execution to block.
    pub execution_id: ExecutionId,
    pub lane_id: LaneId,
    pub partition: crate::partition::Partition,
    /// Why the execution is being blocked; see [`BlockingReason`].
    pub reason: BlockingReason,
    /// Human-readable operator-visible detail; `None` = no detail.
    pub reason_detail: Option<String>,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}

impl BlockExecutionForAdmissionArgs {
    /// Builds the args from all six fields, in declaration order.
    pub fn new(
        execution_id: ExecutionId,
        lane_id: LaneId,
        partition: crate::partition::Partition,
        reason: BlockingReason,
        reason_detail: Option<String>,
        now: TimestampMs,
    ) -> Self {
        Self {
            execution_id,
            lane_id,
            partition,
            reason,
            reason_detail,
            now,
        }
    }
}
2204
/// Why an execution is blocked at admission time, as accepted by
/// `block_execution_for_admission`. The variants mirror the Lua
/// `REASON_TO_ELIGIBILITY` map.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum BlockingReason {
    /// Budget counter denied — `blocked_by_budget`.
    WaitingForBudget,
    /// Quota admission denied — `blocked_by_quota`.
    WaitingForQuota,
    /// Capability mismatch — `blocked_by_route`. Same target as the
    /// existing `block_route` trait method.
    WaitingForCapableWorker,
}

impl BlockingReason {
    /// Returns the reason string as Lua sees it (ARGV[2] of
    /// `ff_block_execution_for_admission`).
    pub fn reason_code(&self) -> &'static str {
        match *self {
            Self::WaitingForCapableWorker => "waiting_for_capable_worker",
            Self::WaitingForQuota => "waiting_for_quota",
            Self::WaitingForBudget => "waiting_for_budget",
        }
    }
}
2231
/// Typed outcome of
/// [`crate::engine_backend::EngineBackend::block_execution_for_admission`].
///
/// Mirrors [`BlockRouteOutcome`] but carries the reason so operator
/// telemetry can distinguish budget-vs-quota-vs-capability rejects.
///
/// Marked `#[non_exhaustive]`: matches outside this crate need a
/// wildcard arm.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum BlockExecutionForAdmissionOutcome {
    /// Execution moved eligible → blocked_<reason> successfully.
    Blocked { execution_id: ExecutionId, reason: BlockingReason },
    /// Lua returned a non-success result (e.g. execution went
    /// terminal between pick and block). `message` carries the detail
    /// as text.
    LuaRejected { message: String },
}
2246
// Deprecated alias shape — left intact for any pre-existing consumer.
// Unlike `BlockExecutionForAdmissionArgs`, the reason is a free-form
// string here rather than a `BlockingReason`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BlockExecutionArgs {
    /// Execution to block.
    pub execution_id: ExecutionId,
    /// Blocking reason as a string. NOTE(review): presumably one of the
    /// `BlockingReason::reason_code` values — confirm with consumers.
    pub blocking_reason: String,
    /// Optional detail; `None` when absent from the input.
    #[serde(default)]
    pub blocking_detail: Option<String>,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
2256
/// Outcome paired with [`BlockExecutionArgs`]. Currently has a single
/// success variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlockExecutionResult {
    /// Execution blocked.
    Blocked,
}
2262
2263// ─── unblock_execution ───
2264
/// Typed inputs to the `unblock_execution` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UnblockExecutionArgs {
    /// Execution to unblock.
    pub execution_id: ExecutionId,
    /// Caller-supplied current time.
    pub now: TimestampMs,
    /// Expected blocking reason (prevents stale unblock). `None` when
    /// absent from the input. NOTE(review): presumably `None` skips the
    /// check — confirm against the Lua function.
    #[serde(default)]
    pub expected_blocking_reason: Option<String>,
}
2273
/// Possible outcomes of `unblock_execution`. Currently has a single
/// success variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum UnblockExecutionResult {
    /// Execution unblocked and moved to eligible.
    Unblocked,
}
2279
2280// ═══════════════════════════════════════════════════════════════════════
2281// Phase 6 contracts: flow coordination and dependencies
2282// ═══════════════════════════════════════════════════════════════════════
2283
2284// ─── create_flow ───
2285
/// Typed inputs to the `create_flow` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateFlowArgs {
    /// Id of the flow to create.
    pub flow_id: crate::types::FlowId,
    /// Free-form kind label. NOTE(review): accepted values are not
    /// visible in this file.
    pub flow_kind: String,
    pub namespace: Namespace,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
2293
/// Possible outcomes of `create_flow`. Both variants echo the flow id.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateFlowResult {
    /// Flow created successfully.
    Created { flow_id: crate::types::FlowId },
    /// Flow already exists (idempotent).
    AlreadySatisfied { flow_id: crate::types::FlowId },
}
2301
2302// ─── add_execution_to_flow ───
2303
/// Typed inputs to the `add_execution_to_flow` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AddExecutionToFlowArgs {
    /// Flow that gains the member.
    pub flow_id: crate::types::FlowId,
    /// Execution to add to the flow.
    pub execution_id: ExecutionId,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
2310
/// Possible outcomes of `add_execution_to_flow`. Both variants echo the
/// execution id together with a node count.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AddExecutionToFlowResult {
    /// Execution added to flow.
    Added {
        execution_id: ExecutionId,
        /// Node count reported after the addition, per the field name.
        new_node_count: u32,
    },
    /// Already a member (idempotent).
    AlreadyMember {
        execution_id: ExecutionId,
        node_count: u32,
    },
}
2324
2325// ─── cancel_flow ───
2326
/// Typed inputs to the `cancel_flow` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CancelFlowArgs {
    /// Flow to cancel.
    pub flow_id: crate::types::FlowId,
    /// Cancellation reason, as free-form text.
    pub reason: String,
    /// Policy name as a string. `"cancel_all"` is the one value this
    /// file mentions (in the `CancelFlowResult::Cancelled` docs); other
    /// accepted values are not visible here.
    pub cancellation_policy: String,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
2334
/// Wire-level outcome of cancelling a flow. Member execution ids are
/// carried as plain `String`s in every variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CancelFlowResult {
    /// Flow cancelled and all member cancellations (if any) have completed
    /// synchronously. Used when `cancellation_policy != "cancel_all"`, when
    /// the flow has no members, when the caller opted into synchronous
    /// dispatch (e.g. `?wait=true`), or when the flow was already in a
    /// terminal state (idempotent retry).
    ///
    /// On the idempotent-retry path `member_execution_ids` may be *capped*
    /// at the server (default 1000 entries) to bound response bandwidth on
    /// flows with very large membership. The first (non-idempotent) call
    /// always returns the full list, so clients that need every member id
    /// should persist the initial response.
    Cancelled {
        cancellation_policy: String,
        member_execution_ids: Vec<String>,
    },
    /// Flow state was flipped to cancelled atomically, but member
    /// cancellations are dispatched asynchronously in the background.
    /// Clients may poll `GET /v1/executions/{id}/state` for each member
    /// execution id to track terminal state.
    CancellationScheduled {
        cancellation_policy: String,
        member_count: u32,
        member_execution_ids: Vec<String>,
    },
    /// `?wait=true` dispatch completed but one or more member cancellations
    /// failed mid-loop (e.g. ghost member, Lua error, transport fault after
    /// retries exhausted). The flow itself is still flipped to cancelled
    /// (atomic Lua already ran); callers SHOULD inspect
    /// `failed_member_execution_ids` and either retry those ids directly
    /// via `cancel_execution` or wait for the cancel-backlog reconciler
    /// to retry them in the background.
    ///
    /// Only emitted by the synchronous wait path
    /// ([`crate::CancelFlowArgs`] via `?wait=true`). The async path returns
    /// [`CancelFlowResult::CancellationScheduled`] and delegates retries
    /// to the reconciler — there is no visible "partial" state on the
    /// async path because the dispatch result is not observed inline.
    PartiallyCancelled {
        cancellation_policy: String,
        /// All member execution ids that the cancel_flow FCALL returned
        /// (i.e. the full membership at the moment of cancellation).
        member_execution_ids: Vec<String>,
        /// Strict subset of `member_execution_ids` whose per-member cancel
        /// FCALL returned an error. Order is deterministic (matches the
        /// iteration order over `member_execution_ids`).
        failed_member_execution_ids: Vec<String>,
    },
}
2385
/// RFC-017 Stage E2: result of the "header" portion of a cancel_flow
/// operation — the atomic flow-state flip + member enumeration.
///
/// The Server composes this with its own wait/async member-dispatch
/// machinery to build the wire-level [`CancelFlowResult`]. Backends
/// implement [`crate::engine_backend::EngineBackend::cancel_flow_header`]
/// (default: `Unavailable`) so the Valkey-native `ff_cancel_flow`
/// FCALL (with its `flow_already_terminal` idempotency branch) can be
/// driven through the trait without re-shaping the existing public
/// `cancel_flow(id, policy, wait)` signature.
///
/// This is a trait-boundary type: it is `#[non_exhaustive]` and does not
/// derive `Serialize` / `Deserialize`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum CancelFlowHeader {
    /// Flow-state flipped this call. `member_execution_ids` is the
    /// full (uncapped) membership at flip time.
    Cancelled {
        cancellation_policy: String,
        member_execution_ids: Vec<String>,
    },
    /// Flow was already in a terminal state on entry. The backend has
    /// surfaced the *stored* `cancellation_policy`, `cancel_reason`,
    /// and full membership so the Server can return an idempotent
    /// [`CancelFlowResult::Cancelled`] without re-doing the flip.
    AlreadyTerminal {
        /// `None` only for flows cancelled by pre-E2 Lua that never
        /// persisted the policy field.
        stored_cancellation_policy: Option<String>,
        /// `None` when the flow was never cancel-reason-stamped.
        stored_cancel_reason: Option<String>,
        /// Full membership. Server caps to
        /// `ALREADY_TERMINAL_MEMBER_CAP` before wiring.
        member_execution_ids: Vec<String>,
    },
}
2420
2421// ─── stage_dependency_edge ───
2422
/// Typed inputs to the `stage_dependency_edge` function: an edge from
/// `upstream_execution_id` to `downstream_execution_id` within `flow_id`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StageDependencyEdgeArgs {
    pub flow_id: crate::types::FlowId,
    pub edge_id: crate::types::EdgeId,
    pub upstream_execution_id: ExecutionId,
    pub downstream_execution_id: ExecutionId,
    /// Dependency kind as a string; filled in by
    /// `default_dependency_kind` when absent from the input.
    #[serde(default = "default_dependency_kind")]
    pub dependency_kind: String,
    /// Optional data-passing reference; `None` when absent from the input.
    #[serde(default)]
    pub data_passing_ref: Option<String>,
    /// Graph revision the caller expects. NOTE(review): presumably an
    /// optimistic-concurrency check against the flow's current
    /// revision — confirm against the Lua function.
    pub expected_graph_revision: u64,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
2436
/// Serde default for `dependency_kind` fields: `"success_only"`.
fn default_dependency_kind() -> String {
    String::from("success_only")
}
2440
/// Possible outcomes of `stage_dependency_edge`. Currently has a single
/// success variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum StageDependencyEdgeResult {
    /// Edge staged, new graph revision.
    Staged {
        edge_id: crate::types::EdgeId,
        new_graph_revision: u64,
    },
}
2449
2450// ─── apply_dependency_to_child ───
2451
/// Typed inputs to the `apply_dependency_to_child` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ApplyDependencyToChildArgs {
    pub flow_id: crate::types::FlowId,
    pub edge_id: crate::types::EdgeId,
    /// The child execution that receives the dependency.
    pub downstream_execution_id: ExecutionId,
    /// The execution the child depends on.
    pub upstream_execution_id: ExecutionId,
    pub graph_revision: u64,
    /// Dependency kind as a string; filled in by
    /// `default_dependency_kind` when absent from the input.
    #[serde(default = "default_dependency_kind")]
    pub dependency_kind: String,
    /// Optional data-passing reference; `None` when absent from the input.
    #[serde(default)]
    pub data_passing_ref: Option<String>,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
2466
/// Possible outcomes of `apply_dependency_to_child`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ApplyDependencyToChildResult {
    /// Dependency applied, N unsatisfied deps remaining.
    Applied { unsatisfied_count: u32 },
    /// Already applied (idempotent).
    AlreadyApplied,
}
2474
2475// ─── resolve_dependency ───
2476
/// Inputs to [`crate::engine_backend::EngineBackend::resolve_dependency`]
/// — the trait-level entry point PR-7b Step 0 lifted out of the two
/// inline FCALL call sites (`ff-engine::partition_router::
/// dispatch_dependency_resolution` and
/// `ff-engine::scanner::dependency_reconciler::resolve_eligible_edges`).
///
/// Both Valkey call sites build identical KEYS[14]+ARGV[5] arrays.
/// The fields below are the minimum they need to survive a trait-
/// boundary hand-off: `partition` drives the Valkey key-tagging,
/// `downstream_execution_id` + `lane_id` + `current_attempt_index`
/// pin the child's KEYS, `upstream_execution_id` derives KEYS[11]
/// (`upstream_result` for server-side `data_passing_ref` copy),
/// `flow_id` supplies the RFC-016 Stage C ARGV[4] + edgegroup/incoming
/// KEYS.
///
/// `#[non_exhaustive]` + `::new` per
/// `feedback_non_exhaustive_needs_constructor`. Does NOT derive
/// `Serialize` / `Deserialize` — this is a trait-boundary args
/// struct, not a wire-format type.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ResolveDependencyArgs {
    /// Child (downstream) execution's partition. Flow + exec partitions
    /// co-locate on `{fp:N}` post-RFC-011 so the FCALL stays single-slot.
    pub partition: crate::partition::Partition,
    /// Flow containing the edge.
    pub flow_id: crate::types::FlowId,
    /// The child execution whose dependency is being resolved.
    pub downstream_execution_id: ExecutionId,
    /// The upstream execution whose outcome is reported in
    /// `upstream_outcome`.
    pub upstream_execution_id: ExecutionId,
    /// Edge being resolved.
    pub edge_id: crate::types::EdgeId,
    /// Child's lane; one of the inputs that pin the child's KEYS.
    pub lane_id: LaneId,
    /// Child's current attempt index — selects `attempt_hash` +
    /// `stream_meta` KEYS so late satisfaction updates the active
    /// attempt (race-safe under renewal).
    pub current_attempt_index: AttemptIndex,
    /// "success", "failed", "cancelled", "expired", "skipped"
    pub upstream_outcome: String,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
2515
2516impl ResolveDependencyArgs {
2517    /// Construct a `ResolveDependencyArgs`. Added alongside
2518    /// `#[non_exhaustive]` per
2519    /// `feedback_non_exhaustive_needs_constructor`.
2520    #[allow(clippy::too_many_arguments)]
2521    pub fn new(
2522        partition: crate::partition::Partition,
2523        flow_id: crate::types::FlowId,
2524        downstream_execution_id: ExecutionId,
2525        upstream_execution_id: ExecutionId,
2526        edge_id: crate::types::EdgeId,
2527        lane_id: LaneId,
2528        current_attempt_index: AttemptIndex,
2529        upstream_outcome: String,
2530        now: TimestampMs,
2531    ) -> Self {
2532        Self {
2533            partition,
2534            flow_id,
2535            downstream_execution_id,
2536            upstream_execution_id,
2537            edge_id,
2538            lane_id,
2539            current_attempt_index,
2540            upstream_outcome,
2541            now,
2542        }
2543    }
2544}
2545
/// Typed outcome of
/// [`crate::engine_backend::EngineBackend::resolve_dependency`].
///
/// `#[non_exhaustive]` so additional variants (e.g. `ChildSkipped`
/// cascade hints) can be added in minor releases without forcing
/// match-arm churn on consumers.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ResolveDependencyOutcome {
    /// Edge satisfied — downstream may become eligible.
    Satisfied,
    /// Edge made impossible — downstream becomes skipped.
    Impossible,
    /// Already resolved (idempotent).
    AlreadyResolved,
}
2562
2563/// Legacy name retained for `ff-script`'s `FromFcallResult` plumbing.
2564/// Prefer [`ResolveDependencyOutcome`] in trait-level code.
2565#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2566pub enum ResolveDependencyResult {
2567    /// Edge satisfied — downstream may become eligible.
2568    Satisfied,
2569    /// Edge made impossible — downstream becomes skipped.
2570    Impossible,
2571    /// Already resolved (idempotent).
2572    AlreadyResolved,
2573}
2574
2575impl From<ResolveDependencyResult> for ResolveDependencyOutcome {
2576    fn from(r: ResolveDependencyResult) -> Self {
2577        match r {
2578            ResolveDependencyResult::Satisfied => ResolveDependencyOutcome::Satisfied,
2579            ResolveDependencyResult::Impossible => ResolveDependencyOutcome::Impossible,
2580            ResolveDependencyResult::AlreadyResolved => ResolveDependencyOutcome::AlreadyResolved,
2581        }
2582    }
2583}
2584
2585// ─── cascade_completion (PR-7b Cluster 4) ─────────────────────────────
2586
/// Typed outcome of
/// [`crate::engine_backend::EngineBackend::cascade_completion`].
///
/// Observable result of cascading a terminal-execution completion into
/// its downstream edges. Counters are best-effort — they describe what
/// the backend actually did on this call, not an authoritative graph
/// state (the `dependency_reconciler` remains the correctness safety
/// net for both backends).
///
/// # Timing semantics
///
/// The two in-tree backends differ in *when* cascade work is observable
/// to the caller. This is an accepted architectural divergence; consumer
/// code that needs synchronous cascade either targets Valkey explicitly
/// or inspects Postgres's observability surface (outbox drain) to
/// verify.
///
/// - **Valkey (`synchronous = true`):** the FCALL cascade runs
///   inline — when `cascade_completion` returns, every directly
///   resolvable edge has been advanced and every `child_skipped`
///   transitive descendant has been recursively cascaded up to the
///   `MAX_CASCADE_DEPTH` cap. `resolved` + `cascaded_children`
///   reflect the full subtree walked on this invocation.
/// - **Postgres (`synchronous = false`):** the call enqueues a
///   dispatch against the `ff_completion_event` outbox row; actual
///   downstream `ff_edge_group` advancement is performed by
///   `ff_backend_postgres::dispatch::dispatch_completion` in per-hop
///   transactions. `resolved` is the number of edge groups advanced
///   during this invocation's outbox drain; `cascaded_children` is
///   always `0` (PG does not self-recurse — further hops go through
///   their own outbox events emitted by the completing transaction).
///
/// `#[non_exhaustive]` so additional counters (e.g. an explicit
/// `dispatched_event_id` for PG, or a `depth_reached` for Valkey) can
/// be added without breaking consumers.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct CascadeOutcome {
    /// Edges whose resolution observably advanced on this call.
    pub resolved: usize,
    /// Transitive descendants cascaded synchronously (Valkey only;
    /// always `0` on Postgres — see Timing semantics).
    pub cascaded_children: usize,
    /// `true` when the caller observed the cascade inline (Valkey);
    /// `false` when the call only enqueued dispatch (Postgres outbox).
    pub synchronous: bool,
}

impl CascadeOutcome {
    /// Construct an outcome describing a synchronous (Valkey) cascade.
    ///
    /// Stores both counters as given and sets `synchronous` to `true`.
    pub fn synchronous(resolved: usize, cascaded_children: usize) -> Self {
        Self {
            synchronous: true,
            resolved,
            cascaded_children,
        }
    }

    /// Construct an outcome describing an async (Postgres outbox)
    /// dispatch. `advanced` is the per-call outbox-drain count.
    ///
    /// `advanced` is stored in `resolved`; `cascaded_children` is `0`
    /// and `synchronous` is `false`.
    pub fn async_dispatched(advanced: usize) -> Self {
        Self {
            synchronous: false,
            resolved: advanced,
            cascaded_children: 0,
        }
    }
}
2647
2648// ─── promote_blocked_to_eligible ───
2649
2650#[derive(Clone, Debug, Serialize, Deserialize)]
2651pub struct PromoteBlockedToEligibleArgs {
2652    pub execution_id: ExecutionId,
2653    pub now: TimestampMs,
2654}
2655
2656#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2657pub enum PromoteBlockedToEligibleResult {
2658    Promoted,
2659}
2660
2661// ─── evaluate_flow_eligibility ───
2662
2663#[derive(Clone, Debug, Serialize, Deserialize)]
2664pub struct EvaluateFlowEligibilityArgs {
2665    pub execution_id: ExecutionId,
2666}
2667
2668#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2669pub enum EvaluateFlowEligibilityResult {
2670    /// Execution eligibility status.
2671    Status { status: String },
2672}
2673
2674// ─── replay_execution ───
2675
2676#[derive(Clone, Debug, Serialize, Deserialize)]
2677pub struct ReplayExecutionArgs {
2678    pub execution_id: ExecutionId,
2679    pub now: TimestampMs,
2680}
2681
2682#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2683pub enum ReplayExecutionResult {
2684    /// Replayed to runnable.
2685    Replayed { public_state: PublicState },
2686}
2687
2688// ─── get_execution (full read) ───
2689
2690/// Full execution info returned by `Server::get_execution`.
2691#[derive(Clone, Debug, Serialize, Deserialize)]
2692pub struct ExecutionInfo {
2693    pub execution_id: ExecutionId,
2694    pub namespace: String,
2695    pub lane_id: String,
2696    pub priority: i32,
2697    pub execution_kind: String,
2698    pub state_vector: StateVector,
2699    pub public_state: PublicState,
2700    pub created_at: String,
2701    /// TimestampMs (ms since epoch) when the execution's first attempt
2702    /// was started by a worker claim. Empty string until the first
2703    /// claim lands. Serialised as `Option<String>` so pre-claim reads
2704    /// deserialise cleanly even if the field is absent from the wire.
2705    #[serde(default, skip_serializing_if = "Option::is_none")]
2706    pub started_at: Option<String>,
2707    /// TimestampMs when the execution reached a terminal
2708    /// `completed`/`failed`/`cancelled`/`expired` state. Empty /
2709    /// absent while still in flight.
2710    #[serde(default, skip_serializing_if = "Option::is_none")]
2711    pub completed_at: Option<String>,
2712    pub current_attempt_index: u32,
2713    pub flow_id: Option<String>,
2714    pub blocking_detail: String,
2715}
2716
2717// ─── set_execution_tags / set_flow_tags (issue #58.4) ───
2718
2719/// Args for `ff_set_execution_tags`. Tag keys MUST match
2720/// `^[a-z][a-z0-9_]*\.` — the caller-namespace rule — or the FCALL
2721/// returns `invalid_tag_key`. Values are arbitrary strings. The map is
2722/// ordered (`BTreeMap`) so two callers submitting the same logical set
2723/// of tags produce identical ARGV.
2724#[derive(Clone, Debug, Serialize, Deserialize)]
2725pub struct SetExecutionTagsArgs {
2726    pub execution_id: ExecutionId,
2727    pub tags: BTreeMap<String, String>,
2728}
2729
2730/// Result of `ff_set_execution_tags`.
2731#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2732pub enum SetExecutionTagsResult {
2733    /// Tags written. `count` is the number of key-value pairs applied.
2734    Ok { count: u32 },
2735}
2736
2737/// Args for `ff_set_flow_tags`. Same namespace rule as
2738/// [`SetExecutionTagsArgs`]. The Lua function also lazy-migrates any
2739/// pre-58.4 reserved-namespace fields stashed inline on `flow_core` into
2740/// the new tags key.
2741#[derive(Clone, Debug, Serialize, Deserialize)]
2742pub struct SetFlowTagsArgs {
2743    pub flow_id: FlowId,
2744    pub tags: BTreeMap<String, String>,
2745}
2746
2747/// Result of `ff_set_flow_tags`.
2748#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2749pub enum SetFlowTagsResult {
2750    /// Tags written. `count` is the number of key-value pairs applied.
2751    Ok { count: u32 },
2752}
2753
2754// ─── describe_execution (issue #58.1) ───
2755
2756/// Engine-decoupled read-model for one execution.
2757///
2758/// Returned by `ff_sdk::FlowFabricWorker::describe_execution`. Consumers
2759/// consult this struct instead of reaching into Valkey's exec_core hash
2760/// directly — the engine is free to rename fields or restructure storage
2761/// under this surface.
2762///
2763/// `#[non_exhaustive]` — FF may add fields in minor releases without a
2764/// semver break. Match with `..` or use field-by-field construction.
2765#[derive(Clone, Debug, PartialEq, Eq)]
2766#[non_exhaustive]
2767pub struct ExecutionSnapshot {
2768    pub execution_id: ExecutionId,
2769    pub flow_id: Option<FlowId>,
2770    pub lane_id: LaneId,
2771    pub namespace: Namespace,
2772    pub public_state: PublicState,
2773    /// Blocking reason string (e.g. `"waiting_for_worker"`,
2774    /// `"waiting_for_delay"`, `"waiting_for_dependencies"`). `None` when
2775    /// the exec_core field is empty.
2776    pub blocking_reason: Option<String>,
2777    /// Free-form operator-readable detail explaining `blocking_reason`.
2778    /// `None` when the exec_core field is empty.
2779    pub blocking_detail: Option<String>,
2780    /// Summary of the execution's currently-active attempt. `None` when
2781    /// no attempt has been started (pre-claim) or when the exec_core
2782    /// attempt fields are all empty.
2783    pub current_attempt: Option<AttemptSummary>,
2784    /// Summary of the execution's currently-held lease. `None` when the
2785    /// execution is not held by a worker.
2786    pub current_lease: Option<LeaseSummary>,
2787    /// The waitpoint this execution is currently suspended on, if any.
2788    pub current_waitpoint: Option<WaitpointId>,
2789    pub created_at: TimestampMs,
2790    /// Timestamp of the last write that mutated exec_core. Engine-maintained.
2791    pub last_mutation_at: TimestampMs,
2792    pub total_attempt_count: u32,
2793    /// Caller-owned labels. The prefix `^[a-z][a-z0-9_]*\.` is reserved for
2794    /// consumer metadata (e.g. `cairn.task_id`); FF guarantees it will not
2795    /// write keys matching that shape. FF's own fields stay in snake_case
2796    /// without dots. Empty when no tags are set.
2797    pub tags: BTreeMap<String, String>,
2798}
2799
2800impl ExecutionSnapshot {
2801    /// Construct an [`ExecutionSnapshot`]. Present so downstream crates
2802    /// (ff-sdk's `describe_execution`) can assemble the struct despite
2803    /// the `#[non_exhaustive]` marker. Prefer adding builder-style
2804    /// helpers here over loosening `non_exhaustive`.
2805    #[allow(clippy::too_many_arguments)]
2806    pub fn new(
2807        execution_id: ExecutionId,
2808        flow_id: Option<FlowId>,
2809        lane_id: LaneId,
2810        namespace: Namespace,
2811        public_state: PublicState,
2812        blocking_reason: Option<String>,
2813        blocking_detail: Option<String>,
2814        current_attempt: Option<AttemptSummary>,
2815        current_lease: Option<LeaseSummary>,
2816        current_waitpoint: Option<WaitpointId>,
2817        created_at: TimestampMs,
2818        last_mutation_at: TimestampMs,
2819        total_attempt_count: u32,
2820        tags: BTreeMap<String, String>,
2821    ) -> Self {
2822        Self {
2823            execution_id,
2824            flow_id,
2825            lane_id,
2826            namespace,
2827            public_state,
2828            blocking_reason,
2829            blocking_detail,
2830            current_attempt,
2831            current_lease,
2832            current_waitpoint,
2833            created_at,
2834            last_mutation_at,
2835            total_attempt_count,
2836            tags,
2837        }
2838    }
2839}
2840
2841/// Currently-active attempt summary inside an [`ExecutionSnapshot`].
2842///
2843/// `#[non_exhaustive]`.
2844#[derive(Clone, Debug, PartialEq, Eq)]
2845#[non_exhaustive]
2846pub struct AttemptSummary {
2847    pub attempt_id: AttemptId,
2848    pub attempt_index: AttemptIndex,
2849}
2850
2851impl AttemptSummary {
2852    /// Construct an [`AttemptSummary`]. See [`ExecutionSnapshot::new`]
2853    /// for the rationale — `#[non_exhaustive]` blocks cross-crate
2854    /// struct-literal construction.
2855    pub fn new(attempt_id: AttemptId, attempt_index: AttemptIndex) -> Self {
2856        Self {
2857            attempt_id,
2858            attempt_index,
2859        }
2860    }
2861}
2862
2863/// Currently-held lease summary inside an [`ExecutionSnapshot`].
2864///
2865/// `#[non_exhaustive]`. New fields may be added in minor releases — use
2866/// [`LeaseSummary::new`] plus the fluent `with_*` setters to construct
2867/// one, and match with `..` in destructuring.
2868///
2869/// # Field provenance (FF#278)
2870///
2871/// * `lease_id` — minted at claim time (`ff_claim_execution` /
2872///   `ff_claim_resumed_execution`), cleared atomically with the other
2873///   lease fields on revoke/expire/complete. Stable for the lifetime
2874///   of a single lease; a fresh one is minted per re-claim. Defaults
2875///   to the nil UUID when a backend does not surface per-lease ids
2876///   (treat as "field not populated").
2877/// * `attempt_index` — the 1-based attempt counter (`current_attempt_index`
2878///   on `exec_core`). Set atomically with `current_attempt_id` at claim
2879///   time; always populated while a Valkey-backed lease is held.
2880/// * `last_heartbeat_at` — the most recent `ff_renew_lease` timestamp
2881///   (`lease_last_renewed_at` on `exec_core`). `None` when the field
2882///   is empty (e.g. legacy data pre-0.9, or a backend that does not
2883///   surface per-renewal heartbeats).
2884#[derive(Clone, Debug, PartialEq, Eq)]
2885#[non_exhaustive]
2886pub struct LeaseSummary {
2887    pub lease_epoch: LeaseEpoch,
2888    pub worker_instance_id: WorkerInstanceId,
2889    pub expires_at: TimestampMs,
2890    /// Per-lease unique identity. Correlates audit-log entries,
2891    /// reclaim events, and recovery traces.
2892    pub lease_id: LeaseId,
2893    /// 1-based attempt counter; `.0` mirrors `current_attempt_index`
2894    /// on `exec_core`.
2895    pub attempt_index: AttemptIndex,
2896    /// Most recent heartbeat (lease-renewal) timestamp. `None` when the
2897    /// backend does not surface per-renewal ticks on this lease.
2898    pub last_heartbeat_at: Option<TimestampMs>,
2899}
2900
2901impl LeaseSummary {
2902    /// Construct a [`LeaseSummary`] with the three always-present
2903    /// fields. Use the `with_*` setters to populate the FF#278
2904    /// additions (`lease_id`, `attempt_index`, `last_heartbeat_at`);
2905    /// otherwise they default to the nil / zero / empty forms, which
2906    /// callers should treat as "field not surfaced by this backend".
2907    ///
2908    /// See [`ExecutionSnapshot::new`] for the broader `#[non_exhaustive]`
2909    /// construction rationale.
2910    pub fn new(
2911        lease_epoch: LeaseEpoch,
2912        worker_instance_id: WorkerInstanceId,
2913        expires_at: TimestampMs,
2914    ) -> Self {
2915        Self {
2916            lease_epoch,
2917            worker_instance_id,
2918            expires_at,
2919            lease_id: LeaseId::from_uuid(uuid::Uuid::nil()),
2920            attempt_index: AttemptIndex::new(0),
2921            last_heartbeat_at: None,
2922        }
2923    }
2924
2925    /// Set the lease's unique identity (FF#278).
2926    #[must_use]
2927    pub fn with_lease_id(mut self, lease_id: LeaseId) -> Self {
2928        self.lease_id = lease_id;
2929        self
2930    }
2931
2932    /// Set the 1-based attempt counter (FF#278).
2933    #[must_use]
2934    pub fn with_attempt_index(mut self, attempt_index: AttemptIndex) -> Self {
2935        self.attempt_index = attempt_index;
2936        self
2937    }
2938
2939    /// Set the most recent heartbeat timestamp (FF#278).
2940    #[must_use]
2941    pub fn with_last_heartbeat_at(mut self, ts: TimestampMs) -> Self {
2942        self.last_heartbeat_at = Some(ts);
2943        self
2944    }
2945}
2946
2947// ─── read_execution_context (v0.12 agnostic-SDK prep, PR-1) ───
2948
/// Point-read bundle of the three execution-scoped fields the SDK
/// worker needs to construct a `ClaimedTask` (see `ff_sdk::ClaimedTask`):
/// `input_payload`, `execution_kind`, and `tags`.
///
/// Returned by
/// [`EngineBackend::read_execution_context`](crate::engine_backend::EngineBackend::read_execution_context).
/// All three fields are execution-scoped (not per-attempt) across
/// Valkey, Postgres, and SQLite — there is no per-attempt variant in
/// the data model.
///
/// `#[non_exhaustive]` — FF may add fields in minor releases without a
/// semver break. Construct via [`ExecutionContext::new`]; match with
/// `..` when destructuring.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ExecutionContext {
    /// Opaque payload handed to the execution body. Empty when the
    /// execution was created with no payload.
    pub input_payload: Vec<u8>,
    /// Caller-supplied `execution_kind` label — free-form string
    /// identifying which handler the worker should dispatch to.
    pub execution_kind: String,
    /// Caller-owned tag map. Tag key conventions mirror
    /// [`ExecutionSnapshot::tags`]; empty when no tags are set.
    pub tags: HashMap<String, String>,
}

impl ExecutionContext {
    /// Construct an [`ExecutionContext`]. Present so downstream crates
    /// (concrete `EngineBackend` impls) can assemble the struct despite
    /// the `#[non_exhaustive]` marker. Prefer adding builder-style
    /// helpers here over loosening `non_exhaustive`.
    ///
    /// Every argument is stored unchanged in the field of the same name.
    pub fn new(
        input_payload: Vec<u8>,
        execution_kind: String,
        tags: HashMap<String, String>,
    ) -> Self {
        Self { input_payload, execution_kind, tags }
    }
}
2993
2994// ─── Common sub-types ───
2995
2996// ─── describe_flow (issue #58.2) ───
2997
2998/// Engine-decoupled read-model for one flow.
2999///
3000/// Returned by `ff_sdk::FlowFabricWorker::describe_flow`. Consumers
3001/// consult this struct instead of reaching into Valkey's flow_core hash
3002/// directly — the engine is free to rename fields or restructure storage
3003/// under this surface.
3004///
3005/// `#[non_exhaustive]` — FF may add fields in minor releases without a
3006/// semver break. Match with `..` or use [`FlowSnapshot::new`].
3007///
3008/// # `public_flow_state`
3009///
3010/// Stored as an engine-written string literal on `flow_core`. Known
3011/// values today: `open`, `running`, `blocked`, `cancelled`, `completed`,
3012/// `failed`. Surfaced as `String` (not a typed enum) because FF does
3013/// not yet expose a `PublicFlowState` type — callers that need to act
3014/// on specific values should match on the literal. The flow_projector
3015/// writes a parallel `public_flow_state` into the flow's summary hash;
3016/// this field reflects the authoritative value on `flow_core`, which
3017/// is what mutation guards (cancel/add-member) consult.
3018///
3019/// # `tags`
3020///
3021/// Unlike [`ExecutionSnapshot::tags`] (which has a dedicated tags
3022/// hash), flow tags live inline on `flow_core`. FF's own fields are
3023/// snake_case without a `.`; any field whose name starts with
3024/// `<lowercase>.` (e.g. `cairn.task_id`) is treated as consumer-owned
3025/// metadata and routed here. An empty map means no namespaced tags
3026/// were written. The prefix convention mirrors
3027/// [`ExecutionSnapshot::tags`] — consumers should keep tag keys
3028/// namespaced (`cairn.*`, `operator.*`, etc.) so future FF field
3029/// additions don't collide.
3030#[derive(Clone, Debug, PartialEq, Eq)]
3031#[non_exhaustive]
3032pub struct FlowSnapshot {
3033    pub flow_id: FlowId,
3034    /// The `flow_kind` literal passed to `create_flow` (e.g. `dag`,
3035    /// `pipeline`). Preserved as-is; FF does not interpret it.
3036    pub flow_kind: String,
3037    pub namespace: Namespace,
3038    /// Authoritative flow state on `flow_core`. See the struct-level
3039    /// docs for the set of known values.
3040    pub public_flow_state: String,
3041    /// Monotonically increasing revision bumped on every structural
3042    /// mutation (add-member, stage-edge). Used by optimistic-concurrency
3043    /// writers via `expected_graph_revision`.
3044    pub graph_revision: u64,
3045    /// Number of member executions added so far. Never decremented.
3046    pub node_count: u32,
3047    /// Number of dependency edges staged so far. Never decremented.
3048    pub edge_count: u32,
3049    pub created_at: TimestampMs,
3050    /// Timestamp of the last write that mutated `flow_core`.
3051    /// Engine-maintained.
3052    pub last_mutation_at: TimestampMs,
3053    /// When the flow reached a terminal state via `cancel_flow`. `None`
3054    /// while the flow is live. Only written by the cancel path today;
3055    /// `completed`/`failed` terminal states do not populate this field
3056    /// (the flow_projector derives them from membership).
3057    pub cancelled_at: Option<TimestampMs>,
3058    /// Operator-supplied reason from the `cancel_flow` call. `None`
3059    /// when the flow has not been cancelled.
3060    pub cancel_reason: Option<String>,
3061    /// The `cancellation_policy` value persisted by `cancel_flow`
3062    /// (e.g. `cancel_all`, `cancel_flow_only`). `None` for flows
3063    /// cancelled before this field was persisted, or not yet cancelled.
3064    pub cancellation_policy: Option<String>,
3065    /// Consumer-owned namespaced metadata (e.g. `cairn.task_id`). See
3066    /// the struct-level docs for the routing rule.
3067    pub tags: BTreeMap<String, String>,
3068    /// RFC-016 Stage A: inbound edge groups known on this flow.
3069    ///
3070    /// One entry per downstream execution that has at least one staged
3071    /// inbound dependency edge. Populated from the
3072    /// `ff:flow:{fp:N}:<flow_id>:edgegroup:<downstream_eid>` hash —
3073    /// when that hash is absent (existing flows created before Stage A),
3074    /// the backend falls back to reading the legacy
3075    /// `deps_meta.unsatisfied_required_count` counter on the
3076    /// downstream's exec partition and reports the group as
3077    /// [`EdgeDependencyPolicy::AllOf`] with the derived counters
3078    /// (backward-compat shim — see RFC-016 §11 Stage A).
3079    ///
3080    /// Every entry in Stage A reports `policy = AllOf`; Stage B/C/D/E
3081    /// extend the variants and wire the quorum counters.
3082    pub edge_groups: Vec<EdgeGroupSnapshot>,
3083}
3084
3085impl FlowSnapshot {
3086    /// Construct a [`FlowSnapshot`]. Present so downstream crates
3087    /// (ff-sdk's `describe_flow`) can assemble the struct despite the
3088    /// `#[non_exhaustive]` marker.
3089    #[allow(clippy::too_many_arguments)]
3090    pub fn new(
3091        flow_id: FlowId,
3092        flow_kind: String,
3093        namespace: Namespace,
3094        public_flow_state: String,
3095        graph_revision: u64,
3096        node_count: u32,
3097        edge_count: u32,
3098        created_at: TimestampMs,
3099        last_mutation_at: TimestampMs,
3100        cancelled_at: Option<TimestampMs>,
3101        cancel_reason: Option<String>,
3102        cancellation_policy: Option<String>,
3103        tags: BTreeMap<String, String>,
3104        edge_groups: Vec<EdgeGroupSnapshot>,
3105    ) -> Self {
3106        Self {
3107            flow_id,
3108            flow_kind,
3109            namespace,
3110            public_flow_state,
3111            graph_revision,
3112            node_count,
3113            edge_count,
3114            created_at,
3115            last_mutation_at,
3116            cancelled_at,
3117            cancel_reason,
3118            cancellation_policy,
3119            tags,
3120            edge_groups,
3121        }
3122    }
3123}
3124
3125// ─── describe_edge / list_*_edges (issue #58.3) ───
3126
3127/// Engine-decoupled read-model for one dependency edge.
3128///
3129/// Returned by `ff_sdk::FlowFabricWorker::describe_edge`,
3130/// `list_incoming_edges`, and `list_outgoing_edges`. Consumers consult
3131/// this struct instead of reaching into Valkey's per-flow `edge:` hash
3132/// directly — the engine is free to rename hash fields or restructure
3133/// key layout under this surface.
3134///
3135/// `#[non_exhaustive]` — FF may add fields in minor releases without a
3136/// semver break. Match with `..` or use [`EdgeSnapshot::new`].
3137///
3138/// # Fields
3139///
3140/// The struct mirrors the immutable edge record written by
3141/// `ff_stage_dependency_edge` (see `lua/flow.lua`). The flow-scoped
3142/// edge hash is only ever written once, at staging time; per-execution
3143/// resolution state lives on a separate `dep:<edge_id>` hash and is not
3144/// surfaced here. The `edge_state` field therefore reflects the
3145/// staging-time literal (currently `pending`), not the downstream
3146/// execution's dep-edge state.
3147#[derive(Clone, Debug, PartialEq, Eq)]
3148#[non_exhaustive]
3149pub struct EdgeSnapshot {
3150    pub edge_id: EdgeId,
3151    pub flow_id: FlowId,
3152    pub upstream_execution_id: ExecutionId,
3153    pub downstream_execution_id: ExecutionId,
3154    /// The `dependency_kind` literal (e.g. `success_only`) from
3155    /// `stage_dependency_edge`. Preserved as-is; FF does not interpret
3156    /// it on reads.
3157    pub dependency_kind: String,
3158    /// The satisfaction-condition literal stamped at staging time
3159    /// (e.g. `all_required`).
3160    pub satisfaction_condition: String,
3161    /// Optional opaque handle to a data-passing artifact. `None` when
3162    /// the stored field is empty (the most common case).
3163    pub data_passing_ref: Option<String>,
3164    /// Edge-state literal on the flow-scoped edge hash. Written once
3165    /// at staging as `pending`; this hash is immutable on the flow
3166    /// side. Per-execution resolution state is tracked separately on
3167    /// the child's `dep:<edge_id>` hash.
3168    pub edge_state: String,
3169    pub created_at: TimestampMs,
3170    /// Origin of the edge (e.g. `engine`). Preserved as-is.
3171    pub created_by: String,
3172}
3173
3174/// Direction marker for [`crate::engine_backend::EngineBackend::list_edges`].
3175///
3176/// Carries the subject execution whose adjacency side the caller wants
3177/// to list — mirrors the internal `AdjacencySide + subject_eid` pair
3178/// the ff-sdk free-fn `list_edges_from_set` already uses. Keeping
3179/// direction + subject fused in one enum means the trait method has a
3180/// single `direction` parameter rather than a `(side, eid)` pair, and
3181/// the backend impl can't forget one of the two.
3182///
3183/// * `Outgoing { from_node }` — the caller wants every edge whose
3184///   `upstream_execution_id == from_node`. Corresponds to the
3185///   `out:<execution_id>` adjacency SET under the execution's flow
3186///   partition.
3187/// * `Incoming { to_node }` — the caller wants every edge whose
3188///   `downstream_execution_id == to_node`. Corresponds to the
3189///   `in:<execution_id>` adjacency SET under the execution's flow
3190///   partition.
3191#[derive(Clone, Debug, PartialEq, Eq)]
3192pub enum EdgeDirection {
3193    /// Edges leaving `from_node` — `out:` adjacency SET.
3194    Outgoing {
3195        /// The subject execution whose outgoing edges to list.
3196        from_node: ExecutionId,
3197    },
3198    /// Edges landing on `to_node` — `in:` adjacency SET.
3199    Incoming {
3200        /// The subject execution whose incoming edges to list.
3201        to_node: ExecutionId,
3202    },
3203}
3204
3205impl EdgeDirection {
3206    /// Return the subject execution id regardless of direction. Shared
3207    /// helper for backend impls that need the execution id for the
3208    /// initial `HGET exec_core.flow_id` lookup (flow routing) before
3209    /// they know which adjacency SET to read.
3210    pub fn subject(&self) -> &ExecutionId {
3211        match self {
3212            Self::Outgoing { from_node } => from_node,
3213            Self::Incoming { to_node } => to_node,
3214        }
3215    }
3216}
3217
3218impl EdgeSnapshot {
3219    /// Construct an [`EdgeSnapshot`]. Present so downstream crates
3220    /// (ff-sdk's `describe_edge` / `list_*_edges`) can assemble the
3221    /// struct despite the `#[non_exhaustive]` marker.
3222    #[allow(clippy::too_many_arguments)]
3223    pub fn new(
3224        edge_id: EdgeId,
3225        flow_id: FlowId,
3226        upstream_execution_id: ExecutionId,
3227        downstream_execution_id: ExecutionId,
3228        dependency_kind: String,
3229        satisfaction_condition: String,
3230        data_passing_ref: Option<String>,
3231        edge_state: String,
3232        created_at: TimestampMs,
3233        created_by: String,
3234    ) -> Self {
3235        Self {
3236            edge_id,
3237            flow_id,
3238            upstream_execution_id,
3239            downstream_execution_id,
3240            dependency_kind,
3241            satisfaction_condition,
3242            data_passing_ref,
3243            edge_state,
3244            created_at,
3245            created_by,
3246        }
3247    }
3248}
3249
3250// ─── RFC-016 edge-group policy (Stage A) ───
3251
3252/// Policy controlling how an inbound edge group's satisfaction is
3253/// decided.
3254///
3255/// Stage A honours only [`EdgeDependencyPolicy::AllOf`] — the two
3256/// quorum variants exist so the wire/snapshot surface is stable for
3257/// Stage B/C/D's resolver extensions, but
3258/// [`crate::engine_backend::EngineBackend::set_edge_group_policy`]
3259/// rejects them with [`crate::engine_error::EngineError::Validation`]
3260/// until Stage B lands.
3261///
3262/// `#[non_exhaustive]` — future stages may add variants (e.g.
3263/// `Threshold` — see RFC-016 §10.3) without a semver break. Construct
3264/// via the [`EdgeDependencyPolicy::all_of`], [`EdgeDependencyPolicy::any_of`],
3265/// and [`EdgeDependencyPolicy::quorum`] helpers.
3266#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3267#[non_exhaustive]
3268#[serde(tag = "kind", rename_all = "snake_case")]
3269pub enum EdgeDependencyPolicy {
3270    /// Today's behavior: every edge in the inbound group must be
3271    /// satisfied (RFC-007 `all_required` + `success_only`).
3272    AllOf,
3273    /// k-of-n where k==1 — satisfied on the first upstream success.
3274    /// Stage A: rejected on
3275    /// [`crate::engine_backend::EngineBackend::set_edge_group_policy`];
3276    /// resolver emits nothing for this variant yet.
3277    AnyOf {
3278        #[serde(rename = "on_satisfied")]
3279        on_satisfied: OnSatisfied,
3280    },
3281    /// k-of-n quorum. Stage A: rejected on
3282    /// [`crate::engine_backend::EngineBackend::set_edge_group_policy`].
3283    Quorum {
3284        k: u32,
3285        #[serde(rename = "on_satisfied")]
3286        on_satisfied: OnSatisfied,
3287    },
3288}
3289
3290impl EdgeDependencyPolicy {
3291    /// Construct the default all-of policy (RFC-007 behavior).
3292    pub fn all_of() -> Self {
3293        Self::AllOf
3294    }
3295
3296    /// Construct an any-of policy — reserved for Stage B.
3297    pub fn any_of(on_satisfied: OnSatisfied) -> Self {
3298        Self::AnyOf { on_satisfied }
3299    }
3300
3301    /// Construct a quorum policy — reserved for Stage B.
3302    pub fn quorum(k: u32, on_satisfied: OnSatisfied) -> Self {
3303        Self::Quorum { k, on_satisfied }
3304    }
3305
3306    /// Stable string label used for wire format + metric labels.
3307    /// `all_of` | `any_of` | `quorum`.
3308    pub fn variant_str(&self) -> &'static str {
3309        match self {
3310            Self::AllOf => "all_of",
3311            Self::AnyOf { .. } => "any_of",
3312            Self::Quorum { .. } => "quorum",
3313        }
3314    }
3315}
3316
3317/// Policy for unfinished sibling upstreams once the quorum is met.
3318///
3319/// `#[non_exhaustive]` — RFC-016 §10.5 rejects a third variant today
3320/// but keeps the door open. Construct via [`OnSatisfied::cancel_remaining`]
3321/// / [`OnSatisfied::let_run`].
3322#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3323#[non_exhaustive]
3324#[serde(rename_all = "snake_case")]
3325pub enum OnSatisfied {
3326    /// Default. Cancel any still-running siblings once quorum met.
3327    CancelRemaining,
3328    /// Let stragglers finish; their terminals update counters for
3329    /// observability only (one-shot downstream).
3330    LetRun,
3331}
3332
3333impl OnSatisfied {
3334    /// Construct the default `cancel_remaining` disposition.
3335    pub fn cancel_remaining() -> Self {
3336        Self::CancelRemaining
3337    }
3338
3339    /// Construct the `let_run` disposition.
3340    pub fn let_run() -> Self {
3341        Self::LetRun
3342    }
3343
3344    /// Stable string label for wire format.
3345    pub fn variant_str(&self) -> &'static str {
3346        match self {
3347            Self::CancelRemaining => "cancel_remaining",
3348            Self::LetRun => "let_run",
3349        }
3350    }
3351}
3352
3353/// Edge-group lifecycle state (Stage A exposes only `pending` +
3354/// `satisfied` + `impossible`; `cancelled` reserved for Stage C).
3355#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3356#[non_exhaustive]
3357#[serde(rename_all = "snake_case")]
3358pub enum EdgeGroupState {
3359    Pending,
3360    Satisfied,
3361    Impossible,
3362    Cancelled,
3363}
3364
3365impl EdgeGroupState {
3366    pub fn from_literal(s: &str) -> Self {
3367        match s {
3368            "satisfied" => Self::Satisfied,
3369            "impossible" => Self::Impossible,
3370            "cancelled" => Self::Cancelled,
3371            _ => Self::Pending,
3372        }
3373    }
3374
3375    pub fn as_str(&self) -> &'static str {
3376        match self {
3377            Self::Pending => "pending",
3378            Self::Satisfied => "satisfied",
3379            Self::Impossible => "impossible",
3380            Self::Cancelled => "cancelled",
3381        }
3382    }
3383}
3384
3385/// Snapshot of one inbound edge group (per downstream execution).
3386///
3387/// Exposed via [`FlowSnapshot::edge_groups`]. Stage A only populates
3388/// `AllOf` groups and their counters; Stage B/C add `failed` /
3389/// `skipped` / `satisfied_at` wiring for the quorum variants.
3390///
3391/// `#[non_exhaustive]` — future stages will add fields (`satisfied_at`,
3392/// `failed_count` write-path, `cancel_siblings_pending`). Match with
3393/// `..` or use [`EdgeGroupSnapshot::new`].
3394#[derive(Clone, Debug, PartialEq, Eq)]
3395#[non_exhaustive]
3396pub struct EdgeGroupSnapshot {
3397    pub downstream_execution_id: ExecutionId,
3398    pub policy: EdgeDependencyPolicy,
3399    pub total_deps: u32,
3400    pub satisfied_count: u32,
3401    pub failed_count: u32,
3402    pub skipped_count: u32,
3403    pub running_count: u32,
3404    pub group_state: EdgeGroupState,
3405}
3406
3407impl EdgeGroupSnapshot {
3408    #[allow(clippy::too_many_arguments)]
3409    pub fn new(
3410        downstream_execution_id: ExecutionId,
3411        policy: EdgeDependencyPolicy,
3412        total_deps: u32,
3413        satisfied_count: u32,
3414        failed_count: u32,
3415        skipped_count: u32,
3416        running_count: u32,
3417        group_state: EdgeGroupState,
3418    ) -> Self {
3419        Self {
3420            downstream_execution_id,
3421            policy,
3422            total_deps,
3423            satisfied_count,
3424            failed_count,
3425            skipped_count,
3426            running_count,
3427            group_state,
3428        }
3429    }
3430}
3431
3432// ─── set_edge_group_policy (RFC-016 §6.1) ───
3433
/// Arguments for `set_edge_group_policy` (RFC-016 §6.1).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SetEdgeGroupPolicyArgs {
    /// Flow that owns the edge group.
    pub flow_id: FlowId,
    /// Downstream execution whose inbound edge group is targeted.
    pub downstream_execution_id: ExecutionId,
    /// Policy to store for the group.
    pub policy: EdgeDependencyPolicy,
    /// Timestamp supplied by the caller — presumably the call time
    /// (TODO confirm how the backend uses it).
    pub now: TimestampMs,
}
3441
/// Success outcomes of `set_edge_group_policy`; the two variants let
/// callers tell a fresh write from an idempotent repeat.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SetEdgeGroupPolicyResult {
    /// Policy stored (fresh write).
    Set,
    /// Policy already stored with an identical value (idempotent).
    AlreadySet,
}
3449
3450// ─── list_flows (issue #185) ───
3451
/// Typed flow-lifecycle status exposed on [`FlowSummary`].
///
/// It mirrors the free-form `public_flow_state` literal that FF's flow
/// lifecycle writes onto `flow_core` (known values: `open`, `running`,
/// `blocked`, `cancelled`, `completed`, `failed` — see [`FlowSnapshot`]).
/// The three "active" runtime states (`open`, `running`, `blocked`)
/// are folded into [`FlowStatus::Active`]; callers that need the exact
/// runtime sub-state should read [`FlowSnapshot::public_flow_state`]
/// via [`crate::engine_backend::EngineBackend::describe_flow`]. `failed`
/// maps to [`FlowStatus::Failed`].
///
/// `#[non_exhaustive]` so future lifecycle states (if FF introduces
/// any) can be added without a semver break.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum FlowStatus {
    /// `open` / `running` / `blocked` — the flow is still live on the
    /// engine.
    Active,
    /// Terminal success: all members reached a successful terminal state
    /// and the flow projector flipped `public_flow_state` to `completed`.
    Completed,
    /// Terminal failure: one or more members failed and the flow
    /// projector flipped `public_flow_state` to `failed`.
    Failed,
    /// Cancelled by an operator via `cancel_flow`.
    Cancelled,
    /// A `public_flow_state` literal is stored but it is not one of
    /// the known values. The raw literal is preserved on
    /// [`FlowSnapshot::public_flow_state`] — callers that need to act
    /// on it should fall back to [`crate::engine_backend::EngineBackend::describe_flow`].
    Unknown,
}

impl FlowStatus {
    /// Convert the raw `public_flow_state` literal stored on
    /// `flow_core` into a typed [`FlowStatus`].
    ///
    /// Matching is exact and case-sensitive. Any literal outside the
    /// known set becomes [`FlowStatus::Unknown`], which keeps the list
    /// surface forwards-compatible with future engine-side state
    /// additions.
    pub fn from_public_flow_state(raw: &str) -> Self {
        // The three live sub-states share one typed status.
        if matches!(raw, "open" | "running" | "blocked") {
            return Self::Active;
        }
        // Terminal literals map one-to-one; everything else is unknown.
        match raw {
            "cancelled" => Self::Cancelled,
            "failed" => Self::Failed,
            "completed" => Self::Completed,
            _ => Self::Unknown,
        }
    }
}
3500
3501/// Lightweight per-flow projection returned by
3502/// [`crate::engine_backend::EngineBackend::list_flows`].
3503///
3504/// Deliberately narrower than [`FlowSnapshot`] — listing pages serve
3505/// dashboard-style enumerations where the caller does not want to pay
3506/// for the full `flow_core` hash on every row. Consumers that need
3507/// revision / node-count / tags / cancel metadata should fan out to
3508/// [`crate::engine_backend::EngineBackend::describe_flow`] for the
3509/// specific ids they care about.
3510///
3511/// `#[non_exhaustive]` — FF may add fields in minor releases without
3512/// a semver break. Match with `..` or use [`FlowSummary::new`].
3513#[derive(Clone, Debug, PartialEq, Eq)]
3514#[non_exhaustive]
3515pub struct FlowSummary {
3516    pub flow_id: FlowId,
3517    /// Timestamp (ms since unix epoch) `flow_core.created_at` was
3518    /// stamped. Mirrors [`FlowSnapshot::created_at`]; kept typed so
3519    /// callers that want raw millis can read `.0`.
3520    pub created_at: TimestampMs,
3521    /// Typed projection of `flow_core.public_flow_state`. See
3522    /// [`FlowStatus`] for the mapping.
3523    pub status: FlowStatus,
3524}
3525
3526impl FlowSummary {
3527    /// Construct a [`FlowSummary`]. Present so downstream crates can
3528    /// assemble the struct despite the `#[non_exhaustive]` marker.
3529    pub fn new(flow_id: FlowId, created_at: TimestampMs, status: FlowStatus) -> Self {
3530        Self {
3531            flow_id,
3532            created_at,
3533            status,
3534        }
3535    }
3536}
3537
3538/// One page of [`FlowSummary`] rows returned by
3539/// [`crate::engine_backend::EngineBackend::list_flows`].
3540///
3541/// `next_cursor` is `Some(last_flow_id)` when at least one more row
3542/// may exist on the partition — callers forward it verbatim as the
3543/// next call's `cursor` argument to continue iteration. `None` means
3544/// the listing is exhausted. Cursor semantics match the Postgres
3545/// `WHERE flow_id > $cursor ORDER BY flow_id LIMIT $limit` pattern
3546/// (see the trait method's rustdoc).
3547///
3548/// `#[non_exhaustive]` — FF may add summary-level fields (total count,
3549/// partition hint) in future minor releases without a semver break.
3550#[derive(Clone, Debug, PartialEq, Eq)]
3551#[non_exhaustive]
3552pub struct ListFlowsPage {
3553    pub flows: Vec<FlowSummary>,
3554    pub next_cursor: Option<FlowId>,
3555}
3556
3557impl ListFlowsPage {
3558    /// Construct a [`ListFlowsPage`]. Present so downstream crates can
3559    /// assemble the struct despite the `#[non_exhaustive]` marker.
3560    pub fn new(flows: Vec<FlowSummary>, next_cursor: Option<FlowId>) -> Self {
3561        Self { flows, next_cursor }
3562    }
3563}
3564
/// Summary of state after a mutation, returned by many functions.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StateSummary {
    /// State vector carried in the summary.
    pub state_vector: StateVector,
    /// Attempt index carried alongside it — presumably the execution's
    /// current attempt once the mutation has applied (TODO confirm).
    pub current_attempt_index: AttemptIndex,
}
3571
3572// ─── RFC-025 worker registry ───
3573
3574/// Inputs to [`crate::engine_backend::EngineBackend::register_worker`]
3575/// (RFC-025). Feature gate: `core`.
3576///
3577/// `worker_instance_id` is process identity (unique per-boot);
3578/// `worker_id` is pool / logical identity (stable across restarts).
3579/// See RFC-025 §7 terminology glossary.
3580///
3581/// `liveness_ttl_ms` is stored alongside the registration so
3582/// `heartbeat_worker` refreshes to the same value without the caller
3583/// re-supplying it.
3584///
3585/// Re-registering with the same `worker_instance_id` is
3586/// **idempotent** (RFC-025 §9.3): caps/lanes/TTL overwritten,
3587/// `Refreshed` returned. Re-registering with the same
3588/// `worker_instance_id` under a DIFFERENT `worker_id` is rejected
3589/// with `Validation(InvalidInput, "instance_id reassigned")`.
3590#[non_exhaustive]
3591#[derive(Clone, Debug, PartialEq, Eq)]
3592pub struct RegisterWorkerArgs {
3593    pub worker_id: WorkerId,
3594    pub worker_instance_id: WorkerInstanceId,
3595    /// Workers serve one-or-more lanes. `BTreeSet` for stable
3596    /// iteration + dedup.
3597    pub lanes: BTreeSet<LaneId>,
3598    pub capabilities: BTreeSet<String>,
3599    /// Stored for subsequent `heartbeat_worker` TTL refresh.
3600    pub liveness_ttl_ms: u64,
3601    pub namespace: Namespace,
3602    pub now: TimestampMs,
3603}
3604
3605impl RegisterWorkerArgs {
3606    pub fn new(
3607        worker_id: WorkerId,
3608        worker_instance_id: WorkerInstanceId,
3609        lanes: BTreeSet<LaneId>,
3610        capabilities: BTreeSet<String>,
3611        liveness_ttl_ms: u64,
3612        namespace: Namespace,
3613        now: TimestampMs,
3614    ) -> Self {
3615        Self {
3616            worker_id,
3617            worker_instance_id,
3618            lanes,
3619            capabilities,
3620            liveness_ttl_ms,
3621            namespace,
3622            now,
3623        }
3624    }
3625}
3626
/// Outcome of a `register_worker` call (see [`RegisterWorkerArgs`]):
/// distinguishes a first-time registration from an idempotent
/// re-registration of a still-live instance.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RegisterWorkerOutcome {
    /// No prior live key for this `worker_instance_id` (fresh boot
    /// or post-TTL-expiry).
    Registered,
    /// Existing live key was found; TTL reset + caps/lanes
    /// overwritten (in-process hot-restart, RFC-025 §9.3).
    Refreshed,
}
3637
3638/// Inputs to [`crate::engine_backend::EngineBackend::heartbeat_worker`]
3639/// (RFC-025). Feature gate: `core`.
3640#[non_exhaustive]
3641#[derive(Clone, Debug, PartialEq, Eq)]
3642pub struct HeartbeatWorkerArgs {
3643    pub worker_instance_id: WorkerInstanceId,
3644    pub namespace: Namespace,
3645    pub now: TimestampMs,
3646}
3647
3648impl HeartbeatWorkerArgs {
3649    pub fn new(
3650        worker_instance_id: WorkerInstanceId,
3651        namespace: Namespace,
3652        now: TimestampMs,
3653    ) -> Self {
3654        Self {
3655            worker_instance_id,
3656            namespace,
3657            now,
3658        }
3659    }
3660}
3661
/// Outcome of a `heartbeat_worker` call (see [`HeartbeatWorkerArgs`]).
#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HeartbeatWorkerOutcome {
    /// TTL refreshed. `next_expiry_ms = now + stored-ttl`; callers
    /// schedule the next heartbeat from this value.
    Refreshed { next_expiry_ms: TimestampMs },
    /// Liveness key was absent — TTL ran out or `mark_worker_dead`
    /// landed earlier. Caller re-registers, not re-heartbeats.
    NotRegistered,
}
3672
3673/// Inputs to [`crate::engine_backend::EngineBackend::mark_worker_dead`]
3674/// (RFC-025). Feature gate: `core`.
3675///
3676/// `reason` is capped at 256 bytes and must not contain control
3677/// characters; oversize / invalid reject with
3678/// `Validation(InvalidInput, "reason: …")`. Mirrors
3679/// `fail_execution`'s `failure_reason` discipline.
3680#[non_exhaustive]
3681#[derive(Clone, Debug, PartialEq, Eq)]
3682pub struct MarkWorkerDeadArgs {
3683    pub worker_instance_id: WorkerInstanceId,
3684    pub namespace: Namespace,
3685    pub reason: String,
3686    pub now: TimestampMs,
3687}
3688
3689impl MarkWorkerDeadArgs {
3690    pub fn new(
3691        worker_instance_id: WorkerInstanceId,
3692        namespace: Namespace,
3693        reason: String,
3694        now: TimestampMs,
3695    ) -> Self {
3696        Self {
3697            worker_instance_id,
3698            namespace,
3699            reason,
3700            now,
3701        }
3702    }
3703}
3704
3705/// Max bytes for `MarkWorkerDeadArgs.reason`. Mirrors
3706/// `FailExecutionArgs::failure_reason` ceiling.
3707pub const MARK_WORKER_DEAD_REASON_MAX_BYTES: usize = 256;
3708
/// Outcome of a `mark_worker_dead` call (see [`MarkWorkerDeadArgs`]).
#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MarkWorkerDeadOutcome {
    /// The call took effect — presumably the liveness key was present
    /// and the worker is now marked dead (contrast with
    /// `NotRegistered`; TODO confirm against the backend impl).
    Marked,
    /// Liveness key already absent — no-op, idempotent. Unified
    /// variant name with `HeartbeatWorkerOutcome::NotRegistered`.
    NotRegistered,
}
3717
3718/// Cursor for [`crate::engine_backend::EngineBackend::list_expired_leases`]
3719/// (RFC-025 §9-locked). Tuple (not bare `ExecutionId`) so pagination
3720/// is stable under equal-expiry: `ZRANGEBYSCORE` with `LIMIT` keyed
3721/// on score alone duplicates or skips when two leases share a
3722/// millisecond.
3723///
3724/// Backends order strictly by
3725/// `(expires_at_ms ASC, execution_id ASC)`.
3726#[non_exhaustive]
3727#[derive(Clone, Debug, PartialEq, Eq)]
3728pub struct ExpiredLeasesCursor {
3729    pub expires_at_ms: TimestampMs,
3730    pub execution_id: ExecutionId,
3731}
3732
3733impl ExpiredLeasesCursor {
3734    pub fn new(expires_at_ms: TimestampMs, execution_id: ExecutionId) -> Self {
3735        Self {
3736            expires_at_ms,
3737            execution_id,
3738        }
3739    }
3740}
3741
3742/// Default fan-out for `list_expired_leases` when
3743/// `max_partitions_per_call` is `None` (RFC-025 §9.2).
3744pub const LIST_EXPIRED_LEASES_DEFAULT_MAX_PARTITIONS: u32 = 32;
3745
3746/// Default page size when `limit` is `None`.
3747pub const LIST_EXPIRED_LEASES_DEFAULT_LIMIT: u32 = 1_000;
3748
3749/// Backend cap for `limit`.
3750pub const LIST_EXPIRED_LEASES_MAX_LIMIT: u32 = 10_000;
3751
3752/// Inputs to [`crate::engine_backend::EngineBackend::list_expired_leases`]
3753/// (RFC-025). Feature gate: `suspension`.
3754///
3755/// `namespace = None` = cross-namespace sweep for operator tooling
3756/// (auth enforced at the ff-server admin route, NOT the trait
3757/// boundary). `Some(ns)` = per-tenant scope.
3758#[non_exhaustive]
3759#[derive(Clone, Debug, PartialEq, Eq)]
3760pub struct ListExpiredLeasesArgs {
3761    /// Every returned lease has `expires_at_ms <= as_of`.
3762    pub as_of: TimestampMs,
3763    /// Exclusive pagination cursor. `None` = scan from earliest.
3764    pub after: Option<ExpiredLeasesCursor>,
3765    /// Default [`LIST_EXPIRED_LEASES_DEFAULT_LIMIT`] when `None`;
3766    /// backend cap [`LIST_EXPIRED_LEASES_MAX_LIMIT`].
3767    pub limit: Option<u32>,
3768    /// Default [`LIST_EXPIRED_LEASES_DEFAULT_MAX_PARTITIONS`] when
3769    /// `None`.
3770    pub max_partitions_per_call: Option<u32>,
3771    pub namespace: Option<Namespace>,
3772}
3773
3774impl ListExpiredLeasesArgs {
3775    pub fn new(as_of: TimestampMs) -> Self {
3776        Self {
3777            as_of,
3778            after: None,
3779            limit: None,
3780            max_partitions_per_call: None,
3781            namespace: None,
3782        }
3783    }
3784}
3785
3786#[non_exhaustive]
3787#[derive(Clone, Debug, PartialEq, Eq)]
3788pub struct ExpiredLeaseInfo {
3789    pub execution_id: ExecutionId,
3790    pub lease_id: LeaseId,
3791    pub lease_epoch: LeaseEpoch,
3792    pub worker_instance_id: WorkerInstanceId,
3793    pub expires_at_ms: TimestampMs,
3794    pub attempt_index: AttemptIndex,
3795}
3796
3797impl ExpiredLeaseInfo {
3798    pub fn new(
3799        execution_id: ExecutionId,
3800        lease_id: LeaseId,
3801        lease_epoch: LeaseEpoch,
3802        worker_instance_id: WorkerInstanceId,
3803        expires_at_ms: TimestampMs,
3804        attempt_index: AttemptIndex,
3805    ) -> Self {
3806        Self {
3807            execution_id,
3808            lease_id,
3809            lease_epoch,
3810            worker_instance_id,
3811            expires_at_ms,
3812            attempt_index,
3813        }
3814    }
3815}
3816
3817#[non_exhaustive]
3818#[derive(Clone, Debug, PartialEq, Eq)]
3819pub struct ListExpiredLeasesResult {
3820    pub entries: Vec<ExpiredLeaseInfo>,
3821    pub cursor: Option<ExpiredLeasesCursor>,
3822}
3823
3824impl ListExpiredLeasesResult {
3825    pub fn new(entries: Vec<ExpiredLeaseInfo>, cursor: Option<ExpiredLeasesCursor>) -> Self {
3826        Self { entries, cursor }
3827    }
3828}
3829
3830/// Inputs to [`crate::engine_backend::EngineBackend::list_workers`]
3831/// (RFC-025 Phase 6, §9.4).
3832#[non_exhaustive]
3833#[derive(Clone, Debug, PartialEq, Eq)]
3834pub struct ListWorkersArgs {
3835    /// `None` = cross-namespace sweep for operator tooling.
3836    pub namespace: Option<Namespace>,
3837    /// Exclusive pagination cursor.
3838    pub after: Option<WorkerInstanceId>,
3839    /// Default 1000 when `None`.
3840    pub limit: Option<u32>,
3841}
3842
3843impl ListWorkersArgs {
3844    pub fn new() -> Self {
3845        Self {
3846            namespace: None,
3847            after: None,
3848            limit: None,
3849        }
3850    }
3851}
3852
3853impl Default for ListWorkersArgs {
3854    fn default() -> Self {
3855        Self::new()
3856    }
3857}
3858
3859#[non_exhaustive]
3860#[derive(Clone, Debug, PartialEq, Eq)]
3861pub struct WorkerInfo {
3862    pub worker_id: WorkerId,
3863    pub worker_instance_id: WorkerInstanceId,
3864    pub namespace: Namespace,
3865    pub lanes: BTreeSet<LaneId>,
3866    pub capabilities: BTreeSet<String>,
3867    pub last_heartbeat_ms: TimestampMs,
3868    pub liveness_ttl_ms: u64,
3869    pub registered_at_ms: TimestampMs,
3870}
3871
3872impl WorkerInfo {
3873    #[allow(clippy::too_many_arguments)]
3874    pub fn new(
3875        worker_id: WorkerId,
3876        worker_instance_id: WorkerInstanceId,
3877        namespace: Namespace,
3878        lanes: BTreeSet<LaneId>,
3879        capabilities: BTreeSet<String>,
3880        last_heartbeat_ms: TimestampMs,
3881        liveness_ttl_ms: u64,
3882        registered_at_ms: TimestampMs,
3883    ) -> Self {
3884        Self {
3885            worker_id,
3886            worker_instance_id,
3887            namespace,
3888            lanes,
3889            capabilities,
3890            last_heartbeat_ms,
3891            liveness_ttl_ms,
3892            registered_at_ms,
3893        }
3894    }
3895}
3896
3897#[non_exhaustive]
3898#[derive(Clone, Debug, PartialEq, Eq)]
3899pub struct ListWorkersResult {
3900    pub entries: Vec<WorkerInfo>,
3901    pub cursor: Option<WorkerInstanceId>,
3902}
3903
3904impl ListWorkersResult {
3905    pub fn new(entries: Vec<WorkerInfo>, cursor: Option<WorkerInstanceId>) -> Self {
3906        Self { entries, cursor }
3907    }
3908}
3909
// Unit tests for the contract types: JSON round-trips for two of the
// FCALL contracts, then the parsing / formatting / serde behavior of
// `StreamCursor`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::FlowId;

    // Serializes a fully-populated `CreateExecutionArgs` to JSON, parses
    // it back, and checks the execution id survives the round trip.
    #[test]
    fn create_execution_args_serde() {
        let config = crate::partition::PartitionConfig::default();
        let args = CreateExecutionArgs {
            execution_id: ExecutionId::for_flow(&FlowId::new(), &config),
            namespace: Namespace::new("test"),
            lane_id: LaneId::new("default"),
            execution_kind: "llm_call".to_owned(),
            input_payload: b"hello".to_vec(),
            payload_encoding: Some("json".to_owned()),
            priority: 0,
            creator_identity: "test-user".to_owned(),
            idempotency_key: None,
            tags: HashMap::new(),
            policy: None,
            delay_until: None,
            execution_deadline_at: None,
            partition_id: 42,
            now: TimestampMs::now(),
        };
        let json = serde_json::to_string(&args).unwrap();
        let parsed: CreateExecutionArgs = serde_json::from_str(&json).unwrap();
        assert_eq!(args.execution_id, parsed.execution_id);
    }

    // Round-trips a `ClaimExecutionResult::Claimed` through JSON and
    // requires full equality with the original value.
    #[test]
    fn claim_result_serde() {
        let config = crate::partition::PartitionConfig::default();
        let result = ClaimExecutionResult::Claimed(ClaimedExecution {
            execution_id: ExecutionId::for_flow(&FlowId::new(), &config),
            lease_id: LeaseId::new(),
            lease_epoch: LeaseEpoch::new(1),
            attempt_index: AttemptIndex::new(0),
            attempt_id: AttemptId::new(),
            attempt_type: AttemptType::Initial,
            lease_expires_at: TimestampMs::from_millis(1000),
            handle: crate::backend::stub_handle_fresh(),
        });
        let json = serde_json::to_string(&result).unwrap();
        let parsed: ClaimExecutionResult = serde_json::from_str(&json).unwrap();
        assert_eq!(result, parsed);
    }

    // ── StreamCursor (issue #92) ──

    // `Display` renders the public tokens: `start`, `end`, or the id
    // carried by `At`.
    #[test]
    fn stream_cursor_display_matches_wire_tokens() {
        assert_eq!(StreamCursor::Start.to_string(), "start");
        assert_eq!(StreamCursor::End.to_string(), "end");
        assert_eq!(StreamCursor::At("123".into()).to_string(), "123");
        assert_eq!(StreamCursor::At("123-4".into()).to_string(), "123-4");
    }

    // `to_wire` translates `Start` / `End` into the `-` / `+` markers
    // and passes `At` ids through unchanged.
    #[test]
    fn stream_cursor_to_wire_maps_to_valkey_markers() {
        assert_eq!(StreamCursor::Start.to_wire(), "-");
        assert_eq!(StreamCursor::End.to_wire(), "+");
        assert_eq!(StreamCursor::At("0-0".into()).to_wire(), "0-0");
        assert_eq!(StreamCursor::At("17-3".into()).to_wire(), "17-3");
    }

    // `FromStr` accepts the keyword tokens and both id shapes
    // (`<ms>` and `<ms>-<seq>`).
    #[test]
    fn stream_cursor_from_str_accepts_wire_tokens() {
        use std::str::FromStr;
        assert_eq!(
            StreamCursor::from_str("start").unwrap(),
            StreamCursor::Start
        );
        assert_eq!(StreamCursor::from_str("end").unwrap(), StreamCursor::End);
        assert_eq!(
            StreamCursor::from_str("123").unwrap(),
            StreamCursor::At("123".into())
        );
        assert_eq!(
            StreamCursor::from_str("0-0").unwrap(),
            StreamCursor::At("0-0".into())
        );
        assert_eq!(
            StreamCursor::from_str("1713100800150-0").unwrap(),
            StreamCursor::At("1713100800150-0".into())
        );
    }

    // The raw `-` / `+` markers are output-only: parsing them must fail
    // with `BareMarkerRejected` carrying the offending marker.
    #[test]
    fn stream_cursor_from_str_rejects_bare_markers() {
        use std::str::FromStr;
        assert!(matches!(
            StreamCursor::from_str("-"),
            Err(StreamCursorParseError::BareMarkerRejected(s)) if s == "-"
        ));
        assert!(matches!(
            StreamCursor::from_str("+"),
            Err(StreamCursorParseError::BareMarkerRejected(s)) if s == "+"
        ));
    }

    // The empty string has its own error variant.
    #[test]
    fn stream_cursor_from_str_rejects_empty() {
        use std::str::FromStr;
        assert_eq!(
            StreamCursor::from_str(""),
            Err(StreamCursorParseError::Empty)
        );
    }

    // Non-numeric text, dangling or extra separators, other
    // punctuation, and wrong-case keywords are all `Malformed`.
    #[test]
    fn stream_cursor_from_str_rejects_malformed() {
        use std::str::FromStr;
        for bad in [
            "abc", "-1", "1-", "-1-2", "1-2-3", "1.2", "1 2", "Start", "END",
        ] {
            assert!(
                matches!(
                    StreamCursor::from_str(bad),
                    Err(StreamCursorParseError::Malformed(_))
                ),
                "must reject {bad:?}",
            );
        }
    }

    // U+2013 (en dash) looks like the ASCII separator but must not be
    // accepted as one.
    #[test]
    fn stream_cursor_from_str_rejects_non_ascii() {
        use std::str::FromStr;
        assert!(matches!(
            StreamCursor::from_str("1\u{2013}2"),
            Err(StreamCursorParseError::Malformed(_))
        ));
    }

    // Every variant survives a JSON serialize / deserialize cycle.
    #[test]
    fn stream_cursor_serde_round_trip() {
        for c in [
            StreamCursor::Start,
            StreamCursor::End,
            StreamCursor::At("0-0".into()),
            StreamCursor::At("1713100800150-0".into()),
        ] {
            let json = serde_json::to_string(&c).unwrap();
            let back: StreamCursor = serde_json::from_str(&json).unwrap();
            assert_eq!(back, c);
        }
    }

    // The JSON form is a plain string, not a tagged object.
    #[test]
    fn stream_cursor_serializes_as_bare_string() {
        assert_eq!(
            serde_json::to_string(&StreamCursor::Start).unwrap(),
            r#""start""#
        );
        assert_eq!(
            serde_json::to_string(&StreamCursor::End).unwrap(),
            r#""end""#
        );
        assert_eq!(
            serde_json::to_string(&StreamCursor::At("123-0".into())).unwrap(),
            r#""123-0""#
        );
    }

    // Deserialization applies the same bare-marker rejection as
    // `FromStr`.
    #[test]
    fn stream_cursor_deserialize_rejects_bare_markers() {
        assert!(serde_json::from_str::<StreamCursor>(r#""-""#).is_err());
        assert!(serde_json::from_str::<StreamCursor>(r#""+""#).is_err());
    }

    // `from_beginning` is the concrete id `0-0`, not the `Start` keyword.
    #[test]
    fn stream_cursor_from_beginning_is_zero_zero() {
        assert_eq!(
            StreamCursor::from_beginning(),
            StreamCursor::At("0-0".into())
        );
    }

    // Only `At` cursors count as concrete; `Start` and `End` do not.
    #[test]
    fn stream_cursor_is_concrete_classifies_variants() {
        assert!(!StreamCursor::Start.is_concrete());
        assert!(!StreamCursor::End.is_concrete());
        assert!(StreamCursor::At("0-0".into()).is_concrete());
        assert!(StreamCursor::At("123-0".into()).is_concrete());
        assert!(StreamCursor::from_beginning().is_concrete());
    }

    // `into_wire_string` consumes the cursor and yields the same text
    // as `to_wire`.
    #[test]
    fn stream_cursor_into_wire_string_moves_without_cloning() {
        assert_eq!(StreamCursor::Start.into_wire_string(), "-");
        assert_eq!(StreamCursor::End.into_wire_string(), "+");
        assert_eq!(StreamCursor::At("17-3".into()).into_wire_string(), "17-3");
    }
}
4105
4106// ─── list_executions ───
4107
/// Summary of an execution for list views.
///
/// NOTE(review): apart from `execution_id`, the fields are plain
/// strings / integers even though typed equivalents (`Namespace`,
/// `LaneId`, `PublicState`, `TimestampMs`) are imported by this file —
/// confirm the stringly-typed shape is intentional for list output.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionSummary {
    /// Id of the summarized execution.
    pub execution_id: ExecutionId,
    /// Namespace, as a raw string.
    pub namespace: String,
    /// Lane id, as a raw string.
    pub lane_id: String,
    /// Execution kind label.
    pub execution_kind: String,
    /// Public state, as a raw string.
    pub public_state: String,
    /// Priority value.
    pub priority: i32,
    /// Creation time as a string — format not visible here (TODO
    /// confirm whether this is millis or a formatted timestamp).
    pub created_at: String,
}
4119
4120/// Result of a list_executions query.
4121#[derive(Clone, Debug, Serialize, Deserialize)]
4122pub struct ListExecutionsResult {
4123    pub executions: Vec<ExecutionSummary>,
4124    pub total_returned: usize,
4125}
4126
4127// ─── list_lanes (issue #184) ───
4128
4129/// One page of lane ids returned by
4130/// [`crate::engine_backend::EngineBackend::list_lanes`].
4131///
4132/// Lanes are global (not partition-scoped) — the backend enumerates
4133/// every registered lane, sorts by [`LaneId`] name, and returns a
4134/// `limit`-sized slice starting after `cursor` (exclusive).
4135///
4136/// `next_cursor` is `Some(last_lane_in_page)` when more pages remain
4137/// and `None` when the caller has read the final page. Callers that
4138/// want the full list loop until `next_cursor` is `None`, threading
4139/// the previous page's `next_cursor` into the next call's `cursor`
4140/// argument.
4141///
4142/// `#[non_exhaustive]` — FF may add fields (e.g. a `total` hint) in
4143/// minor releases without a semver break.
4144#[derive(Clone, Debug, PartialEq, Eq)]
4145#[non_exhaustive]
4146pub struct ListLanesPage {
4147    /// The lanes in this page, sorted by [`LaneId`] name.
4148    pub lanes: Vec<LaneId>,
4149    /// Cursor for the next page (exclusive). `None` ⇒ final page.
4150    pub next_cursor: Option<LaneId>,
4151}
4152
4153impl ListLanesPage {
4154    /// Construct a [`ListLanesPage`]. Present so downstream crates
4155    /// (ff-backend-valkey's `list_lanes` impl) can assemble the
4156    /// struct despite the `#[non_exhaustive]` marker.
4157    pub fn new(lanes: Vec<LaneId>, next_cursor: Option<LaneId>) -> Self {
4158        Self { lanes, next_cursor }
4159    }
4160}
4161
4162// ─── list_suspended ───
4163
4164/// One entry in a [`ListSuspendedPage`] — a suspended execution and
4165/// the reason it is blocked, answering an operator's "what's this
4166/// waiting on?" without a follow-up round-trip.
4167///
4168/// `reason` carries the free-form `reason_code` recorded by the
4169/// suspending worker at `lua/suspension.lua` (HSET `suspension:current
4170/// reason_code`). It is a `String`, not a closed enum: the suspension
4171/// pipeline accepts arbitrary caller-supplied codes (typical values
4172/// are `"signal"`, `"timer"`, `"children"`, `"join"`, but consumers
4173/// embed bespoke codes). A future enum projection can classify
4174/// known codes once the set is frozen; until then, callers that want
4175/// structured routing MUST match on the string explicitly.
4176#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
4177#[non_exhaustive]
4178pub struct SuspendedExecutionEntry {
4179    /// Execution currently in `lifecycle_phase=suspended`.
4180    pub execution_id: ExecutionId,
4181    /// Score stored on the per-lane suspended ZSET — the scheduled
4182    /// `timeout_at` in milliseconds, or the `9999999999999` sentinel
4183    /// when no timeout was set (see `lua/suspension.lua`).
4184    pub suspended_at_ms: i64,
4185    /// Free-form reason code from `suspension:current.reason_code`.
4186    /// Empty string when the suspension hash is absent or does not
4187    /// carry a `reason_code` field (older records). See the struct
4188    /// rustdoc for the deliberate-String rationale.
4189    pub reason: String,
4190}
4191
4192impl SuspendedExecutionEntry {
4193    /// Construct a new entry. Preferred over direct field init for
4194    /// `#[non_exhaustive]` forward-compat.
4195    pub fn new(execution_id: ExecutionId, suspended_at_ms: i64, reason: String) -> Self {
4196        Self {
4197            execution_id,
4198            suspended_at_ms,
4199            reason,
4200        }
4201    }
4202}
4203
4204/// One cursor-paginated page of suspended executions.
4205///
4206/// Pagination is cursor-based (not offset/limit) so a Valkey backend
4207/// can resume a partition scan from the last seen execution id and a
4208/// future Postgres backend can do keyset pagination on
4209/// `executions WHERE state='suspended'`. The cursor is opaque to
4210/// callers: pass `next_cursor` back as the `cursor` argument to the
4211/// next [`EngineBackend::list_suspended`] call to fetch the next
4212/// page. `None` means the stream is exhausted.
4213///
4214/// [`EngineBackend::list_suspended`]: crate::engine_backend::EngineBackend::list_suspended
4215#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
4216#[non_exhaustive]
4217pub struct ListSuspendedPage {
4218    /// Entries on this page, ordered by ascending `suspended_at_ms`
4219    /// (timeout order) with `execution_id` as a lex tiebreak.
4220    pub entries: Vec<SuspendedExecutionEntry>,
4221    /// Resume-point for the next page. `None` when no further
4222    /// entries remain in the partition.
4223    pub next_cursor: Option<ExecutionId>,
4224}
4225
4226impl ListSuspendedPage {
4227    /// Construct a new page. Preferred over direct field init for
4228    /// `#[non_exhaustive]` forward-compat.
4229    pub fn new(entries: Vec<SuspendedExecutionEntry>, next_cursor: Option<ExecutionId>) -> Self {
4230        Self {
4231            entries,
4232            next_cursor,
4233        }
4234    }
4235}
4236
// ─── list_executions (cursor-paginated page) ───
4238
4239/// One page of partition-scoped execution ids returned by
4240/// [`EngineBackend::list_executions`](crate::engine_backend::EngineBackend::list_executions).
4241///
4242/// Pagination is forward-only and cursor-based. `next_cursor` carries
4243/// the last `ExecutionId` emitted in `executions` iff another page is
4244/// available; callers pass that id back as the next call's `cursor`
4245/// (exclusive start). `next_cursor = None` signals end-of-stream.
4246///
4247/// `#[non_exhaustive]` — FF may add fields (e.g. `approximate_total`)
4248/// in minor releases without a semver break. Use
4249/// [`ListExecutionsPage::new`] for cross-crate construction.
4250#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
4251#[non_exhaustive]
4252pub struct ListExecutionsPage {
4253    /// Execution ids on this page, in ascending lexicographic order.
4254    pub executions: Vec<ExecutionId>,
4255    /// Exclusive cursor to request the next page. `None` ⇒ no more
4256    /// results.
4257    pub next_cursor: Option<ExecutionId>,
4258}
4259
4260impl ListExecutionsPage {
4261    /// Construct a [`ListExecutionsPage`]. Present so downstream
4262    /// crates can assemble the struct despite the `#[non_exhaustive]`
4263    /// marker.
4264    pub fn new(executions: Vec<ExecutionId>, next_cursor: Option<ExecutionId>) -> Self {
4265        Self { executions, next_cursor }
4266    }
4267}
4268
4269// ─── rotate_waitpoint_hmac_secret ───
4270
/// Args for `ff_rotate_waitpoint_hmac_secret`, which rotates the HMAC
/// signing kid on ONE partition. Fanning out across every partition is
/// the caller's job (ff-server runs the parallel fan-out in
/// `rotate_waitpoint_secret`; direct-Valkey consumers mirror the
/// pattern).
///
/// "now" is derived server-side from `redis.call("TIME")` inside the
/// FCALL (consistent with `validate_waitpoint_token` and the flow
/// scanners). `grace_ms` is a duration rather than a clock value, so it
/// is safe for the caller to supply it.
///
/// `Debug` is implemented by hand so that `new_secret_hex` is never
/// written to logs or panic messages.
#[derive(Clone)]
pub struct RotateWaitpointHmacSecretArgs {
    /// Kid to install as the new signing kid.
    pub new_kid: String,
    /// Hex-encoded secret for `new_kid`. Redacted in `Debug` output.
    pub new_secret_hex: String,
    /// Grace window in ms. Tokens signed by the outgoing kid remain
    /// valid for `grace_ms` after this rotation. Unsigned, so it can
    /// never be negative.
    pub grace_ms: u64,
}

impl std::fmt::Debug for RotateWaitpointHmacSecretArgs {
    /// Formats every field except the secret, which is replaced with a
    /// fixed `<redacted>` placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RotateWaitpointHmacSecretArgs")
            .field("new_kid", &self.new_kid)
            .field("new_secret_hex", &"<redacted>")
            .field("grace_ms", &self.grace_ms)
            .finish()
    }
}
4288
/// What a rotation on a single partition ended up doing.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RotateWaitpointHmacSecretOutcome {
    /// The new kid was installed. On bootstrap (no prior
    /// `current_kid`) `previous_kid` is `None`. `gc_count` is the
    /// number of expired kids reaped during this rotation.
    Rotated {
        previous_kid: Option<String>,
        new_kid: String,
        gc_count: u32,
    },
    /// Exact replay: the same kid with the same secret was already
    /// installed. A safe operator retry that changes no state.
    Noop { kid: String },
}
4304
4305// ─── rotate_waitpoint_hmac_secret_all ───
4306
/// Args for [`EngineBackend::rotate_waitpoint_hmac_secret_all`], the
/// cluster-wide / backend-native rotation of the waitpoint HMAC
/// signing kid.
///
/// **v0.7 migration-master Q4:** one additive trait method replaces
/// the per-partition fan-out that direct-Valkey consumers hand-rolled
/// through `ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions`.
/// On Valkey it fans out N FCALLs (one per execution partition); on
/// Postgres (Wave 4) it resolves to a single INSERT against the global
/// `ff_waitpoint_hmac(kid, secret, rotated_at)` table (which has no
/// partition_id column). Consumers prefer this method for clarity; the
/// pre-existing free-fn + per-partition surface stays available for
/// backwards compatibility.
///
/// `#[non_exhaustive]` with a [`Self::new`] constructor, per the
/// project memory-rule (an unbuildable non_exhaustive type is a dead
/// API).
///
/// `Debug` is implemented by hand so that `new_secret_hex` is never
/// written to logs or panic messages.
///
/// [`EngineBackend::rotate_waitpoint_hmac_secret_all`]: crate::engine_backend::EngineBackend::rotate_waitpoint_hmac_secret_all
#[derive(Clone)]
#[non_exhaustive]
pub struct RotateWaitpointHmacSecretAllArgs {
    /// Kid to install as the new signing kid.
    pub new_kid: String,
    /// Hex-encoded secret for `new_kid`. Redacted in `Debug` output.
    pub new_secret_hex: String,
    /// Grace window in ms for tokens signed by the outgoing kid. A
    /// duration (not a clock value), identical to
    /// [`RotateWaitpointHmacSecretArgs::grace_ms`].
    pub grace_ms: u64,
}

impl std::fmt::Debug for RotateWaitpointHmacSecretAllArgs {
    /// Formats every field except the secret, which is replaced with a
    /// fixed `<redacted>` placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RotateWaitpointHmacSecretAllArgs")
            .field("new_kid", &self.new_kid)
            .field("new_secret_hex", &"<redacted>")
            .field("grace_ms", &self.grace_ms)
            .finish()
    }
}
4335
4336impl RotateWaitpointHmacSecretAllArgs {
4337    /// Build the args. Keeping the constructor so consumers don't
4338    /// struct-literal past the `#[non_exhaustive]` marker.
4339    pub fn new(
4340        new_kid: impl Into<String>,
4341        new_secret_hex: impl Into<String>,
4342        grace_ms: u64,
4343    ) -> Self {
4344        Self {
4345            new_kid: new_kid.into(),
4346            new_secret_hex: new_secret_hex.into(),
4347            grace_ms,
4348        }
4349    }
4350}
4351
4352/// Per-partition entry of [`RotateWaitpointHmacSecretAllResult`].
4353/// Mirrors [`ff_sdk::admin::PartitionRotationOutcome`] but typed at
4354/// the `ff-core` layer so both Valkey and Postgres backends return
4355/// the same shape without a Postgres→ferriskey dep.
4356///
4357/// On backends with no partition concept (Postgres) the entry list
4358/// has length 1 with `partition = 0` and the outcome of the global
4359/// row write.
4360#[derive(Debug)]
4361#[non_exhaustive]
4362pub struct RotateWaitpointHmacSecretAllEntry {
4363    pub partition: u16,
4364    /// The per-partition (or global) rotation outcome. Per-partition
4365    /// failures are surfaced as inner `Err` so the fan-out can report
4366    /// partial success — matching the existing SDK free-fn contract.
4367    pub result: Result<RotateWaitpointHmacSecretOutcome, crate::engine_error::EngineError>,
4368}
4369
4370impl RotateWaitpointHmacSecretAllEntry {
4371    pub fn new(
4372        partition: u16,
4373        result: Result<RotateWaitpointHmacSecretOutcome, crate::engine_error::EngineError>,
4374    ) -> Self {
4375        Self { partition, result }
4376    }
4377}
4378
4379/// Result of [`EngineBackend::rotate_waitpoint_hmac_secret_all`].
4380///
4381/// The Valkey backend returns one entry per execution partition. The
4382/// Postgres backend (Wave 4) will return a single-entry vec with
4383/// `partition = 0` since the Postgres schema stores one global row
4384/// per kid (Q4 §adjudication). Consumers that want a uniform "did
4385/// ALL rotations succeed?" view inspect each entry's `.result`.
4386#[derive(Debug)]
4387#[non_exhaustive]
4388pub struct RotateWaitpointHmacSecretAllResult {
4389    pub entries: Vec<RotateWaitpointHmacSecretAllEntry>,
4390}
4391
4392impl RotateWaitpointHmacSecretAllResult {
4393    pub fn new(entries: Vec<RotateWaitpointHmacSecretAllEntry>) -> Self {
4394        Self { entries }
4395    }
4396}
4397
4398// ─── seed_waitpoint_hmac_secret (issue #280) ───
4399
/// Args for [`EngineBackend::seed_waitpoint_hmac_secret`].
///
/// There are two required fields and no optional knobs, hence no
/// fluent builder, only `new(kid, secret_hex)`. `#[non_exhaustive]` is
/// kept (with the paired constructor, per the project memory rule) so
/// that adding knobs later does not break callers.
///
/// This is the boot-time provisioning entry point for fresh
/// deployments; issue #280 records why cairn needed it in addition to
/// [`RotateWaitpointHmacSecretAllArgs`]. Unlike rotate, seed is
/// idempotent: callers invoke it on every boot and the backend decides
/// whether to install.
///
/// `Debug` is implemented by hand so that `secret_hex` is never
/// written to logs or panic messages.
///
/// [`EngineBackend::seed_waitpoint_hmac_secret`]: crate::engine_backend::EngineBackend::seed_waitpoint_hmac_secret
#[derive(Clone)]
#[non_exhaustive]
pub struct SeedWaitpointHmacSecretArgs {
    /// Kid to seed.
    pub kid: String,
    /// Hex-encoded secret for `kid`. Redacted in `Debug` output.
    pub secret_hex: String,
}

impl std::fmt::Debug for SeedWaitpointHmacSecretArgs {
    /// Formats the kid and replaces the secret with a fixed
    /// `<redacted>` placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SeedWaitpointHmacSecretArgs")
            .field("kid", &self.kid)
            .field("secret_hex", &"<redacted>")
            .finish()
    }
}
4418
4419impl SeedWaitpointHmacSecretArgs {
4420    pub fn new(kid: impl Into<String>, secret_hex: impl Into<String>) -> Self {
4421        Self {
4422            kid: kid.into(),
4423            secret_hex: secret_hex.into(),
4424        }
4425    }
4426}
4427
/// What [`EngineBackend::seed_waitpoint_hmac_secret`] returns.
///
/// * `Seeded`: the backend had no `current_kid` (or no row in the
///   global keystore), so it installed `kid` as the active signing
///   kid.
/// * `AlreadySeeded`: a row for `kid` is already installed.
///   `same_secret` says whether the stored secret bytes match the hex
///   the caller supplied. When it is `false`, the caller should pick a
///   fresh kid for rotation instead of silently re-installing under
///   the existing kid.
///
/// [`EngineBackend::seed_waitpoint_hmac_secret`]: crate::engine_backend::EngineBackend::seed_waitpoint_hmac_secret
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum SeedOutcome {
    Seeded { kid: String },
    AlreadySeeded { kid: String, same_secret: bool },
}
4443
4444// ─── list_waitpoint_hmac_kids ───
4445
/// Args for listing the waitpoint HMAC kids. The struct has no fields.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ListWaitpointHmacKidsArgs {}
4448
4449/// Snapshot of the waitpoint HMAC keystore on ONE partition.
4450#[derive(Clone, Debug, PartialEq, Eq)]
4451pub struct WaitpointHmacKids {
4452    /// The currently-signing kid. `None` if uninitialized.
4453    pub current_kid: Option<String>,
4454    /// Kids that still validate existing tokens but no longer sign
4455    /// new ones. Order is Lua HGETALL traversal order — callers that
4456    /// need a stable sort should sort by `expires_at_ms`.
4457    pub verifying: Vec<VerifyingKid>,
4458}
4459
/// One element of [`WaitpointHmacKids::verifying`]: a kid together with
/// its `expires_at_ms` value.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct VerifyingKid {
    /// The kid.
    pub kid: String,
    /// Expiry in milliseconds.
    /// NOTE(review): presumably an epoch timestamp — confirm against
    /// the Lua side.
    pub expires_at_ms: i64,
}
4465
4466// ═══════════════════════════════════════════════════════════════════════
4467// RFC-013 Stage 1d: EngineBackend::suspend typed args + outcome
4468// ═══════════════════════════════════════════════════════════════════════
4469//
4470// `SuspendExecutionArgs` / `SuspendExecutionResult` above remain the
4471// wire-level Lua-ARGV mirror used by the backend serializer. The types
4472// below are the public trait-surface shapes RFC-013 §2.2–§2.6 specifies.
4473//
4474// Every type in this block is `#[non_exhaustive]` per the RFC §2.2.1
4475// memory-rule compliance note; each gets a constructor so external-crate
4476// consumers can build them without struct-literal access.
4477
4478use crate::backend::WaitpointHmac;
4479
4480/// Partition-scoped idempotency key for retry-safe `EngineBackend::suspend`.
4481///
4482/// See RFC-013 §2.2 — when set on [`SuspendArgs::idempotency_key`], the
4483/// backend dedups the call on `(partition, execution_id, idempotency_key)`
4484/// and a second `suspend` with the same triple returns the first call's
4485/// [`SuspendOutcome`] verbatim. Absent a key, `suspend` is NOT retry-
4486/// idempotent; callers must describe-and-reconcile per §3.1.
4487///
4488/// Follows the `UsageDimensions::dedup_key` pattern — opaque to the
4489/// engine, byte-compared at the partition scope.
4490#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
4491#[serde(transparent)]
4492pub struct IdempotencyKey(String);
4493
4494impl IdempotencyKey {
4495    /// Construct from any stringy input. Empty strings are accepted;
4496    /// the backend treats an empty key as "no dedup" at the serialize
4497    /// step so `Some(IdempotencyKey::new(""))` is functionally the same
4498    /// as `None`.
4499    pub fn new(key: impl Into<String>) -> Self {
4500        Self(key.into())
4501    }
4502
4503    /// Borrow the underlying string.
4504    pub fn as_str(&self) -> &str {
4505        &self.0
4506    }
4507}
4508
4509impl std::fmt::Display for IdempotencyKey {
4510    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4511        f.write_str(&self.0)
4512    }
4513}
4514
4515/// v1 signal-match predicate inside [`ResumeCondition::Single`].
4516///
4517/// RFC-013 §2.4 — `ByName(String)` matches a single concrete signal
4518/// name; `Wildcard` matches any delivered signal. RFC-014 may extend
4519/// (payload predicates, pattern matching) — `#[non_exhaustive]`.
4520#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
4521#[non_exhaustive]
4522pub enum SignalMatcher {
4523    /// Match by exact signal name.
4524    ByName(String),
4525    /// Match any signal delivered to the waitpoint.
4526    Wildcard,
4527}
4528
/// Maximum nesting depth allowed for a composite condition (RFC-014
/// §5.4 invariant 4; §5.5 gives the cap rationale).
///
/// This is a soft cap: raising it only means changing this constant
/// and the cap-rationale paragraph in RFC-014 §5.5, with no
/// wire-format change. Keep the two in sync.
pub const MAX_COMPOSITE_DEPTH: usize = 4;
4534
4535/// RFC-013 reserves this enum slot; RFC-014 populates it with the
4536/// concrete composition vocabulary (`AllOf` + `Count`). The enum is
4537/// `#[non_exhaustive]` so RFC-016 or later RFCs may add variants
4538/// (`AnyOf` has been explicitly rejected per RFC-014 §2.3 in favour of
4539/// `Count { n: 1, .. }`; the guard exists for orthogonal future work).
4540#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
4541#[serde(tag = "kind")]
4542#[non_exhaustive]
4543pub enum CompositeBody {
4544    /// All listed sub-conditions must be satisfied. Order-independent.
4545    /// Once satisfied, further signals to member waitpoints are observed
4546    /// but do not re-open satisfaction. RFC-014 §2.1.
4547    AllOf {
4548        members: Vec<ResumeCondition>,
4549    },
4550    /// At least `n` distinct satisfiers (by [`CountKind`]) must match.
4551    /// `matcher` optionally constrains participating signals; `None`
4552    /// lets any signal on any of `waitpoints` count. RFC-014 §2.1.
4553    Count {
4554        n: u32,
4555        count_kind: CountKind,
4556        #[serde(default, skip_serializing_if = "Option::is_none")]
4557        matcher: Option<SignalMatcher>,
4558        waitpoints: Vec<String>,
4559    },
4560}
4561
4562/// How `Count` nodes distinguish satisfiers. RFC-014 §2.1 + §3.2.
4563#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
4564#[non_exhaustive]
4565pub enum CountKind {
4566    /// n distinct `waitpoint_id`s in `waitpoints` must fire.
4567    DistinctWaitpoints,
4568    /// n distinct `signal_id`s across the waitpoint set.
4569    DistinctSignals,
4570    /// n distinct `source_type:source_identity` tuples.
4571    DistinctSources,
4572}
4573
4574impl CompositeBody {
4575    /// `AllOf { members }` constructor (RFC-014 §10.3 SDK surface).
4576    pub fn all_of(members: impl IntoIterator<Item = ResumeCondition>) -> Self {
4577        Self::AllOf {
4578            members: members.into_iter().collect(),
4579        }
4580    }
4581
4582    /// `Count` constructor with explicit kind + waitpoint set.
4583    pub fn count(
4584        n: u32,
4585        count_kind: CountKind,
4586        matcher: Option<SignalMatcher>,
4587        waitpoints: impl IntoIterator<Item = String>,
4588    ) -> Self {
4589        Self::Count {
4590            n,
4591            count_kind,
4592            matcher,
4593            waitpoints: waitpoints.into_iter().collect(),
4594        }
4595    }
4596}
4597
4598/// Declarative resume condition for [`SuspendArgs::resume_condition`].
4599///
4600/// RFC-013 §2.4 — typed replacement for the SDK's former
4601/// `ConditionMatcher` / `resume_condition_json` pair.
4602#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
4603#[non_exhaustive]
4604pub enum ResumeCondition {
4605    /// Single waitpoint-key match with a predicate. `matcher` is
4606    /// evaluated against every signal delivered to `waitpoint_key`.
4607    Single {
4608        waitpoint_key: String,
4609        matcher: SignalMatcher,
4610    },
4611    /// Operator-only resume — no signal satisfies; only an explicit
4612    /// operator resume closes the waitpoint.
4613    OperatorOnly,
4614    /// Pure-timeout suspension. No signal satisfier; the waitpoint
4615    /// resolves only via `timeout_behavior` at `timeout_at`. Requires
4616    /// `SuspendArgs::timeout_at` to be `Some(_)` — otherwise the
4617    /// Rust-side validator rejects as `timeout_only_without_deadline`.
4618    TimeoutOnly,
4619    /// Multi-condition composition; RFC-014 defines the body.
4620    Composite(CompositeBody),
4621}
4622
/// The validation error shape from RFC-014 §5.1.
///
/// [`ResumeCondition::validate_composite`] returns it when a composite
/// breaks a structural or cardinality invariant at suspend-time,
/// before any Valkey call is made. `detail` is a human-readable
/// description per §5.1.1.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CompositeValidationError {
    /// Human-readable explanation of which invariant failed.
    pub detail: String,
}
4631
4632impl CompositeValidationError {
4633    fn new(detail: impl Into<String>) -> Self {
4634        Self {
4635            detail: detail.into(),
4636        }
4637    }
4638}
4639
4640impl ResumeCondition {
4641    /// RFC-014 §10.3 builder — `AllOf` across N distinct waitpoints,
4642    /// each member a `Single { matcher: Wildcard }` leaf. Canonical
4643    /// Pattern 3 shape for heterogeneous-subsystem "all fired"
4644    /// semantics (e.g. `db-migration-complete` + `cache-warmed` +
4645    /// `feature-flag-set`).
4646    ///
4647    /// Callers that need per-waitpoint matchers should construct the
4648    /// tree directly via
4649    /// [`ResumeCondition::Composite(CompositeBody::all_of(..))`].
4650    pub fn all_of_waitpoints<I, S>(waitpoint_keys: I) -> Self
4651    where
4652        I: IntoIterator<Item = S>,
4653        S: Into<String>,
4654    {
4655        let members: Vec<ResumeCondition> = waitpoint_keys
4656            .into_iter()
4657            .map(|k| ResumeCondition::Single {
4658                waitpoint_key: k.into(),
4659                matcher: SignalMatcher::Wildcard,
4660            })
4661            .collect();
4662        ResumeCondition::Composite(CompositeBody::AllOf { members })
4663    }
4664
4665    /// Collect every distinct `waitpoint_key` the condition targets.
4666    /// Used at suspend-time to validate the condition's wp set against
4667    /// `SuspendArgs.waitpoints` (RFC-014 §5.1 multi-binding cross-
4668    /// check). Order follows tree DFS, de-duplicated preserving first
4669    /// occurrence.
4670    pub fn referenced_waitpoint_keys(&self) -> Vec<String> {
4671        let mut out: Vec<String> = Vec::new();
4672        let mut push = |k: &str| {
4673            if !out.iter().any(|e| e == k) {
4674                out.push(k.to_owned());
4675            }
4676        };
4677        fn walk(cond: &ResumeCondition, push: &mut dyn FnMut(&str)) {
4678            match cond {
4679                ResumeCondition::Single { waitpoint_key, .. } => push(waitpoint_key),
4680                ResumeCondition::Composite(body) => walk_body(body, push),
4681                _ => {}
4682            }
4683        }
4684        fn walk_body(body: &CompositeBody, push: &mut dyn FnMut(&str)) {
4685            match body {
4686                CompositeBody::AllOf { members } => {
4687                    for m in members {
4688                        walk(m, push);
4689                    }
4690                }
4691                CompositeBody::Count { waitpoints, .. } => {
4692                    for w in waitpoints {
4693                        push(w.as_str());
4694                    }
4695                }
4696            }
4697        }
4698        walk(self, &mut push);
4699        out
4700    }
4701
4702    /// Validate RFC-014 structural invariants on a composite condition.
4703    /// Single / OperatorOnly / TimeoutOnly return Ok — they carry no
4704    /// composite body. Checks cover:
4705    /// * `AllOf { members: [] }` — §5.1 `allof_empty_members`
4706    /// * `Count { n: 0 }` — §5.1 `count_n_zero`
4707    /// * `Count { waitpoints: [] }` — §5.1 `count_waitpoints_empty`
4708    /// * `Count { n > waitpoints.len(), DistinctWaitpoints }` — §5.1
4709    ///   `count_exceeds_waitpoint_set`
4710    /// * depth > [`MAX_COMPOSITE_DEPTH`] — §5.1 `condition_depth_exceeded`
4711    pub fn validate_composite(&self) -> Result<(), CompositeValidationError> {
4712        match self {
4713            ResumeCondition::Composite(body) => validate_body(body, 1, ""),
4714            _ => Ok(()),
4715        }
4716    }
4717}
4718
4719fn validate_body(
4720    body: &CompositeBody,
4721    depth: usize,
4722    path: &str,
4723) -> Result<(), CompositeValidationError> {
4724    if depth > MAX_COMPOSITE_DEPTH {
4725        return Err(CompositeValidationError::new(format!(
4726            "depth {} exceeds cap {} at path {}",
4727            depth,
4728            MAX_COMPOSITE_DEPTH,
4729            if path.is_empty() { "<root>" } else { path }
4730        )));
4731    }
4732    match body {
4733        CompositeBody::AllOf { members } => {
4734            if members.is_empty() {
4735                return Err(CompositeValidationError::new(format!(
4736                    "allof_empty_members at path {}",
4737                    if path.is_empty() { "<root>" } else { path }
4738                )));
4739            }
4740            for (i, m) in members.iter().enumerate() {
4741                let child_path = if path.is_empty() {
4742                    format!("members[{i}]")
4743                } else {
4744                    format!("{path}.members[{i}]")
4745                };
4746                if let ResumeCondition::Composite(inner) = m {
4747                    validate_body(inner, depth + 1, &child_path)?;
4748                }
4749                // Leaf `Single` / operator / timeout needs no further
4750                // structural checks — RFC-013 already constrains them.
4751            }
4752            Ok(())
4753        }
4754        CompositeBody::Count {
4755            n,
4756            count_kind,
4757            waitpoints,
4758            ..
4759        } => {
4760            if *n == 0 {
4761                return Err(CompositeValidationError::new(format!(
4762                    "count_n_zero at path {}",
4763                    if path.is_empty() { "<root>" } else { path }
4764                )));
4765            }
4766            if waitpoints.is_empty() {
4767                return Err(CompositeValidationError::new(format!(
4768                    "count_waitpoints_empty at path {}",
4769                    if path.is_empty() { "<root>" } else { path }
4770                )));
4771            }
4772            if matches!(count_kind, CountKind::DistinctWaitpoints)
4773                && (*n as usize) > waitpoints.len()
4774            {
4775                return Err(CompositeValidationError::new(format!(
4776                    "count_exceeds_waitpoint_set: n={} > waitpoints.len()={} at path {}",
4777                    n,
4778                    waitpoints.len(),
4779                    if path.is_empty() { "<root>" } else { path }
4780                )));
4781            }
4782            Ok(())
4783        }
4784    }
4785}
4786
4787/// Where a satisfied suspension routes back to.
4788///
4789/// v1 ships only [`ResumeTarget::Runnable`] — execution returns to
4790/// `runnable` and goes through normal scheduling.
4791#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
4792#[non_exhaustive]
4793pub enum ResumeTarget {
4794    Runnable,
4795}
4796
4797/// Resume-side policy carried alongside [`ResumeCondition`].
4798///
4799/// RFC-013 §2.5 — what happens when the condition is satisfied. Fields
4800/// mirror the `resume_policy_json` the backend serializer writes to Lua
4801/// (RFC-004 §Resume policy fields).
4802#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
4803#[non_exhaustive]
4804pub struct ResumePolicy {
4805    pub resume_target: ResumeTarget,
4806    pub consume_matched_signals: bool,
4807    pub retain_signal_buffer_until_closed: bool,
4808    #[serde(default, skip_serializing_if = "Option::is_none")]
4809    pub resume_delay_ms: Option<u64>,
4810    pub close_waitpoint_on_resume: bool,
4811}
4812
4813impl Default for ResumePolicy {
4814    fn default() -> Self {
4815        Self::normal()
4816    }
4817}
4818
4819impl ResumePolicy {
4820    /// Construct a [`ResumePolicy`] with the canonical v1 defaults
4821    /// (see [`Self::normal`]). Alias for [`Self::normal`] — provided
4822    /// so external consumers have a conventional `new` constructor
4823    /// against this `#[non_exhaustive]` struct.
4824    pub fn new() -> Self {
4825        Self::normal()
4826    }
4827
4828    /// Canonical v1 defaults (RFC-013 §2.2.1):
4829    /// * `resume_target = Runnable`
4830    /// * `consume_matched_signals = true`
4831    /// * `retain_signal_buffer_until_closed = false`
4832    /// * `resume_delay_ms = None`
4833    /// * `close_waitpoint_on_resume = true`
4834    pub fn normal() -> Self {
4835        Self {
4836            resume_target: ResumeTarget::Runnable,
4837            consume_matched_signals: true,
4838            retain_signal_buffer_until_closed: false,
4839            resume_delay_ms: None,
4840            close_waitpoint_on_resume: true,
4841        }
4842    }
4843}
4844
4845/// Timeout behavior at the suspension deadline (RFC-004 §Timeout Behavior).
4846#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
4847#[non_exhaustive]
4848pub enum TimeoutBehavior {
4849    Fail,
4850    Cancel,
4851    Expire,
4852    AutoResumeWithTimeoutSignal,
4853    /// v2 per RFC-004 Implementation Notes; enum slot present for
4854    /// additive RFC-014/RFC-015 landing.
4855    Escalate,
4856}
4857
4858impl TimeoutBehavior {
4859    /// Lua-side string encoding. Matches the wire values Lua's
4860    /// `ff_expire_suspension` matches on.
4861    pub fn as_wire_str(self) -> &'static str {
4862        match self {
4863            Self::Fail => "fail",
4864            Self::Cancel => "cancel",
4865            Self::Expire => "expire",
4866            Self::AutoResumeWithTimeoutSignal => "auto_resume_with_timeout_signal",
4867            Self::Escalate => "escalate",
4868        }
4869    }
4870}
4871
4872/// Reason category for a suspension (RFC-004 §Suspension Reason Categories).
4873#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
4874#[non_exhaustive]
4875pub enum SuspensionReasonCode {
4876    WaitingForSignal,
4877    WaitingForApproval,
4878    WaitingForCallback,
4879    WaitingForToolResult,
4880    WaitingForOperatorReview,
4881    PausedByPolicy,
4882    PausedByBudget,
4883    StepBoundary,
4884    ManualPause,
4885}
4886
4887impl SuspensionReasonCode {
4888    pub fn as_wire_str(self) -> &'static str {
4889        match self {
4890            Self::WaitingForSignal => "waiting_for_signal",
4891            Self::WaitingForApproval => "waiting_for_approval",
4892            Self::WaitingForCallback => "waiting_for_callback",
4893            Self::WaitingForToolResult => "waiting_for_tool_result",
4894            Self::WaitingForOperatorReview => "waiting_for_operator_review",
4895            Self::PausedByPolicy => "paused_by_policy",
4896            Self::PausedByBudget => "paused_by_budget",
4897            Self::StepBoundary => "step_boundary",
4898            Self::ManualPause => "manual_pause",
4899        }
4900    }
4901}
4902
/// Who requested the suspension.
///
/// Every variant has a fixed string encoding, returned by
/// [`SuspensionRequester::as_wire_str`]. The enum is `#[non_exhaustive]`,
/// so matches written outside this crate need a wildcard arm.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum SuspensionRequester {
    /// Wire value `"worker"`.
    Worker,
    /// Wire value `"operator"`.
    Operator,
    /// Wire value `"policy"`.
    Policy,
    /// Wire value `"system_timeout_policy"`.
    SystemTimeoutPolicy,
}
4912
4913impl SuspensionRequester {
4914    pub fn as_wire_str(self) -> &'static str {
4915        match self {
4916            Self::Worker => "worker",
4917            Self::Operator => "operator",
4918            Self::Policy => "policy",
4919            Self::SystemTimeoutPolicy => "system_timeout_policy",
4920        }
4921    }
4922}
4923
/// How the waitpoint resource backing a [`SuspendArgs`] is obtained.
///
/// RFC-013 §2.2 — `Fresh` mints a new waitpoint as part of `suspend`;
/// `UsePending` activates a waitpoint previously issued via
/// `EngineBackend::create_waitpoint`.
///
/// Prefer the [`WaitpointBinding::fresh`] and
/// [`WaitpointBinding::use_pending`] constructors over building the
/// variants by hand.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum WaitpointBinding {
    /// A waitpoint minted as part of `suspend`.
    Fresh {
        /// Identifier for the new waitpoint.
        waitpoint_id: WaitpointId,
        /// Key for the new waitpoint; [`WaitpointBinding::fresh`]
        /// builds it as `wpk:<waitpoint_id>`.
        waitpoint_key: String,
    },
    /// A previously issued pending waitpoint that `suspend` activates.
    UsePending {
        /// Identifier of the pending waitpoint.
        waitpoint_id: WaitpointId,
    },
}
4940
4941impl WaitpointBinding {
4942    /// Mint a fresh binding with a random `waitpoint_id` (UUID v4) and
4943    /// `waitpoint_key = "wpk:<uuid>"`.
4944    pub fn fresh() -> Self {
4945        let wp_id = WaitpointId::new();
4946        let key = format!("wpk:{wp_id}");
4947        Self::Fresh {
4948            waitpoint_id: wp_id,
4949            waitpoint_key: key,
4950        }
4951    }
4952
4953    /// Construct a `UsePending` binding from a pending waitpoint
4954    /// previously issued by `create_waitpoint`. The HMAC token is
4955    /// resolved Lua-side from the partition's waitpoint hash at
4956    /// `suspend` time (RFC-013 §5.1).
4957    pub fn use_pending(pending: &crate::backend::PendingWaitpoint) -> Self {
4958        Self::UsePending {
4959            waitpoint_id: pending.waitpoint_id.clone(),
4960        }
4961    }
4962}
4963
/// Trait-surface input to [`EngineBackend::suspend`] (RFC-013 §2.2 +
/// RFC-014 Pattern 3 widening).
///
/// Built via [`SuspendArgs::new`] + `with_*` setters; direct struct-
/// literal construction across crate boundaries is not possible
/// (`#[non_exhaustive]`).
///
/// ## Waitpoints
///
/// `waitpoints` is a non-empty `Vec<WaitpointBinding>`. The first entry
/// is the "primary" binding (accessible via [`primary`](Self::primary))
/// and carries the `current_waitpoint_id` written onto `exec_core` for
/// operator visibility. Additional entries land in Valkey as their own
/// waitpoint hashes / signal streams / HMAC tokens, enabling RFC-014
/// Pattern 3 `AllOf { members: [Single{wp1}, Single{wp2}, ...] }` across
/// distinct heterogeneous subsystems.
///
/// [`SuspendArgs::new`] takes exactly the primary binding; call
/// [`with_waitpoint`](Self::with_waitpoint) to append further bindings
/// (the RFC-014 builder API).
///
/// Note that non-emptiness is a convention, not something the type
/// enforces: the field is public, the struct is `Deserialize`, and
/// [`with_waitpoints`](Self::with_waitpoints) accepts any `Vec`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct SuspendArgs {
    /// Identifier of this suspension.
    pub suspension_id: SuspensionId,
    /// RFC-014 Pattern 3: all waitpoint bindings for this suspension.
    /// Expected to be non-empty; `waitpoints[0]` is the primary.
    pub waitpoints: Vec<WaitpointBinding>,
    /// Condition under which the suspension resumes.
    pub resume_condition: ResumeCondition,
    /// Policy applied when the suspension resumes.
    pub resume_policy: ResumePolicy,
    /// Reason category recorded for this suspension.
    pub reason_code: SuspensionReasonCode,
    /// Who requested the suspension. [`SuspendArgs::new`] defaults this
    /// to [`SuspensionRequester::Worker`].
    pub requested_by: SuspensionRequester,
    /// Suspension deadline, if any. `None` by default; set together
    /// with `timeout_behavior` via `with_timeout`. Omitted from the
    /// serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_at: Option<TimestampMs>,
    /// What to do at the deadline. [`SuspendArgs::new`] defaults this
    /// to [`TimeoutBehavior::Fail`].
    pub timeout_behavior: TimeoutBehavior,
    /// Optional continuation-metadata pointer. Omitted from the
    /// serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub continuation_metadata_pointer: Option<String>,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
    /// Optional idempotency key. Omitted from the serialized form when
    /// `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<IdempotencyKey>,
}
5004
5005impl SuspendArgs {
5006    /// Build a minimal `SuspendArgs` for a worker-originated suspension.
5007    ///
5008    /// `waitpoint` becomes the primary binding. Append additional
5009    /// bindings with [`with_waitpoint`](Self::with_waitpoint) (RFC-014
5010    /// Pattern 3) or replace the set with
5011    /// [`with_waitpoints`](Self::with_waitpoints).
5012    ///
5013    /// Defaults: `requested_by = Worker`, `timeout_at = None`,
5014    /// `timeout_behavior = Fail`, `continuation_metadata_pointer = None`,
5015    /// `idempotency_key = None`.
5016    pub fn new(
5017        suspension_id: SuspensionId,
5018        waitpoint: WaitpointBinding,
5019        resume_condition: ResumeCondition,
5020        resume_policy: ResumePolicy,
5021        reason_code: SuspensionReasonCode,
5022        now: TimestampMs,
5023    ) -> Self {
5024        Self {
5025            suspension_id,
5026            waitpoints: vec![waitpoint],
5027            resume_condition,
5028            resume_policy,
5029            reason_code,
5030            requested_by: SuspensionRequester::Worker,
5031            timeout_at: None,
5032            timeout_behavior: TimeoutBehavior::Fail,
5033            continuation_metadata_pointer: None,
5034            now,
5035            idempotency_key: None,
5036        }
5037    }
5038
5039    /// Primary binding — `waitpoints[0]`. Guaranteed present by
5040    /// construction.
5041    pub fn primary(&self) -> &WaitpointBinding {
5042        &self.waitpoints[0]
5043    }
5044
5045    pub fn with_timeout(mut self, at: TimestampMs, behavior: TimeoutBehavior) -> Self {
5046        self.timeout_at = Some(at);
5047        self.timeout_behavior = behavior;
5048        self
5049    }
5050
5051    pub fn with_requester(mut self, requester: SuspensionRequester) -> Self {
5052        self.requested_by = requester;
5053        self
5054    }
5055
5056    pub fn with_continuation_metadata_pointer(mut self, p: String) -> Self {
5057        self.continuation_metadata_pointer = Some(p);
5058        self
5059    }
5060
5061    pub fn with_idempotency_key(mut self, key: IdempotencyKey) -> Self {
5062        self.idempotency_key = Some(key);
5063        self
5064    }
5065
5066    /// RFC-014 Pattern 3 — append a further waitpoint binding to this
5067    /// suspension. Each additional binding yields its own waitpoint
5068    /// hash, signal stream, condition hash and HMAC token in Valkey,
5069    /// but all share the suspension record and composite evaluator
5070    /// under one `suspension:current`.
5071    ///
5072    /// Ordering: the primary (from [`SuspendArgs::new`]) stays at
5073    /// `waitpoints[0]`; subsequent `with_waitpoint` calls append at the
5074    /// tail.
5075    pub fn with_waitpoint(mut self, binding: WaitpointBinding) -> Self {
5076        self.waitpoints.push(binding);
5077        self
5078    }
5079
5080    /// RFC-014 Pattern 3 — replace the full binding vector in one call.
5081    /// Must be non-empty; an empty Vec is a programmer error and will
5082    /// be rejected by the backend's `validate_suspend_args` with
5083    /// `waitpoints_empty`.
5084    pub fn with_waitpoints(mut self, bindings: Vec<WaitpointBinding>) -> Self {
5085        self.waitpoints = bindings;
5086        self
5087    }
5088}
5089
/// Shared "what happened on the waitpoint" payload carried in both
/// [`SuspendOutcome`] variants.
///
/// For Pattern 3 (RFC-014) — multi-waitpoint suspensions — the primary
/// binding's identity lives at the top level (`waitpoint_id` /
/// `waitpoint_key` / `waitpoint_token`) and remaining bindings are
/// exposed via `additional_waitpoints`, each carrying its own minted
/// HMAC token so external signallers can deliver to any of the N
/// waitpoints the suspension is listening on.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct SuspendOutcomeDetails {
    /// Identifier of the suspension this outcome describes.
    pub suspension_id: SuspensionId,
    /// Primary binding: waitpoint id.
    pub waitpoint_id: WaitpointId,
    /// Primary binding: waitpoint key.
    pub waitpoint_key: String,
    /// Primary binding: HMAC token.
    pub waitpoint_token: WaitpointHmac,
    /// RFC-014 Pattern 3 extras (beyond the primary). Empty for
    /// single-waitpoint suspensions (patterns 1 + 2); carries one
    /// entry per additional binding for Pattern 3.
    pub additional_waitpoints: Vec<AdditionalWaitpointBinding>,
}
5111
/// RFC-014 Pattern 3 — per-binding identity + HMAC token for
/// waitpoints beyond the primary. Structure mirrors the top-level
/// fields on [`SuspendOutcomeDetails`].
///
/// `#[non_exhaustive]`: other crates build values through
/// [`AdditionalWaitpointBinding::new`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct AdditionalWaitpointBinding {
    /// Id of this additional waitpoint.
    pub waitpoint_id: WaitpointId,
    /// Key of this additional waitpoint.
    pub waitpoint_key: String,
    /// HMAC token minted for this additional waitpoint.
    pub waitpoint_token: WaitpointHmac,
}
5122
5123impl AdditionalWaitpointBinding {
5124    pub fn new(
5125        waitpoint_id: WaitpointId,
5126        waitpoint_key: String,
5127        waitpoint_token: WaitpointHmac,
5128    ) -> Self {
5129        Self {
5130            waitpoint_id,
5131            waitpoint_key,
5132            waitpoint_token,
5133        }
5134    }
5135}
5136
5137impl SuspendOutcomeDetails {
5138    pub fn new(
5139        suspension_id: SuspensionId,
5140        waitpoint_id: WaitpointId,
5141        waitpoint_key: String,
5142        waitpoint_token: WaitpointHmac,
5143    ) -> Self {
5144        Self {
5145            suspension_id,
5146            waitpoint_id,
5147            waitpoint_key,
5148            waitpoint_token,
5149            additional_waitpoints: Vec::new(),
5150        }
5151    }
5152
5153    /// Attach RFC-014 Pattern 3 additional-waitpoint bindings. The
5154    /// primary binding stays at the top-level fields; `extras` lands
5155    /// in [`additional_waitpoints`](Self::additional_waitpoints).
5156    pub fn with_additional_waitpoints(
5157        mut self,
5158        extras: Vec<AdditionalWaitpointBinding>,
5159    ) -> Self {
5160        self.additional_waitpoints = extras;
5161        self
5162    }
5163}
5164
/// Trait-surface output from [`EngineBackend::suspend`] (RFC-013 §2.3).
///
/// Two variants encode the "lease released" vs "lease retained" split.
/// See §2.3 for the runtime-enforcement semantics.
///
/// Both variants carry a [`SuspendOutcomeDetails`]; use
/// [`SuspendOutcome::details`] to read it without matching on the
/// variant.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum SuspendOutcome {
    /// The worker's pre-suspend handle is no longer lease-bearing; a
    /// fresh `HandleKind::Suspended` handle supersedes it.
    Suspended {
        /// Waitpoint details for this suspension.
        details: SuspendOutcomeDetails,
        /// The handle that supersedes the pre-suspend one.
        handle: crate::backend::Handle,
    },
    /// Buffered signals on a pending waitpoint already satisfied the
    /// condition at suspension time; the lease is retained and the
    /// caller's pre-suspend handle remains valid.
    AlreadySatisfied {
        /// Waitpoint details for this suspension.
        details: SuspendOutcomeDetails,
    },
}
5183
5184impl SuspendOutcome {
5185    /// Borrow the shared details regardless of variant.
5186    pub fn details(&self) -> &SuspendOutcomeDetails {
5187        match self {
5188            Self::Suspended { details, .. } => details,
5189            Self::AlreadySatisfied { details } => details,
5190        }
5191    }
5192}
5193
5194// `EngineBackend::suspend` type re-exports for `ff_core::backend::*`
5195// consumers. The `backend` module re-exports these below so external
5196// crates can reach them via the idiomatic `ff_core::backend` path that
5197// already sources the other trait-surface types (RFC-013 §9.1).
5198
5199// ─── RFC-017 Stage A — trait-expansion Args/Result types ─────────────
5200//
5201// Per RFC-017 §5.1.1: every struct/enum introduced here is
5202// `#[non_exhaustive]` and ships with a `pub fn new(...)` constructor so
5203// additive field growth post-v0.8 does not force cross-crate churn.
5204
5205// ─── claim_for_worker ───
5206
/// Inputs to `EngineBackend::claim_for_worker` (RFC-017 §5, §7). The
/// Valkey impl forwards to `ff_scheduler::Scheduler::claim_for_worker`;
/// the Postgres impl forwards to its own scheduler module. The trait
/// method hides the backend-specific dispatch behind one shape.
///
/// `#[non_exhaustive]`: other crates build values through
/// [`ClaimForWorkerArgs::new`].
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct ClaimForWorkerArgs {
    /// Lane to claim work from.
    pub lane_id: LaneId,
    /// Identity of the claiming worker.
    pub worker_id: WorkerId,
    /// Identity of the claiming worker instance.
    pub worker_instance_id: WorkerInstanceId,
    /// Capability strings advertised by the worker (an ordered set, so
    /// iteration order is stable).
    pub worker_capabilities: std::collections::BTreeSet<String>,
    /// TTL of the issued grant, in milliseconds (per the field name).
    pub grant_ttl_ms: u64,
}
5220
5221impl ClaimForWorkerArgs {
5222    /// Required-field constructor. Optional fields today: none — kept
5223    /// for forward-compat so a future optional (e.g. `deadline_ms`)
5224    /// does not break callers using the builder pattern.
5225    pub fn new(
5226        lane_id: LaneId,
5227        worker_id: WorkerId,
5228        worker_instance_id: WorkerInstanceId,
5229        worker_capabilities: std::collections::BTreeSet<String>,
5230        grant_ttl_ms: u64,
5231    ) -> Self {
5232        Self {
5233            lane_id,
5234            worker_id,
5235            worker_instance_id,
5236            worker_capabilities,
5237            grant_ttl_ms,
5238        }
5239    }
5240}
5241
/// Outcome of `EngineBackend::claim_for_worker`. `None`-like shape
/// modelled as an enum so additive variants (e.g. `BackPressured {
/// retry_after_ms }`) do not force a wire break.
///
/// `#[non_exhaustive]`: matches written outside this crate need a
/// wildcard arm. Values can be built with
/// [`ClaimForWorkerOutcome::no_work`] and
/// [`ClaimForWorkerOutcome::granted`].
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClaimForWorkerOutcome {
    /// No eligible execution on this lane at this scan cycle.
    NoWork,
    /// Grant issued — worker proceeds to `claim_from_grant`.
    Granted(ClaimGrant),
}
5253
5254impl ClaimForWorkerOutcome {
5255    /// Build the `NoWork` variant.
5256    pub fn no_work() -> Self {
5257        Self::NoWork
5258    }
5259    /// Build the `Granted` variant.
5260    pub fn granted(grant: ClaimGrant) -> Self {
5261        Self::Granted(grant)
5262    }
5263}
5264
5265// ─── list_pending_waitpoints ───
5266
/// Inputs to `EngineBackend::list_pending_waitpoints` (RFC-017 §5, §8).
/// Pagination is part of the signature so a flow with 10k pending
/// waitpoints cannot force a single-round-trip read regardless of
/// backend.
///
/// Built via [`ListPendingWaitpointsArgs::new`] plus the `with_after` /
/// `with_limit` setters.
// NOTE(review): the summary above says "flow", but the request is keyed
// by `execution_id` — confirm which scope is intended.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct ListPendingWaitpointsArgs {
    /// Execution the listing is keyed by.
    pub execution_id: ExecutionId,
    /// Exclusive cursor — `None` starts from the beginning.
    pub after: Option<WaitpointId>,
    /// Max page size. `None` → backend default (100). Backend-enforced
    /// cap: 1000.
    pub limit: Option<u32>,
}
5281
5282impl ListPendingWaitpointsArgs {
5283    pub fn new(execution_id: ExecutionId) -> Self {
5284        Self {
5285            execution_id,
5286            after: None,
5287            limit: None,
5288        }
5289    }
5290    pub fn with_after(mut self, after: WaitpointId) -> Self {
5291        self.after = Some(after);
5292        self
5293    }
5294    pub fn with_limit(mut self, limit: u32) -> Self {
5295        self.limit = Some(limit);
5296        self
5297    }
5298}
5299
/// Page of pending-waitpoint entries. Stage A preserves the existing
/// `PendingWaitpointInfo` shape; the §8 schema rewrite (HMAC
/// sanitisation + `(token_kid, token_fingerprint)` additive fields)
/// ships in Stage D alongside the HTTP wire-format deprecation.
///
/// Built via [`ListPendingWaitpointsResult::new`] plus
/// `with_next_cursor`.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct ListPendingWaitpointsResult {
    /// Entries in this page.
    pub entries: Vec<PendingWaitpointInfo>,
    /// Forward-only continuation cursor — `None` signals end-of-stream.
    pub next_cursor: Option<WaitpointId>,
}
5311
5312impl ListPendingWaitpointsResult {
5313    pub fn new(entries: Vec<PendingWaitpointInfo>) -> Self {
5314        Self {
5315            entries,
5316            next_cursor: None,
5317        }
5318    }
5319    pub fn with_next_cursor(mut self, cursor: WaitpointId) -> Self {
5320        self.next_cursor = Some(cursor);
5321        self
5322    }
5323}
5324
5325// ─── report_usage_admin ───
5326
/// Inputs to `EngineBackend::report_usage_admin` (RFC-017 §5 budget+
/// quota admin §5, round-1 F4). Admin-path peer of `report_usage` —
/// both wrap `ff_report_usage_and_check` on the Valkey side but the
/// admin call is worker-less, so it cannot reuse the lease-bound
/// `report_usage(&Handle, ...)` signature. `ReportUsageAdminArgs`
/// carries the same fields as [`ReportUsageArgs`] without a worker
/// handle — kept as a distinct type so future admin-only fields (e.g.
/// `actor_identity`, `audit_reason`) don't pollute the worker path.
// NOTE(review): `dimensions` and `deltas` look like parallel vectors
// (same index = same dimension). Nothing in this type or its
// constructor checks that their lengths match — confirm where that is
// validated.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct ReportUsageAdminArgs {
    /// Dimension names.
    pub dimensions: Vec<String>,
    /// Usage deltas.
    pub deltas: Vec<u64>,
    /// Optional dedup key; `None` unless set via `with_dedup_key`.
    pub dedup_key: Option<String>,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
5343
5344impl ReportUsageAdminArgs {
5345    pub fn new(dimensions: Vec<String>, deltas: Vec<u64>, now: TimestampMs) -> Self {
5346        Self {
5347            dimensions,
5348            deltas,
5349            dedup_key: None,
5350            now,
5351        }
5352    }
5353    pub fn with_dedup_key(mut self, key: String) -> Self {
5354        self.dedup_key = Some(key);
5355        self
5356    }
5357}
5358
5359// ─── #454 — cairn ControlPlaneBackend peer methods ──────────────────
5360//
5361// Four trait methods from cairn-rs #454 that previously routed through
5362// raw `ferriskey::*` FCALLs outside the `EngineBackend` trait. Cairn's
5363// ground-truth shapes at `cairn-fabric/src/engine/control_plane_types.rs`
5364// (commit `a4fdb638`) are mirrored below verbatim so the v0.13 trait
5365// matches cairn's existing Valkey impls 1:1.
5366//
5367// Default bodies on the trait return `EngineError::Unavailable { op }`
5368// at landing; Valkey bodies ship in Phase 3; PG + SQLite in Phases 4+5.
5369
5370// ─── record_spend ───
5371
/// Args for [`crate::engine_backend::EngineBackend::record_spend`].
///
/// Carries an **open-set** `BTreeMap<String, u64>` of dimension deltas
/// per cairn's ground-truth shape at
/// `cairn-fabric/src/engine/control_plane_types.rs`. Cairn budgets are
/// per-tenant open-schema (tenant A tracks `"tokens"` + `"cost_cents"`,
/// tenant B tracks `"egress_bytes"`), distinct from FF's fixed-shape
/// [`UsageDimensions`] which encodes the internal usage-report surface.
///
/// `BTreeMap` (not `HashMap`) gives stable iteration order — consistent
/// with `UsageDimensions::custom`, and critical for the PG body which
/// updates multiple dimension rows per call (deterministic ordering
/// prevents deadlocks under concurrent spend).
///
/// Return shape reuses [`ReportUsageResult`] — same four variants
/// (`Ok` / `SoftBreach` / `HardBreach` / `AlreadyApplied`) cairn's UI
/// branches on. Not a new enum.
///
/// `#[non_exhaustive]`: other crates build values through
/// [`RecordSpendArgs::new`].
#[derive(Clone, Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct RecordSpendArgs {
    /// Budget the spend is recorded against.
    pub budget_id: BudgetId,
    /// Execution the spend is attributed to.
    pub execution_id: ExecutionId,
    /// Per-dimension positive deltas. Tenant-defined keys; stable
    /// iteration order.
    pub deltas: BTreeMap<String, u64>,
    /// Caller-computed idempotency key (cairn uses SHA-256 hex of
    /// `budget_id || execution_id || sorted(deltas)`). FF does not
    /// interpret the bytes — dedup is a simple equality check against
    /// the prior stamped key.
    pub idempotency_key: String,
}
5403
5404impl RecordSpendArgs {
5405    pub fn new(
5406        budget_id: BudgetId,
5407        execution_id: ExecutionId,
5408        deltas: BTreeMap<String, u64>,
5409        idempotency_key: impl Into<String>,
5410    ) -> Self {
5411        Self {
5412            budget_id,
5413            execution_id,
5414            deltas,
5415            idempotency_key: idempotency_key.into(),
5416        }
5417    }
5418}
5419
5420// ─── release_budget ───
5421
/// Args for [`crate::engine_backend::EngineBackend::release_budget`].
///
/// **Per-execution release-my-attribution**, not whole-budget flush.
/// Called when an execution terminates so the budget persists across
/// executions but this execution's attribution is reversed. Per cairn
/// clarification on #454.
///
/// `#[non_exhaustive]`: other crates build values through
/// [`ReleaseBudgetArgs::new`].
#[derive(Clone, Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct ReleaseBudgetArgs {
    /// Budget holding the attribution to release.
    pub budget_id: BudgetId,
    /// Execution whose attribution is reversed.
    pub execution_id: ExecutionId,
}
5434
5435impl ReleaseBudgetArgs {
5436    pub fn new(budget_id: BudgetId, execution_id: ExecutionId) -> Self {
5437        Self {
5438            budget_id,
5439            execution_id,
5440        }
5441    }
5442}
5443
5444// ─── deliver_approval_signal ───
5445
/// Args for [`crate::engine_backend::EngineBackend::deliver_approval_signal`].
///
/// Pre-shaped variant of [`crate::engine_backend::EngineBackend::deliver_signal`]
/// for the operator-driven approval flow. Distinct from `deliver_signal`
/// because the caller **does not carry the waitpoint token** — the backend
/// reads the token from `ff_waitpoint_pending` (via
/// [`crate::engine_backend::EngineBackend::read_waitpoint_token`],
/// #434-shipped in v0.12), HMAC-verifies server-side, and dispatches. The
/// operator API never handles the token bytes.
///
/// `signal_name` is a flat string (`"approved"` / `"rejected"` by
/// convention; not an enum at the trait level — audit metadata like
/// `decided_by` / `note` / `reason` sits in cairn's audit log, not in
/// the FF signal surface).
///
/// `#[non_exhaustive]`: other crates build values through
/// [`DeliverApprovalSignalArgs::new`], which takes every field.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct DeliverApprovalSignalArgs {
    /// Execution the approval signal targets.
    pub execution_id: ExecutionId,
    /// Lane of the target execution.
    pub lane_id: LaneId,
    /// Waitpoint the signal is delivered to.
    pub waitpoint_id: WaitpointId,
    /// Conventional values: `"approved"` / `"rejected"`. Stored raw on
    /// the delivered signal; FF does not interpret.
    pub signal_name: String,
    /// Cairn-side per-decision idempotency suffix. Combined with
    /// `execution_id` + `signal_name` to form the dedup key.
    pub idempotency_suffix: String,
    /// Dedup TTL in milliseconds.
    pub signal_dedup_ttl_ms: u64,
    /// Signal stream MAXLEN for the suspension stream.
    /// `None` ⇒ backend default (matches [`DeliverSignalArgs::signal_maxlen`]).
    /// Also defaults to `None` when absent from a deserialized payload.
    #[serde(default)]
    pub maxlen: Option<u64>,
    /// Per-execution max signal cap (operator quota).
    /// `None` ⇒ backend default (matches [`DeliverSignalArgs::max_signals_per_execution`]).
    /// Also defaults to `None` when absent from a deserialized payload.
    #[serde(default)]
    pub max_signals_per_execution: Option<u64>,
}
5483
5484impl DeliverApprovalSignalArgs {
5485    #[allow(clippy::too_many_arguments)]
5486    pub fn new(
5487        execution_id: ExecutionId,
5488        lane_id: LaneId,
5489        waitpoint_id: WaitpointId,
5490        signal_name: impl Into<String>,
5491        idempotency_suffix: impl Into<String>,
5492        signal_dedup_ttl_ms: u64,
5493        maxlen: Option<u64>,
5494        max_signals_per_execution: Option<u64>,
5495    ) -> Self {
5496        Self {
5497            execution_id,
5498            lane_id,
5499            waitpoint_id,
5500            signal_name: signal_name.into(),
5501            idempotency_suffix: idempotency_suffix.into(),
5502            signal_dedup_ttl_ms,
5503            maxlen,
5504            max_signals_per_execution,
5505        }
5506    }
5507}
5508
5509// ─── issue_grant_and_claim ───
5510
/// Args for [`crate::engine_backend::EngineBackend::issue_grant_and_claim`].
///
/// Composes `issue_claim_grant` + `claim_execution` into a single
/// backend-atomic op per cairn #454 Q4. The composition **must** be
/// backend-atomic (not caller-chained) to prevent leaking grants when
/// `claim_execution` fails after `issue_claim_grant` succeeded.
///
/// Valkey: one `ff_issue_grant_and_claim` FCALL composing the two
/// primitives in Lua.
/// Postgres/SQLite: both primitives inside one tx.
///
/// Flattened shape (not `IssueClaimGrantArgs + ClaimExecutionArgs`
/// composition) — the two arg types overlap on `execution_id` +
/// `lane_id`; flattening drops the dup.
///
/// `#[non_exhaustive]`: other crates build values through
/// [`IssueGrantAndClaimArgs::new`].
#[derive(Clone, Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct IssueGrantAndClaimArgs {
    /// Execution to grant and claim.
    pub execution_id: ExecutionId,
    /// Lane of that execution.
    pub lane_id: LaneId,
    /// Lease TTL in milliseconds. Threaded into both the grant TTL and
    /// the claimed attempt's `lease_expires_at_ms`.
    pub lease_duration_ms: u64,
}
5534
5535impl IssueGrantAndClaimArgs {
5536    pub fn new(execution_id: ExecutionId, lane_id: LaneId, lease_duration_ms: u64) -> Self {
5537        Self {
5538            execution_id,
5539            lane_id,
5540            lease_duration_ms,
5541        }
5542    }
5543}
5544
/// Outcome of [`crate::engine_backend::EngineBackend::issue_grant_and_claim`].
///
/// Distinct from [`ClaimExecutionResult`] because the trait method
/// intentionally hides the grant-issuance step — callers only see the
/// resulting lease identity. If the backend's transparent dispatch
/// routes through `ff_claim_resumed_execution` (when the execution was
/// suspended), the return is identical.
///
/// `#[non_exhaustive]`: other crates build values through
/// [`ClaimGrantOutcome::new`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct ClaimGrantOutcome {
    /// Id of the resulting lease.
    pub lease_id: LeaseId,
    /// Epoch of the resulting lease.
    pub lease_epoch: LeaseEpoch,
    /// Index of the claimed attempt.
    pub attempt_index: AttemptIndex,
}
5559
5560impl ClaimGrantOutcome {
5561    pub fn new(
5562        lease_id: LeaseId,
5563        lease_epoch: LeaseEpoch,
5564        attempt_index: AttemptIndex,
5565    ) -> Self {
5566        Self {
5567            lease_id,
5568            lease_epoch,
5569            attempt_index,
5570        }
5571    }
5572}
5573
#[cfg(test)]
mod rfc_014_validation_tests {
    use super::*;

    /// Leaf condition on waitpoint key `wp`, matching the signal named "x".
    fn single(wp: &str) -> ResumeCondition {
        let matcher = SignalMatcher::ByName(String::from("x"));
        ResumeCondition::Single {
            waitpoint_key: String::from(wp),
            matcher,
        }
    }

    /// Wrap `inner` as the sole member of an `AllOf` composite.
    fn all_of_one(inner: ResumeCondition) -> ResumeCondition {
        ResumeCondition::Composite(CompositeBody::AllOf {
            members: vec![inner],
        })
    }

    #[test]
    fn single_passes_validate() {
        let cond = single("wpk:a");
        assert!(cond.validate_composite().is_ok());
    }

    #[test]
    fn allof_empty_members_rejected() {
        let cond = ResumeCondition::Composite(CompositeBody::AllOf { members: vec![] });
        let err = cond.validate_composite().unwrap_err();
        assert!(err.detail.contains("allof_empty_members"), "{}", err.detail);
    }

    #[test]
    fn count_n_zero_rejected() {
        let body = CompositeBody::Count {
            n: 0,
            count_kind: CountKind::DistinctWaitpoints,
            matcher: None,
            waitpoints: vec![String::from("wpk:a")],
        };
        let err = ResumeCondition::Composite(body)
            .validate_composite()
            .unwrap_err();
        assert!(err.detail.contains("count_n_zero"), "{}", err.detail);
    }

    #[test]
    fn count_waitpoints_empty_rejected() {
        let body = CompositeBody::Count {
            n: 1,
            count_kind: CountKind::DistinctSources,
            matcher: None,
            waitpoints: vec![],
        };
        let err = ResumeCondition::Composite(body)
            .validate_composite()
            .unwrap_err();
        assert!(err.detail.contains("count_waitpoints_empty"), "{}", err.detail);
    }

    #[test]
    fn count_exceeds_waitpoint_set_rejected_only_for_distinct_waitpoints() {
        // Three required but only two waitpoints: with
        // DistinctWaitpoints the count can never be reached.
        let too_many = ResumeCondition::Composite(CompositeBody::Count {
            n: 3,
            count_kind: CountKind::DistinctWaitpoints,
            matcher: None,
            waitpoints: vec!["a".into(), "b".into()],
        });
        let err = too_many.validate_composite().unwrap_err();
        assert!(err.detail.contains("count_exceeds_waitpoint_set"), "{}", err.detail);

        // Same numbers under DistinctSignals are fine: the count is not
        // bounded by the waitpoint set.
        let unbounded = ResumeCondition::Composite(CompositeBody::Count {
            n: 3,
            count_kind: CountKind::DistinctSignals,
            matcher: None,
            waitpoints: vec!["a".into(), "b".into()],
        });
        assert!(unbounded.validate_composite().is_ok());
    }

    #[test]
    fn depth_4_accepted_depth_5_rejected() {
        // Four nested AllOf layers around one leaf:
        // AllOf { AllOf { AllOf { AllOf { Single } } } }
        let depth_four = (0..4).fold(single("wpk:leaf"), |inner, _| all_of_one(inner));
        assert!(depth_four.validate_composite().is_ok());

        // One more layer must be rejected.
        let depth_five = all_of_one(depth_four);
        let err = depth_five.validate_composite().unwrap_err();
        assert!(err.detail.contains("exceeds cap"), "{}", err.detail);
    }
}