// ff_core/contracts/mod.rs

//! Phase 1 function contracts — Args + Result types for each FCALL.
//!
//! Each Args struct defines the typed inputs to a Valkey Function.
//! Each Result enum defines the possible outcomes (success variants + error codes).

pub mod decode;

use crate::policy::ExecutionPolicy;
use crate::state::{AttemptType, PublicState, StateVector};
use crate::types::{
    AttemptId, AttemptIndex, CancelSource, EdgeId, ExecutionId, FlowId, LaneId, LeaseEpoch,
    LeaseFence, LeaseId, Namespace, SignalId, SuspensionId, TimestampMs, WaitpointId,
    WaitpointToken, WorkerId, WorkerInstanceId,
};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet, HashMap};

18// ─── create_execution ───
19
20#[derive(Clone, Debug, Serialize, Deserialize)]
21pub struct CreateExecutionArgs {
22    pub execution_id: ExecutionId,
23    pub namespace: Namespace,
24    pub lane_id: LaneId,
25    pub execution_kind: String,
26    pub input_payload: Vec<u8>,
27    #[serde(default)]
28    pub payload_encoding: Option<String>,
29    pub priority: i32,
30    pub creator_identity: String,
31    #[serde(default)]
32    pub idempotency_key: Option<String>,
33    #[serde(default)]
34    pub tags: HashMap<String, String>,
35    /// Execution policy (retry, timeout, suspension, routing, etc.).
36    #[serde(default)]
37    pub policy: Option<ExecutionPolicy>,
38    /// If set and in the future, execution starts delayed.
39    #[serde(default)]
40    pub delay_until: Option<TimestampMs>,
41    /// Absolute deadline timestamp (ms). Execution expires if not complete by this time.
42    #[serde(default)]
43    pub execution_deadline_at: Option<TimestampMs>,
44    /// Partition ID (pre-computed).
45    pub partition_id: u16,
46    pub now: TimestampMs,
47}
48
49#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
50pub enum CreateExecutionResult {
51    /// Execution created successfully.
52    Created {
53        execution_id: ExecutionId,
54        public_state: PublicState,
55    },
56    /// Idempotent duplicate — existing execution returned.
57    Duplicate { execution_id: ExecutionId },
58}
59
60// ─── issue_claim_grant ───
61
62#[derive(Clone, Debug, Serialize, Deserialize)]
63pub struct IssueClaimGrantArgs {
64    pub execution_id: ExecutionId,
65    pub lane_id: LaneId,
66    pub worker_id: WorkerId,
67    pub worker_instance_id: WorkerInstanceId,
68    #[serde(default)]
69    pub capability_hash: Option<String>,
70    #[serde(default)]
71    pub route_snapshot_json: Option<String>,
72    #[serde(default)]
73    pub admission_summary: Option<String>,
74    /// Capabilities this worker advertises. Serialized as a sorted,
75    /// comma-separated string to the Lua FCALL (see scheduling.lua
76    /// ff_issue_claim_grant). An empty set matches only executions whose
77    /// `required_capabilities` is also empty.
78    #[serde(default)]
79    pub worker_capabilities: BTreeSet<String>,
80    pub grant_ttl_ms: u64,
81    /// Caller-side timestamp for bookkeeping. NOT passed to the Lua FCALL —
82    /// ff_issue_claim_grant uses `redis.call("TIME")` for grant_expires_at.
83    pub now: TimestampMs,
84}
85
86#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
87pub enum IssueClaimGrantResult {
88    /// Grant issued.
89    Granted { execution_id: ExecutionId },
90}
91
92/// A claim grant issued by the scheduler for a specific execution.
93///
94/// The worker uses this to call `ff_claim_execution` (or
95/// `ff_acquire_lease`), which atomically consumes the grant and
96/// creates the lease.
97///
98/// Shared wire-level type between `ff-scheduler` (issuer) and
99/// `ff-sdk` (consumer, via `FlowFabricWorker::claim_from_grant`).
100/// Lives in `ff-core` so neither crate needs a dep on the other.
101///
102/// **Lane asymmetry with [`ReclaimGrant`]:** `ClaimGrant` does NOT
103/// carry `lane_id`. The issuing scheduler's caller already picked
104/// a lane (that's how admission reached this grant) and passes it
105/// through to `claim_from_grant` as a separate argument. The grant
106/// handle stays narrow to what uniquely identifies the admission
107/// decision. The matching field on [`ReclaimGrant`] is an
108/// intentional divergence — see the note on that type.
109#[derive(Clone, Debug, PartialEq, Eq)]
110pub struct ClaimGrant {
111    /// The execution that was granted.
112    pub execution_id: ExecutionId,
113    /// Opaque partition handle for this execution's hash-tag slot.
114    ///
115    /// Public wire type: consumers pass it back to FlowFabric but
116    /// must not parse the interior hash tag for routing decisions.
117    /// Internal consumers that need the typed
118    /// [`crate::partition::Partition`] call [`Self::partition`].
119    pub partition_key: crate::partition::PartitionKey,
120    /// The Valkey key holding the grant hash (for the worker to
121    /// reference).
122    pub grant_key: String,
123    /// When the grant expires if not consumed.
124    pub expires_at_ms: u64,
125}
126
127impl ClaimGrant {
128    /// Parse `partition_key` into a typed
129    /// [`crate::partition::Partition`]. Intended for internal
130    /// consumers (scheduler emitter, SDK worker claim path) that
131    /// need the family/index pair. Fails only on malformed keys
132    /// (which indicates a producer bug).
133    ///
134    /// Alias collapse applies: a grant issued against `Execution`
135    /// family round-trips to `Flow` (see [`crate::partition::PartitionKey`]
136    /// for the rationale — routing is preserved, only the metadata
137    /// family label normalises).
138    pub fn partition(
139        &self,
140    ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
141        self.partition_key.parse()
142    }
143}
144
145/// A reclaim grant issued for a resumed (attempt_interrupted) execution.
146///
147/// Issued by a producer (typically `ff-scheduler` once a Batch-C
148/// reclaim scanner is in place; test fixtures in the interim — no
149/// production Rust caller exists in-tree today). Consumed by
150/// [`FlowFabricWorker::claim_from_reclaim_grant`], which calls
151/// `ff_claim_resumed_execution` atomically: that FCALL validates the
152/// grant, consumes it, and transitions `attempt_interrupted` →
153/// `started` while preserving the existing `attempt_index` +
154/// `attempt_id` (a resumed execution re-uses its attempt; it does
155/// not start a new one).
156///
157/// Mirrors [`ClaimGrant`] for the resume path. Differences:
158///
159///   * [`ClaimGrant`] is issued against a freshly-eligible
160///     execution and `ff_claim_execution` creates a new attempt.
161///   * [`ReclaimGrant`] is issued against an `attempt_interrupted`
162///     execution; `ff_claim_resumed_execution` re-uses the existing
163///     attempt and bumps the lease epoch.
164///
165/// The grant itself is written to the same `claim_grant` Valkey key
166/// that [`ClaimGrant`] uses; the distinction is which Lua FCALL
167/// consumes it (`ff_claim_execution` for new attempts,
168/// `ff_claim_resumed_execution` for resumes).
169///
170/// **Lane asymmetry with [`ClaimGrant`]:** `ReclaimGrant` CARRIES
171/// `lane_id` as a field. The issuing path already knows the lane
172/// (it's read from `exec_core` at grant time); carrying it here
173/// spares the consumer a `HGET exec_core lane_id` round trip on
174/// the hot claim path. The asymmetry is intentional — prefer
175/// one-fewer-HGET on a type that already lives with the resumer's
176/// lifecycle over strict handle symmetry with `ClaimGrant`.
177///
178/// Shared wire-level type between the eventual `ff-scheduler`
179/// producer (Batch-C reclaim scanner — not yet in-tree; test
180/// fixtures construct this type today) and `ff-sdk` (consumer, via
181/// `FlowFabricWorker::claim_from_reclaim_grant`). Lives in
182/// `ff-core` so neither crate needs a dep on the other.
183///
184/// [`FlowFabricWorker::claim_from_reclaim_grant`]: https://docs.rs/ff-sdk
185#[derive(Clone, Debug, PartialEq, Eq)]
186pub struct ReclaimGrant {
187    /// The execution granted for resumption.
188    pub execution_id: ExecutionId,
189    /// Opaque partition handle for this execution's hash-tag slot.
190    ///
191    /// Same wire-opacity contract as [`ClaimGrant::partition_key`].
192    /// Internal consumers call [`Self::partition`] for the parsed
193    /// form.
194    pub partition_key: crate::partition::PartitionKey,
195    /// Valkey key of the grant hash — same key shape as
196    /// [`ClaimGrant`].
197    pub grant_key: String,
198    /// Monotonic ms when the grant expires; unconsumed grants
199    /// vanish.
200    pub expires_at_ms: u64,
201    /// Lane the execution belongs to. Needed by
202    /// `ff_claim_resumed_execution` for `KEYS[3]` (eligible_zset)
203    /// and `KEYS[9]` (active_index).
204    pub lane_id: LaneId,
205}
206
207impl ReclaimGrant {
208    /// Parse `partition_key` into a typed
209    /// [`crate::partition::Partition`]. See [`ClaimGrant::partition`]
210    /// for the alias-collapse note.
211    pub fn partition(
212        &self,
213    ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
214        self.partition_key.parse()
215    }
216}
217
218// ─── claim_execution ───
219
220#[derive(Clone, Debug, Serialize, Deserialize)]
221pub struct ClaimExecutionArgs {
222    pub execution_id: ExecutionId,
223    pub worker_id: WorkerId,
224    pub worker_instance_id: WorkerInstanceId,
225    pub lane_id: LaneId,
226    pub lease_id: LeaseId,
227    pub lease_ttl_ms: u64,
228    pub attempt_id: AttemptId,
229    /// Expected attempt index (pre-read from exec_core.total_attempt_count).
230    /// Used for KEYS construction — must match what the Lua computes.
231    pub expected_attempt_index: AttemptIndex,
232    /// JSON-encoded attempt policy snapshot.
233    #[serde(default)]
234    pub attempt_policy_json: String,
235    /// Per-attempt timeout in ms.
236    #[serde(default)]
237    pub attempt_timeout_ms: Option<u64>,
238    /// Total execution deadline (absolute timestamp ms).
239    #[serde(default)]
240    pub execution_deadline_at: Option<i64>,
241    pub now: TimestampMs,
242}
243
244#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
245pub struct ClaimedExecution {
246    pub execution_id: ExecutionId,
247    pub lease_id: LeaseId,
248    pub lease_epoch: LeaseEpoch,
249    pub attempt_index: AttemptIndex,
250    pub attempt_id: AttemptId,
251    pub attempt_type: AttemptType,
252    pub lease_expires_at: TimestampMs,
253}
254
255#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
256pub enum ClaimExecutionResult {
257    /// Successfully claimed.
258    Claimed(ClaimedExecution),
259}
260
261// ─── complete_execution ───
262
263#[derive(Clone, Debug, Serialize, Deserialize)]
264pub struct CompleteExecutionArgs {
265    pub execution_id: ExecutionId,
266    /// RFC #58.5 — fence triple. `Some` for SDK worker paths (standard
267    /// stale-lease fence). `None` for operator overrides, in which case
268    /// `source` must be `CancelSource::OperatorOverride` or the Lua
269    /// returns `fence_required`.
270    #[serde(default)]
271    pub fence: Option<LeaseFence>,
272    pub attempt_index: AttemptIndex,
273    #[serde(default)]
274    pub result_payload: Option<Vec<u8>>,
275    #[serde(default)]
276    pub result_encoding: Option<String>,
277    /// RFC #58.5 — unfenced-call gate. Ignored when `fence` is `Some`.
278    #[serde(default)]
279    pub source: CancelSource,
280    pub now: TimestampMs,
281}
282
283#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
284pub enum CompleteExecutionResult {
285    /// Execution completed successfully.
286    Completed {
287        execution_id: ExecutionId,
288        public_state: PublicState,
289    },
290}
291
292// ─── renew_lease ───
293
294#[derive(Clone, Debug, Serialize, Deserialize)]
295pub struct RenewLeaseArgs {
296    pub execution_id: ExecutionId,
297    pub attempt_index: AttemptIndex,
298    /// RFC #58.5 — fence triple. Required (no operator override path for
299    /// renew). `None` returns `fence_required`.
300    pub fence: Option<LeaseFence>,
301    /// How long to extend the lease (milliseconds).
302    pub lease_ttl_ms: u64,
303    /// Grace period after lease_expires_at before the lease_current key is auto-deleted.
304    #[serde(default = "default_lease_history_grace_ms")]
305    pub lease_history_grace_ms: u64,
306}
307
/// Serde default for `RenewLeaseArgs::lease_history_grace_ms`: 60 000 ms.
fn default_lease_history_grace_ms() -> u64 {
    60_000
}

312#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
313pub enum RenewLeaseResult {
314    /// Lease renewed.
315    Renewed { expires_at: TimestampMs },
316}
317
318// ─── mark_lease_expired_if_due ───
319
320#[derive(Clone, Debug, Serialize, Deserialize)]
321pub struct MarkLeaseExpiredArgs {
322    pub execution_id: ExecutionId,
323}
324
325#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
326pub enum MarkLeaseExpiredResult {
327    /// Lease was marked as expired.
328    MarkedExpired,
329    /// No action needed (already expired, not yet due, not active, etc.).
330    AlreadySatisfied { reason: String },
331}
332
333// ─── cancel_execution ───
334
335#[derive(Clone, Debug, Serialize, Deserialize)]
336pub struct CancelExecutionArgs {
337    pub execution_id: ExecutionId,
338    pub reason: String,
339    #[serde(default)]
340    pub source: CancelSource,
341    /// Required if not operator_override and execution is active.
342    #[serde(default)]
343    pub lease_id: Option<LeaseId>,
344    #[serde(default)]
345    pub lease_epoch: Option<LeaseEpoch>,
346    /// Required if not operator_override and execution is active.
347    #[serde(default)]
348    pub attempt_id: Option<AttemptId>,
349    pub now: TimestampMs,
350}
351
352#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
353pub enum CancelExecutionResult {
354    /// Execution cancelled.
355    Cancelled {
356        execution_id: ExecutionId,
357        public_state: PublicState,
358    },
359}
360
361// ─── revoke_lease ───
362
363#[derive(Clone, Debug, Serialize, Deserialize)]
364pub struct RevokeLeaseArgs {
365    pub execution_id: ExecutionId,
366    /// If set, only revoke if this matches the current lease. Empty string skips check.
367    #[serde(default)]
368    pub expected_lease_id: Option<String>,
369    /// Worker instance whose lease set to clean up. Read from exec_core before calling.
370    pub worker_instance_id: WorkerInstanceId,
371    pub reason: String,
372}
373
374#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
375pub enum RevokeLeaseResult {
376    /// Lease revoked.
377    Revoked {
378        lease_id: String,
379        lease_epoch: String,
380    },
381    /// Already revoked or expired — no action needed.
382    AlreadySatisfied { reason: String },
383}
384
385// ─── delay_execution ───
386
387#[derive(Clone, Debug, Serialize, Deserialize)]
388pub struct DelayExecutionArgs {
389    pub execution_id: ExecutionId,
390    /// RFC #58.5 — fence triple. `None` requires `source ==
391    /// CancelSource::OperatorOverride`.
392    #[serde(default)]
393    pub fence: Option<LeaseFence>,
394    pub attempt_index: AttemptIndex,
395    pub delay_until: TimestampMs,
396    #[serde(default)]
397    pub source: CancelSource,
398    pub now: TimestampMs,
399}
400
401#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
402pub enum DelayExecutionResult {
403    /// Execution delayed.
404    Delayed {
405        execution_id: ExecutionId,
406        public_state: PublicState,
407    },
408}
409
410// ─── move_to_waiting_children ───
411
412#[derive(Clone, Debug, Serialize, Deserialize)]
413pub struct MoveToWaitingChildrenArgs {
414    pub execution_id: ExecutionId,
415    /// RFC #58.5 — fence triple. `None` requires `source ==
416    /// CancelSource::OperatorOverride`.
417    #[serde(default)]
418    pub fence: Option<LeaseFence>,
419    pub attempt_index: AttemptIndex,
420    #[serde(default)]
421    pub source: CancelSource,
422    pub now: TimestampMs,
423}
424
425#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
426pub enum MoveToWaitingChildrenResult {
427    /// Moved to waiting children.
428    Moved {
429        execution_id: ExecutionId,
430        public_state: PublicState,
431    },
432}
433
434// ─── change_priority ───
435
436#[derive(Clone, Debug, Serialize, Deserialize)]
437pub struct ChangePriorityArgs {
438    pub execution_id: ExecutionId,
439    pub new_priority: i32,
440    pub lane_id: LaneId,
441    pub now: TimestampMs,
442}
443
444#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
445pub enum ChangePriorityResult {
446    /// Priority changed and re-scored.
447    Changed { execution_id: ExecutionId },
448}
449
450// ─── update_progress ───
451
452#[derive(Clone, Debug, Serialize, Deserialize)]
453pub struct UpdateProgressArgs {
454    pub execution_id: ExecutionId,
455    pub lease_id: LeaseId,
456    pub lease_epoch: LeaseEpoch,
457    pub attempt_id: AttemptId,
458    #[serde(default)]
459    pub progress_pct: Option<u8>,
460    #[serde(default)]
461    pub progress_message: Option<String>,
462    pub now: TimestampMs,
463}
464
465#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
466pub enum UpdateProgressResult {
467    /// Progress updated.
468    Updated,
469}
470
471// ═══════════════════════════════════════════════════════════════════════
472// Phase 2 contracts: fail, reclaim, expire
473// ═══════════════════════════════════════════════════════════════════════
474
475// ─── fail_execution ───
476
477#[derive(Clone, Debug, Serialize, Deserialize)]
478pub struct FailExecutionArgs {
479    pub execution_id: ExecutionId,
480    /// RFC #58.5 — fence triple. `None` requires `source ==
481    /// CancelSource::OperatorOverride`.
482    #[serde(default)]
483    pub fence: Option<LeaseFence>,
484    pub attempt_index: AttemptIndex,
485    pub failure_reason: String,
486    pub failure_category: String,
487    /// JSON-encoded retry policy (from execution policy). Empty = no retries.
488    #[serde(default)]
489    pub retry_policy_json: String,
490    /// JSON-encoded attempt policy for the next retry attempt.
491    #[serde(default)]
492    pub next_attempt_policy_json: String,
493    #[serde(default)]
494    pub source: CancelSource,
495}
496
497/// Outcome of a fail_execution call.
498#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
499pub enum FailExecutionResult {
500    /// Retry was scheduled — execution is delayed with backoff.
501    RetryScheduled {
502        delay_until: TimestampMs,
503        next_attempt_index: AttemptIndex,
504    },
505    /// No retries left — execution is terminal failed.
506    TerminalFailed,
507}
508
509// ─── issue_reclaim_grant ───
510
511#[derive(Clone, Debug, Serialize, Deserialize)]
512pub struct IssueReclaimGrantArgs {
513    pub execution_id: ExecutionId,
514    pub worker_id: WorkerId,
515    pub worker_instance_id: WorkerInstanceId,
516    pub lane_id: LaneId,
517    #[serde(default)]
518    pub capability_hash: Option<String>,
519    pub grant_ttl_ms: u64,
520    #[serde(default)]
521    pub route_snapshot_json: Option<String>,
522    #[serde(default)]
523    pub admission_summary: Option<String>,
524    /// Caller-side timestamp for bookkeeping. NOT passed to the Lua FCALL —
525    /// ff_issue_reclaim_grant uses `redis.call("TIME")` for grant_expires_at
526    /// (same as ff_issue_claim_grant). Kept for contract symmetry with
527    /// IssueClaimGrantArgs and scheduler audit logging.
528    pub now: TimestampMs,
529}
530
531#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
532pub enum IssueReclaimGrantResult {
533    /// Reclaim grant issued.
534    Granted { expires_at_ms: TimestampMs },
535}
536
537// ─── reclaim_execution ───
538
539#[derive(Clone, Debug, Serialize, Deserialize)]
540pub struct ReclaimExecutionArgs {
541    pub execution_id: ExecutionId,
542    pub worker_id: WorkerId,
543    pub worker_instance_id: WorkerInstanceId,
544    pub lane_id: LaneId,
545    #[serde(default)]
546    pub capability_hash: Option<String>,
547    pub lease_id: LeaseId,
548    pub lease_ttl_ms: u64,
549    pub attempt_id: AttemptId,
550    /// JSON-encoded attempt policy for the reclaim attempt.
551    #[serde(default)]
552    pub attempt_policy_json: String,
553    /// Maximum reclaim count before terminal failure. Default: 100.
554    #[serde(default = "default_max_reclaim_count")]
555    pub max_reclaim_count: u32,
556    /// Old worker instance (for old_worker_leases key construction).
557    pub old_worker_instance_id: WorkerInstanceId,
558    /// Current attempt index (for old_attempt/old_stream_meta key construction).
559    pub current_attempt_index: AttemptIndex,
560}
561
/// Serde default for `ReclaimExecutionArgs::max_reclaim_count`: 100.
fn default_max_reclaim_count() -> u32 {
    100
}

566#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
567pub enum ReclaimExecutionResult {
568    /// Execution reclaimed — new attempt + new lease.
569    Reclaimed {
570        new_attempt_index: AttemptIndex,
571        new_attempt_id: AttemptId,
572        new_lease_id: LeaseId,
573        new_lease_epoch: LeaseEpoch,
574        lease_expires_at: TimestampMs,
575    },
576    /// Max reclaims exceeded — execution moved to terminal.
577    MaxReclaimsExceeded,
578}
579
580// ─── expire_execution ───
581
582#[derive(Clone, Debug, Serialize, Deserialize)]
583pub struct ExpireExecutionArgs {
584    pub execution_id: ExecutionId,
585    /// "attempt_timeout" or "execution_deadline"
586    pub expire_reason: String,
587}
588
589#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
590pub enum ExpireExecutionResult {
591    /// Execution expired.
592    Expired { execution_id: ExecutionId },
593    /// Already terminal — no-op.
594    AlreadyTerminal,
595}
596
597// ═══════════════════════════════════════════════════════════════════════
598// Phase 3 contracts: suspend, signal, resume, waitpoint
599// ═══════════════════════════════════════════════════════════════════════
600
601// ─── suspend_execution ───
602
603#[derive(Clone, Debug, Serialize, Deserialize)]
604pub struct SuspendExecutionArgs {
605    pub execution_id: ExecutionId,
606    /// RFC #58.5 — fence triple. Required (no operator override path for
607    /// suspend). `None` returns `fence_required`.
608    pub fence: Option<LeaseFence>,
609    pub attempt_index: AttemptIndex,
610    pub suspension_id: SuspensionId,
611    pub waitpoint_id: WaitpointId,
612    pub waitpoint_key: String,
613    pub reason_code: String,
614    pub requested_by: String,
615    pub resume_condition_json: String,
616    pub resume_policy_json: String,
617    #[serde(default)]
618    pub continuation_metadata_pointer: Option<String>,
619    #[serde(default)]
620    pub timeout_at: Option<TimestampMs>,
621    /// true to activate a pending waitpoint, false to create new.
622    #[serde(default)]
623    pub use_pending_waitpoint: bool,
624    /// Timeout behavior: "fail", "cancel", "expire", "auto_resume", "escalate".
625    #[serde(default = "default_timeout_behavior")]
626    pub timeout_behavior: String,
627}
628
/// Serde default for `SuspendExecutionArgs::timeout_behavior`: `"fail"`.
fn default_timeout_behavior() -> String {
    "fail".to_owned()
}

633#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
634pub enum SuspendExecutionResult {
635    /// Execution suspended, waitpoint active.
636    Suspended {
637        suspension_id: SuspensionId,
638        waitpoint_id: WaitpointId,
639        waitpoint_key: String,
640        /// HMAC-SHA1 token bound to (waitpoint_id, waitpoint_key, created_at).
641        /// Required by signal-delivery callers to authenticate against this
642        /// waitpoint (RFC-004 §Waitpoint Security).
643        waitpoint_token: WaitpointToken,
644    },
645    /// Buffered signals already satisfied the condition — suspension skipped.
646    /// Lease is still held. Token comes from the pending waitpoint record.
647    AlreadySatisfied {
648        suspension_id: SuspensionId,
649        waitpoint_id: WaitpointId,
650        waitpoint_key: String,
651        waitpoint_token: WaitpointToken,
652    },
653}
654
655// ─── resume_execution ───
656
657#[derive(Clone, Debug, Serialize, Deserialize)]
658pub struct ResumeExecutionArgs {
659    pub execution_id: ExecutionId,
660    /// "signal", "operator", "auto_resume"
661    #[serde(default = "default_trigger_type")]
662    pub trigger_type: String,
663    /// Optional delay before becoming eligible (ms).
664    #[serde(default)]
665    pub resume_delay_ms: u64,
666}
667
/// Serde default for `ResumeExecutionArgs::trigger_type`: `"signal"`.
fn default_trigger_type() -> String {
    "signal".to_owned()
}

672#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
673pub enum ResumeExecutionResult {
674    /// Execution resumed to runnable.
675    Resumed { public_state: PublicState },
676}
677
678// ─── create_pending_waitpoint ───
679
680#[derive(Clone, Debug, Serialize, Deserialize)]
681pub struct CreatePendingWaitpointArgs {
682    pub execution_id: ExecutionId,
683    pub lease_id: LeaseId,
684    pub lease_epoch: LeaseEpoch,
685    pub attempt_index: AttemptIndex,
686    pub attempt_id: AttemptId,
687    pub waitpoint_id: WaitpointId,
688    pub waitpoint_key: String,
689    /// Short expiry for the pending waitpoint (ms).
690    pub expires_in_ms: u64,
691}
692
693#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
694pub enum CreatePendingWaitpointResult {
695    /// Pending waitpoint created.
696    Created {
697        waitpoint_id: WaitpointId,
698        waitpoint_key: String,
699        /// HMAC-SHA1 token bound to the pending waitpoint. Required for
700        /// `buffer_signal_for_pending_waitpoint` and carried forward when
701        /// the waitpoint is activated by `suspend_execution`.
702        waitpoint_token: WaitpointToken,
703    },
704}
705
706// ─── close_waitpoint ───
707
708#[derive(Clone, Debug, Serialize, Deserialize)]
709pub struct CloseWaitpointArgs {
710    pub waitpoint_id: WaitpointId,
711    pub reason: String,
712}
713
714#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
715pub enum CloseWaitpointResult {
716    /// Waitpoint closed.
717    Closed,
718}
719
720// ─── deliver_signal ───
721
722#[derive(Clone, Debug, Serialize, Deserialize)]
723pub struct DeliverSignalArgs {
724    pub execution_id: ExecutionId,
725    pub waitpoint_id: WaitpointId,
726    pub signal_id: SignalId,
727    pub signal_name: String,
728    pub signal_category: String,
729    pub source_type: String,
730    pub source_identity: String,
731    #[serde(default)]
732    pub payload: Option<Vec<u8>>,
733    #[serde(default)]
734    pub payload_encoding: Option<String>,
735    #[serde(default)]
736    pub correlation_id: Option<String>,
737    #[serde(default)]
738    pub idempotency_key: Option<String>,
739    pub target_scope: String,
740    #[serde(default)]
741    pub created_at: Option<TimestampMs>,
742    /// Dedup TTL for idempotency key (ms).
743    #[serde(default)]
744    pub dedup_ttl_ms: Option<u64>,
745    /// Resume delay after signal satisfaction (ms).
746    #[serde(default)]
747    pub resume_delay_ms: Option<u64>,
748    /// Max signals per execution (default 10000).
749    #[serde(default)]
750    pub max_signals_per_execution: Option<u64>,
751    /// MAXLEN for the waitpoint signal stream.
752    #[serde(default)]
753    pub signal_maxlen: Option<u64>,
754    /// HMAC-SHA1 token issued when the waitpoint was created. Required for
755    /// signal delivery; missing/tampered/rotated-past-grace tokens are
756    /// rejected with `invalid_token` or `token_expired` (RFC-004).
757    ///
758    /// Defense-in-depth: `WaitpointToken` is a transparent string newtype,
759    /// so an empty string deserializes successfully from JSON. The
760    /// validation boundary is in Lua (`validate_waitpoint_token` returns
761    /// `missing_token` on empty input); this type intentionally does NOT
762    /// pre-reject at the Rust layer so callers get a consistent typed
763    /// error regardless of how they constructed the args.
764    pub waitpoint_token: WaitpointToken,
765    pub now: TimestampMs,
766}
767
768#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
769pub enum DeliverSignalResult {
770    /// Signal accepted with the given effect.
771    Accepted { signal_id: SignalId, effect: String },
772    /// Duplicate signal (idempotency key matched).
773    Duplicate { existing_signal_id: SignalId },
774}
775
776// ─── buffer_signal_for_pending_waitpoint ───
777
778#[derive(Clone, Debug, Serialize, Deserialize)]
779pub struct BufferSignalArgs {
780    pub execution_id: ExecutionId,
781    pub waitpoint_id: WaitpointId,
782    pub signal_id: SignalId,
783    pub signal_name: String,
784    pub signal_category: String,
785    pub source_type: String,
786    pub source_identity: String,
787    #[serde(default)]
788    pub payload: Option<Vec<u8>>,
789    #[serde(default)]
790    pub payload_encoding: Option<String>,
791    #[serde(default)]
792    pub idempotency_key: Option<String>,
793    pub target_scope: String,
794    /// HMAC-SHA1 token issued when `create_pending_waitpoint` ran. Required
795    /// to authenticate early signals targeting the pending waitpoint.
796    pub waitpoint_token: WaitpointToken,
797    pub now: TimestampMs,
798}
799
800#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
801pub enum BufferSignalResult {
802    /// Signal buffered for pending waitpoint.
803    Buffered { signal_id: SignalId },
804    /// Duplicate signal.
805    Duplicate { existing_signal_id: SignalId },
806}
807
808// ─── list_pending_waitpoints ───
809
810/// One entry in the read-only view of an execution's active waitpoints.
811///
812/// Returned by `EngineBackend::list_pending_waitpoints` (and the
813/// `GET /v1/executions/{id}/pending-waitpoints` REST endpoint).
814///
815/// **RFC-017 §8 schema rewrite (Stage D1).** This struct no longer
816/// carries the raw HMAC `waitpoint_token` at the trait boundary — the
817/// backend emits only the sanitised `(token_kid, token_fingerprint)`
818/// pair. The HTTP handler (see `ff-server::api::list_pending_waitpoints`)
819/// wraps the trait response and re-injects the real token on the
820/// v0.7.x wire for one-release deprecation warning; the wire field is
821/// removed entirely at v0.8.0.
822#[non_exhaustive]
823#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
824pub struct PendingWaitpointInfo {
825    pub waitpoint_id: WaitpointId,
826    pub waitpoint_key: String,
827    /// Current waitpoint state: `pending`, `active`, `closed`. Callers
828    /// typically filter to `pending` or `active`.
829    pub state: String,
830    /// Signal names the resume condition is waiting for. Reviewers that
831    /// need to drive a specific waitpoint — particularly when multiple
832    /// concurrent waitpoints exist on one execution — filter on this to
833    /// pick the right target.
834    ///
835    /// An EMPTY vec means the condition matches any signal (wildcard, per
836    /// `lua/helpers.lua` `initialize_condition`). Callers must not infer
837    /// "no waitpoint" from empty; check `state` / length of the outer
838    /// list for that.
839    #[serde(default)]
840    pub required_signal_names: Vec<String>,
841    /// Timestamp when the waitpoint record was first written.
842    pub created_at: TimestampMs,
843    /// Timestamp when the waitpoint was activated (suspension landed).
844    /// `None` while the waitpoint is still `pending`.
845    #[serde(default, skip_serializing_if = "Option::is_none")]
846    pub activated_at: Option<TimestampMs>,
847    /// Scheduled expiration timestamp. `None` if no timeout configured.
848    #[serde(default, skip_serializing_if = "Option::is_none")]
849    pub expires_at: Option<TimestampMs>,
850    /// Owning execution — surfaces without a separate lookup.
851    pub execution_id: ExecutionId,
852    /// HMAC key identifier (the `<kid>` prefix of the stored
853    /// `waitpoint_token`). Safe to expose — identifies which signing
854    /// key minted the token without revealing the key material.
855    pub token_kid: String,
856    /// 16-hex-char (8-byte) fingerprint of the HMAC digest. Audit-friendly
857    /// handle that correlates across logs without being replayable.
858    pub token_fingerprint: String,
859}
860
861impl PendingWaitpointInfo {
862    /// Construct a `PendingWaitpointInfo` with the 7 required fields.
863    /// Optional fields (`activated_at`, `expires_at`) default to
864    /// `None`; use [`Self::with_activated_at`] / [`Self::with_expires_at`]
865    /// to populate them. `required_signal_names` defaults to empty
866    /// (wildcard condition); use [`Self::with_required_signal_names`]
867    /// to set it.
868    pub fn new(
869        waitpoint_id: WaitpointId,
870        waitpoint_key: String,
871        state: String,
872        created_at: TimestampMs,
873        execution_id: ExecutionId,
874        token_kid: String,
875        token_fingerprint: String,
876    ) -> Self {
877        Self {
878            waitpoint_id,
879            waitpoint_key,
880            state,
881            required_signal_names: Vec::new(),
882            created_at,
883            activated_at: None,
884            expires_at: None,
885            execution_id,
886            token_kid,
887            token_fingerprint,
888        }
889    }
890
891    pub fn with_activated_at(mut self, activated_at: TimestampMs) -> Self {
892        self.activated_at = Some(activated_at);
893        self
894    }
895
896    pub fn with_expires_at(mut self, expires_at: TimestampMs) -> Self {
897        self.expires_at = Some(expires_at);
898        self
899    }
900
901    pub fn with_required_signal_names(mut self, names: Vec<String>) -> Self {
902        self.required_signal_names = names;
903        self
904    }
905}
906
907// ─── expire_suspension ───
908
909#[derive(Clone, Debug, Serialize, Deserialize)]
910pub struct ExpireSuspensionArgs {
911    pub execution_id: ExecutionId,
912}
913
914#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
915pub enum ExpireSuspensionResult {
916    /// Suspension expired with the given behavior applied.
917    Expired { behavior_applied: String },
918    /// Already resolved — no action needed.
919    AlreadySatisfied { reason: String },
920}
921
922// ─── claim_resumed_execution ───
923
924#[derive(Clone, Debug, Serialize, Deserialize)]
925pub struct ClaimResumedExecutionArgs {
926    pub execution_id: ExecutionId,
927    pub worker_id: WorkerId,
928    pub worker_instance_id: WorkerInstanceId,
929    pub lane_id: LaneId,
930    pub lease_id: LeaseId,
931    pub lease_ttl_ms: u64,
932    /// Current attempt index (for KEYS construction — from exec_core).
933    pub current_attempt_index: AttemptIndex,
934    /// Remaining attempt timeout from before suspension (ms). 0 = no timeout.
935    #[serde(default)]
936    pub remaining_attempt_timeout_ms: Option<u64>,
937    pub now: TimestampMs,
938}
939
940#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
941pub struct ClaimedResumedExecution {
942    pub execution_id: ExecutionId,
943    pub lease_id: LeaseId,
944    pub lease_epoch: LeaseEpoch,
945    pub attempt_index: AttemptIndex,
946    pub attempt_id: AttemptId,
947    pub lease_expires_at: TimestampMs,
948}
949
950#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
951pub enum ClaimResumedExecutionResult {
952    /// Successfully claimed resumed execution (same attempt continues).
953    Claimed(ClaimedResumedExecution),
954}
955
956// ═══════════════════════════════════════════════════════════════════════
957// Phase 4 contracts: stream
958// ═══════════════════════════════════════════════════════════════════════
959
960// ─── append_frame ───
961
962#[derive(Clone, Debug, Serialize, Deserialize)]
963pub struct AppendFrameArgs {
964    pub execution_id: ExecutionId,
965    pub attempt_index: AttemptIndex,
966    pub lease_id: LeaseId,
967    pub lease_epoch: LeaseEpoch,
968    pub attempt_id: AttemptId,
969    pub frame_type: String,
970    pub timestamp: TimestampMs,
971    pub payload: Vec<u8>,
972    #[serde(default)]
973    pub encoding: Option<String>,
974    /// Optional structured metadata for the frame (JSON blob).
975    #[serde(default)]
976    pub metadata_json: Option<String>,
977    #[serde(default)]
978    pub correlation_id: Option<String>,
979    #[serde(default)]
980    pub source: Option<String>,
981    /// MAXLEN for the stream. 0 = no trim.
982    #[serde(default)]
983    pub retention_maxlen: Option<u32>,
984    /// Max payload bytes per frame. Default: 65536.
985    #[serde(default)]
986    pub max_payload_bytes: Option<u32>,
987}
988
989#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
990pub enum AppendFrameResult {
991    /// Frame appended successfully.
992    Appended {
993        /// Valkey Stream entry ID (e.g. "1713100800150-0").
994        entry_id: String,
995        /// Total frame count after this append.
996        frame_count: u64,
997    },
998}
999
1000// ─── StreamCursor (issue #92) ───
1001
/// Opaque position marker for attempt-stream reads and tails.
///
/// It takes the place of the bare `&str` / `String` stream-id
/// parameters that `read_stream` / `tail_stream` / `ReadStreamParams` /
/// `TailStreamParams` used to carry. On the wire it is still a flat
/// string (serde goes through `try_from`/`into` transparently), so REST
/// queries such as `?from=start&to=end` and `?after=123-0` keep
/// working.
///
/// # Public wire grammar
///
/// Exactly these tokens are accepted, and nothing else:
///
/// * `"start"` — the first entry in the stream (equivalent to XRANGE
///   `-`). Valid in `read_stream` / `ReadStreamParams`.
/// * `"end"` — the latest entry in the stream (equivalent to XRANGE
///   `+`). Valid in `read_stream` / `ReadStreamParams`.
/// * `"<ms>"` or `"<ms>-<seq>"` — a concrete Valkey Stream entry id.
///   Valid everywhere.
///
/// The bare XRANGE/XREAD markers `"-"` and `"+"` are **NOT** accepted
/// on the wire. The public contract is the opaque `StreamCursor`
/// grammar; the Valkey `-`/`+` markers are an internal implementation
/// detail that only appears inside the Lua-adjacent [`ReadFramesArgs`] /
/// `xread_block` path, produced by [`StreamCursor::to_wire`].
///
/// For XREAD (tail), "from the beginning" is by documented convention
/// `StreamCursor::At("0-0".into())`; the convenience constructor
/// [`StreamCursor::from_beginning`] returns exactly that value. The
/// SDK's `tail_stream` boundary rejects `Start` / `End` because XREAD
/// does not accept `-` / `+` as cursors. Boundary-validation call sites
/// make that Start/End-vs-At decision through
/// [`StreamCursor::is_concrete`].
///
/// # Why an enum instead of a string
///
/// With a string parameter, a malformed id can escape to the Lua/Valkey
/// layer, where it surfaces as a script error and an HTTP 500. An enum
/// with fallible `FromStr` / `TryFrom<String>` rejects every malformed
/// input at the wire boundary with a structured error, and also keeps
/// bare `-` / `+` from leaking into consumer code as tacit extensions
/// of the public API.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum StreamCursor {
    /// The first entry in the stream (XRANGE start marker).
    Start,
    /// The latest entry in the stream (XRANGE end marker).
    End,
    /// A concrete Valkey Stream entry id (`<ms>` or `<ms>-<seq>`).
    ///
    /// XREAD-style tails use `At("0-0".to_owned())` as the documented
    /// "from the beginning" convention — see
    /// [`StreamCursor::from_beginning`].
    At(String),
}

impl StreamCursor {
    /// Build the XREAD-from-beginning cursor (`"0-0"`). Because XREAD
    /// treats `last_id` as exclusive, using this as the `after` cursor
    /// yields every entry in the stream.
    pub fn from_beginning() -> Self {
        Self::At(String::from("0-0"))
    }

    /// Serde default helper returning `StreamCursor::Start`. REST query
    /// structs reference it as
    /// `#[serde(default = "StreamCursor::start")]`.
    pub fn start() -> Self {
        Self::Start
    }

    /// Serde default helper returning `StreamCursor::End`.
    pub fn end() -> Self {
        Self::End
    }

    /// Serde default helper returning
    /// `StreamCursor::from_beginning()`; it is the default for
    /// `TailStreamParams::after`.
    pub fn beginning() -> Self {
        Self::from_beginning()
    }

    /// Internal-only: translate the cursor into the XRANGE/XREAD marker
    /// string that Valkey expects — `Start → "-"`, `End → "+"`,
    /// `At(s) → s`.
    ///
    /// The ff-script adapter edge calls this just before it builds
    /// `ReadFramesArgs` or invokes `xread_block`, lowering the opaque
    /// wire grammar to the Lua-ABI form. These raw characters are NOT
    /// part of the public wire and must not be emitted to consumers.
    /// The method is hidden from generated docs to discourage external
    /// use; external consumers should never need the raw `-` / `+`.
    #[doc(hidden)]
    pub fn to_wire(&self) -> &str {
        match self {
            Self::At(id) => id.as_str(),
            Self::Start => "-",
            Self::End => "+",
        }
    }

    /// Internal-only owned counterpart of [`Self::to_wire`]. For
    /// `At(s)` the inner `String` is moved out rather than cloned. Meant
    /// for adapter edges that need an owned wire string (e.g.
    /// `ReadFramesArgs.from_id`) and are about to drop the
    /// `StreamCursor` anyway.
    #[doc(hidden)]
    pub fn into_wire_string(self) -> String {
        match self {
            Self::At(id) => id,
            Self::Start => String::from("-"),
            Self::End => String::from("+"),
        }
    }

    /// Whether this cursor is a concrete entry id
    /// (`"<ms>"` / `"<ms>-<seq>"`). The open markers `Start` / `End`
    /// yield `false`.
    ///
    /// Boundaries that cannot take open markers, such as XREAD
    /// (tailing), reject a cursor exactly when `!cursor.is_concrete()`.
    /// The check lives here so the SDK and REST guards stay in
    /// lock-step.
    pub fn is_concrete(&self) -> bool {
        match self {
            Self::At(_) => true,
            Self::Start | Self::End => false,
        }
    }
}
1129
1130impl std::fmt::Display for StreamCursor {
1131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1132        match self {
1133            Self::Start => f.write_str("start"),
1134            Self::End => f.write_str("end"),
1135            Self::At(s) => f.write_str(s),
1136        }
1137    }
1138}
1139
/// Failure modes of parsing a string into a [`StreamCursor`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StreamCursorParseError {
    /// The input was empty.
    Empty,
    /// The input was one of the rejected bare-marker aliases (`"-"`,
    /// `"+"`). Those raw Valkey markers are internal-only; the public
    /// wire requires `"start"` / `"end"`.
    BareMarkerRejected(String),
    /// The input was not a recognized keyword and not a well-formed
    /// Stream entry id either. Entry ids must match `^\d+(?:-\d+)?$`.
    Malformed(String),
}

impl std::fmt::Display for StreamCursorParseError {
    /// Write the human-readable message for this parse failure, quoting
    /// the offending input where there is one.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Malformed(s) => {
                write!(
                    f,
                    "invalid stream cursor '{s}' (expected 'start', 'end', '<ms>', or '<ms>-<seq>')"
                )
            }
            Self::BareMarkerRejected(s) => {
                write!(
                    f,
                    "bare marker '{s}' is not a valid stream cursor; use 'start' or 'end'"
                )
            }
            Self::Empty => f.write_str("stream cursor must not be empty"),
        }
    }
}

impl std::error::Error for StreamCursorParseError {}
1171
/// Result of the shared grammar check: `Start` / `End` / a concrete-id
/// shape / bare marker / empty / malformed.
///
/// Producing this classification never allocates. The owned and
/// borrowed entry points ([`StreamCursor::from_str`],
/// [`StreamCursor::try_from`]) consume it and, where applicable, move
/// the owned `String` into `At`, which avoids a round-trip
/// `String → &str → String::to_owned` on the common REST-query path.
enum StreamCursorClass {
    Start,
    End,
    Concrete,
    BareMarker,
    Empty,
    Malformed,
}

/// Classify `s` against the public cursor grammar without allocating.
///
/// Returns:
/// * `Empty` for `""`;
/// * `BareMarker` for the raw Valkey markers `"-"` / `"+"`;
/// * `Start` / `End` for the keywords `"start"` / `"end"`;
/// * `Concrete` for `<ms>` or `<ms>-<seq>`, where each component is a
///   non-empty run of ASCII digits whose value fits in `u64`;
/// * `Malformed` for everything else.
///
/// The `u64` range check matters: Valkey stream ids are a pair of
/// unsigned 64-bit integers, so a digits-only component that overflows
/// `u64` would pass a pure shape check and only fail later inside
/// Valkey. Rejecting it here keeps that failure at the wire boundary.
fn classify_stream_cursor(s: &str) -> StreamCursorClass {
    match s {
        "" => return StreamCursorClass::Empty,
        "-" | "+" => return StreamCursorClass::BareMarker,
        "start" => return StreamCursorClass::Start,
        "end" => return StreamCursorClass::End,
        _ => {}
    }

    // A valid id component is a non-empty run of ASCII digits that fits
    // in `u64`. The digit check runs first because `u64::from_str` on
    // its own would also accept a leading `+`. It also rejects any
    // non-ASCII input, so no separate `is_ascii` guard is needed.
    let is_id_component = |part: &str| {
        !part.is_empty()
            && part.bytes().all(|b| b.is_ascii_digit())
            && part.parse::<u64>().is_ok()
    };

    // Only the first `-` separates `<ms>` from `<seq>`; any further `-`
    // ends up in the sequence part and fails the digit check.
    let well_formed = match s.split_once('-') {
        Some((ms_part, seq_part)) => is_id_component(ms_part) && is_id_component(seq_part),
        None => is_id_component(s),
    };

    if well_formed {
        StreamCursorClass::Concrete
    } else {
        StreamCursorClass::Malformed
    }
}
1218
1219impl std::str::FromStr for StreamCursor {
1220    type Err = StreamCursorParseError;
1221
1222    fn from_str(s: &str) -> Result<Self, Self::Err> {
1223        match classify_stream_cursor(s) {
1224            StreamCursorClass::Start => Ok(Self::Start),
1225            StreamCursorClass::End => Ok(Self::End),
1226            StreamCursorClass::Concrete => Ok(Self::At(s.to_owned())),
1227            StreamCursorClass::BareMarker => {
1228                Err(StreamCursorParseError::BareMarkerRejected(s.to_owned()))
1229            }
1230            StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
1231            StreamCursorClass::Malformed => Err(StreamCursorParseError::Malformed(s.to_owned())),
1232        }
1233    }
1234}
1235
1236impl TryFrom<String> for StreamCursor {
1237    type Error = StreamCursorParseError;
1238
1239    fn try_from(s: String) -> Result<Self, Self::Error> {
1240        // Owned parsing path — the `At` variant moves `s` in directly,
1241        // avoiding the `&str → String::to_owned` re-allocation that a
1242        // blind forward to `FromStr::from_str(&s)` would force. Error
1243        // paths still pay one allocation to describe the offending
1244        // input.
1245        match classify_stream_cursor(&s) {
1246            StreamCursorClass::Start => Ok(Self::Start),
1247            StreamCursorClass::End => Ok(Self::End),
1248            StreamCursorClass::Concrete => Ok(Self::At(s)),
1249            StreamCursorClass::BareMarker => Err(StreamCursorParseError::BareMarkerRejected(s)),
1250            StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
1251            StreamCursorClass::Malformed => Err(StreamCursorParseError::Malformed(s)),
1252        }
1253    }
1254}
1255
1256impl From<StreamCursor> for String {
1257    fn from(c: StreamCursor) -> Self {
1258        c.to_string()
1259    }
1260}
1261
1262impl Serialize for StreamCursor {
1263    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1264        serializer.collect_str(self)
1265    }
1266}
1267
1268impl<'de> Deserialize<'de> for StreamCursor {
1269    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
1270        let s = String::deserialize(deserializer)?;
1271        Self::try_from(s).map_err(serde::de::Error::custom)
1272    }
1273}
1274
1275// ─── read_attempt_stream / tail_attempt_stream ───
1276
/// Maximum number of frames a single read/tail call may return.
///
/// This constant is the single source of truth for the Rust layer
/// (ff-script, ff-server, ff-sdk). `lua/stream.lua` carries a matching
/// literal with an inline reference back to this constant; if the cap
/// ever has to be raised, change both together.
pub const STREAM_READ_HARD_CAP: u64 = 10_000;
1284
1285/// A single frame read from an attempt-scoped stream.
1286///
1287/// Field set mirrors what `ff_append_frame` writes: `frame_type`, `ts`,
1288/// `payload`, `encoding`, `source`, and optionally `correlation_id`. Stored
1289/// as an ordered map so field order is deterministic across read calls.
1290#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1291pub struct StreamFrame {
1292    /// Valkey Stream entry ID, e.g. "1713100800150-0".
1293    pub id: String,
1294    /// Frame fields in sorted order.
1295    pub fields: std::collections::BTreeMap<String, String>,
1296}
1297
1298/// Inputs to `ff_read_attempt_stream` (XRANGE wrapper).
1299#[derive(Clone, Debug, Serialize, Deserialize)]
1300pub struct ReadFramesArgs {
1301    pub execution_id: ExecutionId,
1302    pub attempt_index: AttemptIndex,
1303    /// XRANGE start ID. Use "-" for earliest.
1304    pub from_id: String,
1305    /// XRANGE end ID. Use "+" for latest.
1306    pub to_id: String,
1307    /// XRANGE COUNT limit. MUST be `>= 1`. The REST and SDK layers reject
1308    /// `0` at the boundary; the Lua side rejects it too. `STREAM_READ_HARD_CAP`
1309    /// is the upper bound.
1310    pub count_limit: u64,
1311}
1312
1313/// Result of reading frames from an attempt stream — frames plus terminal
1314/// signal so consumers can stop polling without a timeout fallback.
1315#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1316pub struct StreamFrames {
1317    /// Entries in the requested range (possibly empty).
1318    pub frames: Vec<StreamFrame>,
1319    /// Timestamp when the upstream writer closed the stream. `None` if the
1320    /// stream is still open (or has never been written).
1321    #[serde(default, skip_serializing_if = "Option::is_none")]
1322    pub closed_at: Option<TimestampMs>,
1323    /// Reason from the closing writer. Current values:
1324    /// `attempt_success`, `attempt_failure`, `attempt_cancelled`,
1325    /// `attempt_interrupted`. `None` iff the stream is still open.
1326    #[serde(default, skip_serializing_if = "Option::is_none")]
1327    pub closed_reason: Option<String>,
1328}
1329
1330impl StreamFrames {
1331    /// Construct an empty open-stream result (no frames, no terminal
1332    /// markers). Useful for fast-path peek helpers.
1333    pub fn empty_open() -> Self {
1334        Self {
1335            frames: Vec::new(),
1336            closed_at: None,
1337            closed_reason: None,
1338        }
1339    }
1340
1341    /// True iff the producer has closed this stream. Consumers should stop
1342    /// polling and drain once this returns true.
1343    pub fn is_closed(&self) -> bool {
1344        self.closed_at.is_some()
1345    }
1346}
1347
1348#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1349pub enum ReadFramesResult {
1350    /// Frames returned (possibly empty) plus optional closed markers.
1351    Frames(StreamFrames),
1352}
1353
1354// ═══════════════════════════════════════════════════════════════════════
1355// Phase 5 contracts: budget, quota, block/unblock
1356// ═══════════════════════════════════════════════════════════════════════
1357
1358// ─── create_budget ───
1359
1360#[derive(Clone, Debug, Serialize, Deserialize)]
1361pub struct CreateBudgetArgs {
1362    pub budget_id: crate::types::BudgetId,
1363    pub scope_type: String,
1364    pub scope_id: String,
1365    pub enforcement_mode: String,
1366    pub on_hard_limit: String,
1367    pub on_soft_limit: String,
1368    pub reset_interval_ms: u64,
1369    /// Dimension names.
1370    pub dimensions: Vec<String>,
1371    /// Hard limits per dimension (parallel with dimensions).
1372    pub hard_limits: Vec<u64>,
1373    /// Soft limits per dimension (parallel with dimensions).
1374    pub soft_limits: Vec<u64>,
1375    pub now: TimestampMs,
1376}
1377
1378#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1379pub enum CreateBudgetResult {
1380    /// Budget created.
1381    Created { budget_id: crate::types::BudgetId },
1382    /// Already exists (idempotent).
1383    AlreadySatisfied { budget_id: crate::types::BudgetId },
1384}
1385
1386// ─── create_quota_policy ───
1387
1388#[derive(Clone, Debug, Serialize, Deserialize)]
1389pub struct CreateQuotaPolicyArgs {
1390    pub quota_policy_id: crate::types::QuotaPolicyId,
1391    pub window_seconds: u64,
1392    pub max_requests_per_window: u64,
1393    pub max_concurrent: u64,
1394    pub now: TimestampMs,
1395}
1396
1397#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1398pub enum CreateQuotaPolicyResult {
1399    /// Quota policy created.
1400    Created {
1401        quota_policy_id: crate::types::QuotaPolicyId,
1402    },
1403    /// Already exists (idempotent).
1404    AlreadySatisfied {
1405        quota_policy_id: crate::types::QuotaPolicyId,
1406    },
1407}
1408
1409// ─── budget_status (read-only) ───
1410
1411/// Operator-facing budget status snapshot (not an FCALL — direct HGETALL reads).
1412#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1413pub struct BudgetStatus {
1414    pub budget_id: String,
1415    pub scope_type: String,
1416    pub scope_id: String,
1417    pub enforcement_mode: String,
1418    /// Current usage per dimension: {dimension_name: current_value}.
1419    pub usage: HashMap<String, u64>,
1420    /// Hard limits per dimension: {dimension_name: limit}.
1421    pub hard_limits: HashMap<String, u64>,
1422    /// Soft limits per dimension: {dimension_name: limit}.
1423    pub soft_limits: HashMap<String, u64>,
1424    pub breach_count: u64,
1425    pub soft_breach_count: u64,
1426    pub last_breach_at: Option<String>,
1427    pub last_breach_dim: Option<String>,
1428    pub next_reset_at: Option<String>,
1429    pub created_at: Option<String>,
1430}
1431
1432// ─── report_usage_and_check ───
1433
1434#[derive(Clone, Debug, Serialize, Deserialize)]
1435pub struct ReportUsageArgs {
1436    /// Dimension names to increment.
1437    pub dimensions: Vec<String>,
1438    /// Increment values (parallel with dimensions).
1439    pub deltas: Vec<u64>,
1440    pub now: TimestampMs,
1441    /// Optional idempotency key to prevent double-counting on retries.
1442    /// Pass the raw dedup id (e.g. `"retry-42"`); the typed FCALL wrapper
1443    /// wraps it into `ff:usagededup:{b:M}:<id>` using the budget
1444    /// partition's hash tag so it co-locates with the other budget keys
1445    /// (#108).
1446    #[serde(default)]
1447    pub dedup_key: Option<String>,
1448}
1449
1450#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1451#[non_exhaustive]
1452pub enum ReportUsageResult {
1453    /// All increments applied, no breach.
1454    Ok,
1455    /// Soft limit breached on a dimension (advisory, increments applied).
1456    SoftBreach {
1457        dimension: String,
1458        current_usage: u64,
1459        soft_limit: u64,
1460    },
1461    /// Hard limit breached (increments NOT applied).
1462    HardBreach {
1463        dimension: String,
1464        current_usage: u64,
1465        hard_limit: u64,
1466    },
1467    /// Dedup key matched — usage already applied in a prior call.
1468    AlreadyApplied,
1469}
1470
1471// ─── reset_budget ───
1472
1473#[derive(Clone, Debug, Serialize, Deserialize)]
1474pub struct ResetBudgetArgs {
1475    pub budget_id: crate::types::BudgetId,
1476    pub now: TimestampMs,
1477}
1478
1479#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1480pub enum ResetBudgetResult {
1481    /// Budget reset successfully.
1482    Reset { next_reset_at: TimestampMs },
1483}
1484
1485// ─── check_admission_and_record ───
1486
1487#[derive(Clone, Debug, Serialize, Deserialize)]
1488pub struct CheckAdmissionArgs {
1489    pub execution_id: ExecutionId,
1490    pub now: TimestampMs,
1491    pub window_seconds: u64,
1492    pub rate_limit: u64,
1493    pub concurrency_cap: u64,
1494    #[serde(default)]
1495    pub jitter_ms: Option<u64>,
1496}
1497
1498#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1499pub enum CheckAdmissionResult {
1500    /// Admitted — execution may proceed.
1501    Admitted,
1502    /// Already admitted in this window (idempotent).
1503    AlreadyAdmitted,
1504    /// Rate limit exceeded.
1505    RateExceeded { retry_after_ms: u64 },
1506    /// Concurrency cap hit.
1507    ConcurrencyExceeded,
1508}
1509
1510// ─── release_admission ───
1511
1512#[derive(Clone, Debug, Serialize, Deserialize)]
1513pub struct ReleaseAdmissionArgs {
1514    pub execution_id: ExecutionId,
1515}
1516
1517#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1518pub enum ReleaseAdmissionResult {
1519    Released,
1520}
1521
1522// ─── block_execution_for_admission ───
1523
1524#[derive(Clone, Debug, Serialize, Deserialize)]
1525pub struct BlockExecutionArgs {
1526    pub execution_id: ExecutionId,
1527    pub blocking_reason: String,
1528    #[serde(default)]
1529    pub blocking_detail: Option<String>,
1530    pub now: TimestampMs,
1531}
1532
1533#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1534pub enum BlockExecutionResult {
1535    /// Execution blocked.
1536    Blocked,
1537}
1538
1539// ─── unblock_execution ───
1540
1541#[derive(Clone, Debug, Serialize, Deserialize)]
1542pub struct UnblockExecutionArgs {
1543    pub execution_id: ExecutionId,
1544    pub now: TimestampMs,
1545    /// Expected blocking reason (prevents stale unblock).
1546    #[serde(default)]
1547    pub expected_blocking_reason: Option<String>,
1548}
1549
1550#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1551pub enum UnblockExecutionResult {
1552    /// Execution unblocked and moved to eligible.
1553    Unblocked,
1554}
1555
1556// ═══════════════════════════════════════════════════════════════════════
1557// Phase 6 contracts: flow coordination and dependencies
1558// ═══════════════════════════════════════════════════════════════════════
1559
1560// ─── create_flow ───
1561
1562#[derive(Clone, Debug, Serialize, Deserialize)]
1563pub struct CreateFlowArgs {
1564    pub flow_id: crate::types::FlowId,
1565    pub flow_kind: String,
1566    pub namespace: Namespace,
1567    pub now: TimestampMs,
1568}
1569
1570#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1571pub enum CreateFlowResult {
1572    /// Flow created successfully.
1573    Created { flow_id: crate::types::FlowId },
1574    /// Flow already exists (idempotent).
1575    AlreadySatisfied { flow_id: crate::types::FlowId },
1576}
1577
1578// ─── add_execution_to_flow ───
1579
1580#[derive(Clone, Debug, Serialize, Deserialize)]
1581pub struct AddExecutionToFlowArgs {
1582    pub flow_id: crate::types::FlowId,
1583    pub execution_id: ExecutionId,
1584    pub now: TimestampMs,
1585}
1586
1587#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1588pub enum AddExecutionToFlowResult {
1589    /// Execution added to flow.
1590    Added {
1591        execution_id: ExecutionId,
1592        new_node_count: u32,
1593    },
1594    /// Already a member (idempotent).
1595    AlreadyMember {
1596        execution_id: ExecutionId,
1597        node_count: u32,
1598    },
1599}
1600
1601// ─── cancel_flow ───
1602
1603#[derive(Clone, Debug, Serialize, Deserialize)]
1604pub struct CancelFlowArgs {
1605    pub flow_id: crate::types::FlowId,
1606    pub reason: String,
1607    pub cancellation_policy: String,
1608    pub now: TimestampMs,
1609}
1610
1611#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1612pub enum CancelFlowResult {
1613    /// Flow cancelled and all member cancellations (if any) have completed
1614    /// synchronously. Used when `cancellation_policy != "cancel_all"`, when
1615    /// the flow has no members, when the caller opted into synchronous
1616    /// dispatch (e.g. `?wait=true`), or when the flow was already in a
1617    /// terminal state (idempotent retry).
1618    ///
1619    /// On the idempotent-retry path `member_execution_ids` may be *capped*
1620    /// at the server (default 1000 entries) to bound response bandwidth on
1621    /// flows with very large membership. The first (non-idempotent) call
1622    /// always returns the full list, so clients that need every member id
1623    /// should persist the initial response.
1624    Cancelled {
1625        cancellation_policy: String,
1626        member_execution_ids: Vec<String>,
1627    },
1628    /// Flow state was flipped to cancelled atomically, but member
1629    /// cancellations are dispatched asynchronously in the background.
1630    /// Clients may poll `GET /v1/executions/{id}/state` for each member
1631    /// execution id to track terminal state.
1632    CancellationScheduled {
1633        cancellation_policy: String,
1634        member_count: u32,
1635        member_execution_ids: Vec<String>,
1636    },
1637    /// `?wait=true` dispatch completed but one or more member cancellations
1638    /// failed mid-loop (e.g. ghost member, Lua error, transport fault after
1639    /// retries exhausted). The flow itself is still flipped to cancelled
1640    /// (atomic Lua already ran); callers SHOULD inspect
1641    /// `failed_member_execution_ids` and either retry those ids directly
1642    /// via `cancel_execution` or wait for the cancel-backlog reconciler
1643    /// to retry them in the background.
1644    ///
1645    /// Only emitted by the synchronous wait path
1646    /// ([`crate::CancelFlowArgs`] via `?wait=true`). The async path returns
1647    /// [`CancelFlowResult::CancellationScheduled`] and delegates retries
1648    /// to the reconciler — there is no visible "partial" state on the
1649    /// async path because the dispatch result is not observed inline.
1650    PartiallyCancelled {
1651        cancellation_policy: String,
1652        /// All member execution ids that the cancel_flow FCALL returned
1653        /// (i.e. the full membership at the moment of cancellation).
1654        member_execution_ids: Vec<String>,
1655        /// Strict subset of `member_execution_ids` whose per-member cancel
1656        /// FCALL returned an error. Order is deterministic (matches the
1657        /// iteration order over `member_execution_ids`).
1658        failed_member_execution_ids: Vec<String>,
1659    },
1660}
1661
/// RFC-017 Stage E2: outcome of the "header" part of a cancel_flow
/// operation, meaning the atomic flow-state flip together with the
/// member enumeration.
///
/// The Server combines this value with its own wait/async
/// member-dispatch machinery to produce the wire-level
/// [`CancelFlowResult`]. Backends implement
/// [`crate::engine_backend::EngineBackend::cancel_flow_header`]
/// (default: `Unavailable`), which lets the Valkey-native
/// `ff_cancel_flow` FCALL — including its `flow_already_terminal`
/// idempotency branch — be driven through the trait while the existing
/// public `cancel_flow(id, policy, wait)` signature keeps its shape.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum CancelFlowHeader {
    /// This call flipped the flow state. `member_execution_ids` holds
    /// the full (uncapped) membership as of the flip.
    Cancelled {
        cancellation_policy: String,
        member_execution_ids: Vec<String>,
    },
    /// The flow was already terminal when the call arrived. The backend
    /// returns the *stored* `cancellation_policy`, `cancel_reason`, and
    /// full membership, which is enough for the Server to answer with
    /// an idempotent [`CancelFlowResult::Cancelled`] without repeating
    /// the flip.
    AlreadyTerminal {
        /// `None` only when the flow was cancelled by pre-E2 Lua, which
        /// never persisted the policy field.
        stored_cancellation_policy: Option<String>,
        /// `None` when no cancel reason was ever stamped on the flow.
        stored_cancel_reason: Option<String>,
        /// The full membership. The Server caps it to
        /// `ALREADY_TERMINAL_MEMBER_CAP` before wiring.
        member_execution_ids: Vec<String>,
    },
}
1696
1697// ─── stage_dependency_edge ───
1698
1699#[derive(Clone, Debug, Serialize, Deserialize)]
1700pub struct StageDependencyEdgeArgs {
1701    pub flow_id: crate::types::FlowId,
1702    pub edge_id: crate::types::EdgeId,
1703    pub upstream_execution_id: ExecutionId,
1704    pub downstream_execution_id: ExecutionId,
1705    #[serde(default = "default_dependency_kind")]
1706    pub dependency_kind: String,
1707    #[serde(default)]
1708    pub data_passing_ref: Option<String>,
1709    pub expected_graph_revision: u64,
1710    pub now: TimestampMs,
1711}
1712
/// Serde default for `dependency_kind` fields: the `"success_only"`
/// literal as an owned `String`.
fn default_dependency_kind() -> String {
    String::from("success_only")
}
1716
1717#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1718pub enum StageDependencyEdgeResult {
1719    /// Edge staged, new graph revision.
1720    Staged {
1721        edge_id: crate::types::EdgeId,
1722        new_graph_revision: u64,
1723    },
1724}
1725
1726// ─── apply_dependency_to_child ───
1727
1728#[derive(Clone, Debug, Serialize, Deserialize)]
1729pub struct ApplyDependencyToChildArgs {
1730    pub flow_id: crate::types::FlowId,
1731    pub edge_id: crate::types::EdgeId,
1732    /// The child execution that receives the dependency.
1733    pub downstream_execution_id: ExecutionId,
1734    pub upstream_execution_id: ExecutionId,
1735    pub graph_revision: u64,
1736    #[serde(default = "default_dependency_kind")]
1737    pub dependency_kind: String,
1738    #[serde(default)]
1739    pub data_passing_ref: Option<String>,
1740    pub now: TimestampMs,
1741}
1742
/// Outcome of the `apply_dependency_to_child` contract.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ApplyDependencyToChildResult {
    /// Dependency applied, N unsatisfied deps remaining
    /// (`unsatisfied_count` carries N).
    Applied { unsatisfied_count: u32 },
    /// Already applied (idempotent).
    AlreadyApplied,
}
1750
1751// ─── resolve_dependency ───
1752
1753#[derive(Clone, Debug, Serialize, Deserialize)]
1754pub struct ResolveDependencyArgs {
1755    pub edge_id: crate::types::EdgeId,
1756    /// "success", "failed", "cancelled", "expired", "skipped"
1757    pub upstream_outcome: String,
1758    pub now: TimestampMs,
1759}
1760
/// Outcome of the `resolve_dependency` contract.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResolveDependencyResult {
    /// Edge satisfied — downstream may become eligible.
    Satisfied,
    /// Edge made impossible — downstream becomes skipped.
    Impossible,
    /// Already resolved (idempotent).
    AlreadyResolved,
}
1770
1771// ─── promote_blocked_to_eligible ───
1772
/// Typed inputs for the `promote_blocked_to_eligible` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PromoteBlockedToEligibleArgs {
    /// Execution the call targets.
    pub execution_id: ExecutionId,
    /// Timestamp the caller supplies as "now" for this call.
    pub now: TimestampMs,
}
1778
/// Outcome of the `promote_blocked_to_eligible` contract. `Promoted`
/// is the only success variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromoteBlockedToEligibleResult {
    /// The execution was promoted.
    Promoted,
}
1783
1784// ─── evaluate_flow_eligibility ───
1785
/// Typed inputs for the `evaluate_flow_eligibility` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EvaluateFlowEligibilityArgs {
    /// Execution whose eligibility is evaluated.
    pub execution_id: ExecutionId,
}
1790
/// Outcome of the `evaluate_flow_eligibility` contract.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum EvaluateFlowEligibilityResult {
    /// Execution eligibility status, carried as a string literal.
    /// NOTE(review): the set of possible `status` values is not
    /// visible from this file — confirm against the producer.
    Status { status: String },
}
1796
1797// ─── replay_execution ───
1798
/// Typed inputs for the `replay_execution` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReplayExecutionArgs {
    /// Execution to replay.
    pub execution_id: ExecutionId,
    /// Timestamp the caller supplies as "now" for this call.
    pub now: TimestampMs,
}
1804
/// Outcome of the `replay_execution` contract.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplayExecutionResult {
    /// Replayed to runnable. `public_state` carries the
    /// [`PublicState`] reported with the result.
    Replayed { public_state: PublicState },
}
1810
1811// ─── get_execution (full read) ───
1812
/// Full execution info returned by `Server::get_execution`.
///
/// Several fields (`namespace`, `lane_id`, the timestamps, `flow_id`)
/// are carried as plain strings here rather than as the typed ids
/// used elsewhere in this module.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionInfo {
    pub execution_id: ExecutionId,
    /// Namespace, as a string.
    pub namespace: String,
    /// Lane id, as a string.
    pub lane_id: String,
    pub priority: i32,
    pub execution_kind: String,
    pub state_vector: StateVector,
    pub public_state: PublicState,
    /// Creation timestamp, carried as a string.
    pub created_at: String,
    /// TimestampMs (ms since epoch) when the execution's first attempt
    /// was started by a worker claim. Typed as `Option<String>` with
    /// `#[serde(default)]` so pre-claim reads deserialise cleanly even
    /// if the field is absent from the wire; `None` is omitted on
    /// serialisation.
    /// NOTE(review): the previous wording said "Empty string until the
    /// first claim lands" — confirm whether producers emit `None` or
    /// `Some("")` before the first claim.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at: Option<String>,
    /// TimestampMs when the execution reached a terminal
    /// `completed`/`failed`/`cancelled`/`expired` state. Empty /
    /// absent while still in flight; `None` is omitted on
    /// serialisation.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub completed_at: Option<String>,
    pub current_attempt_index: u32,
    /// Owning flow id as a string, if any.
    pub flow_id: Option<String>,
    pub blocking_detail: String,
}
1839
1840// ─── set_execution_tags / set_flow_tags (issue #58.4) ───
1841
/// Args for `ff_set_execution_tags`. Tag keys MUST match
/// `^[a-z][a-z0-9_]*\.` — the caller-namespace rule — or the FCALL
/// returns `invalid_tag_key`. Values are arbitrary strings. The map is
/// ordered (`BTreeMap`) so two callers submitting the same logical set
/// of tags produce identical ARGV.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SetExecutionTagsArgs {
    /// Execution whose tags are written.
    pub execution_id: ExecutionId,
    /// Tag key → value pairs to apply, iterated in key order.
    pub tags: BTreeMap<String, String>,
}
1852
/// Result of `ff_set_execution_tags`. `Ok` is the only success
/// variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SetExecutionTagsResult {
    /// Tags written. `count` is the number of key-value pairs applied.
    Ok { count: u32 },
}
1859
/// Args for `ff_set_flow_tags`. Same namespace rule as
/// [`SetExecutionTagsArgs`]. The Lua function also lazy-migrates any
/// pre-58.4 reserved-namespace fields stashed inline on `flow_core` into
/// the new tags key.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SetFlowTagsArgs {
    /// Flow whose tags are written.
    pub flow_id: FlowId,
    /// Tag key → value pairs to apply, iterated in key order.
    pub tags: BTreeMap<String, String>,
}
1869
/// Result of `ff_set_flow_tags`. `Ok` is the only success variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SetFlowTagsResult {
    /// Tags written. `count` is the number of key-value pairs applied.
    Ok { count: u32 },
}
1876
1877// ─── describe_execution (issue #58.1) ───
1878
/// Engine-decoupled read-model for one execution.
///
/// Returned by `ff_sdk::FlowFabricWorker::describe_execution`. Consumers
/// consult this struct instead of reaching into Valkey's exec_core hash
/// directly — the engine is free to rename fields or restructure storage
/// under this surface.
///
/// `#[non_exhaustive]` — FF may add fields in minor releases without a
/// semver break. Match with `..`; outside this crate, construct via
/// [`ExecutionSnapshot::new`] (struct literals are not permitted on a
/// `#[non_exhaustive]` struct from another crate).
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ExecutionSnapshot {
    /// Identifier of the execution this snapshot describes.
    pub execution_id: ExecutionId,
    /// Owning flow, if any.
    pub flow_id: Option<FlowId>,
    pub lane_id: LaneId,
    pub namespace: Namespace,
    pub public_state: PublicState,
    /// Blocking reason string (e.g. `"waiting_for_worker"`,
    /// `"waiting_for_delay"`, `"waiting_for_dependencies"`). `None` when
    /// the exec_core field is empty.
    pub blocking_reason: Option<String>,
    /// Free-form operator-readable detail explaining `blocking_reason`.
    /// `None` when the exec_core field is empty.
    pub blocking_detail: Option<String>,
    /// Summary of the execution's currently-active attempt. `None` when
    /// no attempt has been started (pre-claim) or when the exec_core
    /// attempt fields are all empty.
    pub current_attempt: Option<AttemptSummary>,
    /// Summary of the execution's currently-held lease. `None` when the
    /// execution is not held by a worker.
    pub current_lease: Option<LeaseSummary>,
    /// The waitpoint this execution is currently suspended on, if any.
    pub current_waitpoint: Option<WaitpointId>,
    /// Creation timestamp.
    pub created_at: TimestampMs,
    /// Timestamp of the last write that mutated exec_core. Engine-maintained.
    pub last_mutation_at: TimestampMs,
    pub total_attempt_count: u32,
    /// Caller-owned labels. The prefix `^[a-z][a-z0-9_]*\.` is reserved for
    /// consumer metadata (e.g. `cairn.task_id`); FF guarantees it will not
    /// write keys matching that shape. FF's own fields stay in snake_case
    /// without dots. Empty when no tags are set.
    pub tags: BTreeMap<String, String>,
}
1922
1923impl ExecutionSnapshot {
1924    /// Construct an [`ExecutionSnapshot`]. Present so downstream crates
1925    /// (ff-sdk's `describe_execution`) can assemble the struct despite
1926    /// the `#[non_exhaustive]` marker. Prefer adding builder-style
1927    /// helpers here over loosening `non_exhaustive`.
1928    #[allow(clippy::too_many_arguments)]
1929    pub fn new(
1930        execution_id: ExecutionId,
1931        flow_id: Option<FlowId>,
1932        lane_id: LaneId,
1933        namespace: Namespace,
1934        public_state: PublicState,
1935        blocking_reason: Option<String>,
1936        blocking_detail: Option<String>,
1937        current_attempt: Option<AttemptSummary>,
1938        current_lease: Option<LeaseSummary>,
1939        current_waitpoint: Option<WaitpointId>,
1940        created_at: TimestampMs,
1941        last_mutation_at: TimestampMs,
1942        total_attempt_count: u32,
1943        tags: BTreeMap<String, String>,
1944    ) -> Self {
1945        Self {
1946            execution_id,
1947            flow_id,
1948            lane_id,
1949            namespace,
1950            public_state,
1951            blocking_reason,
1952            blocking_detail,
1953            current_attempt,
1954            current_lease,
1955            current_waitpoint,
1956            created_at,
1957            last_mutation_at,
1958            total_attempt_count,
1959            tags,
1960        }
1961    }
1962}
1963
/// Currently-active attempt summary inside an [`ExecutionSnapshot`].
///
/// `#[non_exhaustive]` — match with `..`; outside this crate,
/// construct via [`AttemptSummary::new`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct AttemptSummary {
    /// Identifier of the attempt.
    pub attempt_id: AttemptId,
    /// Index of the attempt.
    pub attempt_index: AttemptIndex,
}
1973
1974impl AttemptSummary {
1975    /// Construct an [`AttemptSummary`]. See [`ExecutionSnapshot::new`]
1976    /// for the rationale — `#[non_exhaustive]` blocks cross-crate
1977    /// struct-literal construction.
1978    pub fn new(attempt_id: AttemptId, attempt_index: AttemptIndex) -> Self {
1979        Self {
1980            attempt_id,
1981            attempt_index,
1982        }
1983    }
1984}
1985
/// Currently-held lease summary inside an [`ExecutionSnapshot`].
///
/// `#[non_exhaustive]`. New fields may be added in minor releases — use
/// [`LeaseSummary::new`] plus the fluent `with_*` setters to construct
/// one, and match with `..` in destructuring.
///
/// # Field provenance (FF#278)
///
/// * `lease_id` — minted at claim time (`ff_claim_execution` /
///   `ff_claim_resumed_execution`), cleared atomically with the other
///   lease fields on revoke/expire/complete. Stable for the lifetime
///   of a single lease; a fresh one is minted per re-claim. Defaults
///   to the nil UUID when a backend does not surface per-lease ids
///   (treat as "field not populated").
/// * `attempt_index` — the 1-based attempt counter (`current_attempt_index`
///   on `exec_core`). Set atomically with `current_attempt_id` at claim
///   time; always populated while a Valkey-backed lease is held.
///   [`LeaseSummary::new`] initialises it to `0`, which is outside the
///   1-based range and therefore reads as "not populated".
/// * `last_heartbeat_at` — the most recent `ff_renew_lease` timestamp
///   (`lease_last_renewed_at` on `exec_core`). `None` when the field
///   is empty (e.g. legacy data pre-0.9, or a backend that does not
///   surface per-renewal heartbeats).
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct LeaseSummary {
    /// Epoch of the lease.
    pub lease_epoch: LeaseEpoch,
    /// Worker instance holding the lease.
    pub worker_instance_id: WorkerInstanceId,
    /// Timestamp at which the lease expires.
    pub expires_at: TimestampMs,
    /// Per-lease unique identity. Correlates audit-log entries,
    /// reclaim events, and recovery traces.
    pub lease_id: LeaseId,
    /// 1-based attempt counter; `.0` mirrors `current_attempt_index`
    /// on `exec_core`.
    pub attempt_index: AttemptIndex,
    /// Most recent heartbeat (lease-renewal) timestamp. `None` when the
    /// backend does not surface per-renewal ticks on this lease.
    pub last_heartbeat_at: Option<TimestampMs>,
}
2023
2024impl LeaseSummary {
2025    /// Construct a [`LeaseSummary`] with the three always-present
2026    /// fields. Use the `with_*` setters to populate the FF#278
2027    /// additions (`lease_id`, `attempt_index`, `last_heartbeat_at`);
2028    /// otherwise they default to the nil / zero / empty forms, which
2029    /// callers should treat as "field not surfaced by this backend".
2030    ///
2031    /// See [`ExecutionSnapshot::new`] for the broader `#[non_exhaustive]`
2032    /// construction rationale.
2033    pub fn new(
2034        lease_epoch: LeaseEpoch,
2035        worker_instance_id: WorkerInstanceId,
2036        expires_at: TimestampMs,
2037    ) -> Self {
2038        Self {
2039            lease_epoch,
2040            worker_instance_id,
2041            expires_at,
2042            lease_id: LeaseId::from_uuid(uuid::Uuid::nil()),
2043            attempt_index: AttemptIndex::new(0),
2044            last_heartbeat_at: None,
2045        }
2046    }
2047
2048    /// Set the lease's unique identity (FF#278).
2049    #[must_use]
2050    pub fn with_lease_id(mut self, lease_id: LeaseId) -> Self {
2051        self.lease_id = lease_id;
2052        self
2053    }
2054
2055    /// Set the 1-based attempt counter (FF#278).
2056    #[must_use]
2057    pub fn with_attempt_index(mut self, attempt_index: AttemptIndex) -> Self {
2058        self.attempt_index = attempt_index;
2059        self
2060    }
2061
2062    /// Set the most recent heartbeat timestamp (FF#278).
2063    #[must_use]
2064    pub fn with_last_heartbeat_at(mut self, ts: TimestampMs) -> Self {
2065        self.last_heartbeat_at = Some(ts);
2066        self
2067    }
2068}
2069
2070// ─── Common sub-types ───
2071
2072// ─── describe_flow (issue #58.2) ───
2073
/// Engine-decoupled read-model for one flow.
///
/// Returned by `ff_sdk::FlowFabricWorker::describe_flow`. Consumers
/// consult this struct instead of reaching into Valkey's flow_core hash
/// directly — the engine is free to rename fields or restructure storage
/// under this surface.
///
/// `#[non_exhaustive]` — FF may add fields in minor releases without a
/// semver break. Match with `..` or use [`FlowSnapshot::new`].
///
/// # `public_flow_state`
///
/// Stored as an engine-written string literal on `flow_core`. Known
/// values today: `open`, `running`, `blocked`, `cancelled`, `completed`,
/// `failed`. Surfaced as `String` (not a typed enum) because FF does
/// not yet expose a `PublicFlowState` type — callers that need to act
/// on specific values should match on the literal. The flow_projector
/// writes a parallel `public_flow_state` into the flow's summary hash;
/// this field reflects the authoritative value on `flow_core`, which
/// is what mutation guards (cancel/add-member) consult.
///
/// # `tags`
///
/// Unlike [`ExecutionSnapshot::tags`] (which has a dedicated tags
/// hash), flow tags live inline on `flow_core`. FF's own fields are
/// snake_case without a `.`; any field whose name starts with
/// `<lowercase>.` (e.g. `cairn.task_id`) is treated as consumer-owned
/// metadata and routed here. An empty map means no namespaced tags
/// were written. The prefix convention mirrors
/// [`ExecutionSnapshot::tags`] — consumers should keep tag keys
/// namespaced (`cairn.*`, `operator.*`, etc.) so future FF field
/// additions don't collide.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct FlowSnapshot {
    /// Identifier of the flow this snapshot describes.
    pub flow_id: FlowId,
    /// The `flow_kind` literal passed to `create_flow` (e.g. `dag`,
    /// `pipeline`). Preserved as-is; FF does not interpret it.
    pub flow_kind: String,
    pub namespace: Namespace,
    /// Authoritative flow state on `flow_core`. See the struct-level
    /// docs for the set of known values.
    pub public_flow_state: String,
    /// Monotonically increasing revision bumped on every structural
    /// mutation (add-member, stage-edge). Used by optimistic-concurrency
    /// writers via `expected_graph_revision`.
    pub graph_revision: u64,
    /// Number of member executions added so far. Never decremented.
    pub node_count: u32,
    /// Number of dependency edges staged so far. Never decremented.
    pub edge_count: u32,
    /// Creation timestamp.
    pub created_at: TimestampMs,
    /// Timestamp of the last write that mutated `flow_core`.
    /// Engine-maintained.
    pub last_mutation_at: TimestampMs,
    /// When the flow reached a terminal state via `cancel_flow`. `None`
    /// while the flow is live. Only written by the cancel path today;
    /// `completed`/`failed` terminal states do not populate this field
    /// (the flow_projector derives them from membership).
    pub cancelled_at: Option<TimestampMs>,
    /// Operator-supplied reason from the `cancel_flow` call. `None`
    /// when the flow has not been cancelled.
    pub cancel_reason: Option<String>,
    /// The `cancellation_policy` value persisted by `cancel_flow`
    /// (e.g. `cancel_all`, `cancel_flow_only`). `None` for flows
    /// cancelled before this field was persisted, or not yet cancelled.
    pub cancellation_policy: Option<String>,
    /// Consumer-owned namespaced metadata (e.g. `cairn.task_id`). See
    /// the struct-level docs for the routing rule.
    pub tags: BTreeMap<String, String>,
    /// RFC-016 Stage A: inbound edge groups known on this flow.
    ///
    /// One entry per downstream execution that has at least one staged
    /// inbound dependency edge. Populated from the
    /// `ff:flow:{fp:N}:<flow_id>:edgegroup:<downstream_eid>` hash —
    /// when that hash is absent (existing flows created before Stage A),
    /// the backend falls back to reading the legacy
    /// `deps_meta.unsatisfied_required_count` counter on the
    /// downstream's exec partition and reports the group as
    /// [`EdgeDependencyPolicy::AllOf`] with the derived counters
    /// (backward-compat shim — see RFC-016 §11 Stage A).
    ///
    /// Every entry in Stage A reports `policy = AllOf`; Stage B/C/D/E
    /// extend the variants and wire the quorum counters.
    pub edge_groups: Vec<EdgeGroupSnapshot>,
}
2160
2161impl FlowSnapshot {
2162    /// Construct a [`FlowSnapshot`]. Present so downstream crates
2163    /// (ff-sdk's `describe_flow`) can assemble the struct despite the
2164    /// `#[non_exhaustive]` marker.
2165    #[allow(clippy::too_many_arguments)]
2166    pub fn new(
2167        flow_id: FlowId,
2168        flow_kind: String,
2169        namespace: Namespace,
2170        public_flow_state: String,
2171        graph_revision: u64,
2172        node_count: u32,
2173        edge_count: u32,
2174        created_at: TimestampMs,
2175        last_mutation_at: TimestampMs,
2176        cancelled_at: Option<TimestampMs>,
2177        cancel_reason: Option<String>,
2178        cancellation_policy: Option<String>,
2179        tags: BTreeMap<String, String>,
2180        edge_groups: Vec<EdgeGroupSnapshot>,
2181    ) -> Self {
2182        Self {
2183            flow_id,
2184            flow_kind,
2185            namespace,
2186            public_flow_state,
2187            graph_revision,
2188            node_count,
2189            edge_count,
2190            created_at,
2191            last_mutation_at,
2192            cancelled_at,
2193            cancel_reason,
2194            cancellation_policy,
2195            tags,
2196            edge_groups,
2197        }
2198    }
2199}
2200
2201// ─── describe_edge / list_*_edges (issue #58.3) ───
2202
/// Engine-decoupled read-model for one dependency edge.
///
/// Returned by `ff_sdk::FlowFabricWorker::describe_edge`,
/// `list_incoming_edges`, and `list_outgoing_edges`. Consumers consult
/// this struct instead of reaching into Valkey's per-flow `edge:` hash
/// directly — the engine is free to rename hash fields or restructure
/// key layout under this surface.
///
/// `#[non_exhaustive]` — FF may add fields in minor releases without a
/// semver break. Match with `..` or use [`EdgeSnapshot::new`].
///
/// # Fields
///
/// The struct mirrors the immutable edge record written by
/// `ff_stage_dependency_edge` (see `lua/flow.lua`). The flow-scoped
/// edge hash is only ever written once, at staging time; per-execution
/// resolution state lives on a separate `dep:<edge_id>` hash and is not
/// surfaced here. The `edge_state` field therefore reflects the
/// staging-time literal (currently `pending`), not the downstream
/// execution's dep-edge state.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct EdgeSnapshot {
    /// Identifier of the edge.
    pub edge_id: EdgeId,
    /// Flow the edge belongs to.
    pub flow_id: FlowId,
    /// Execution on the upstream end of the edge.
    pub upstream_execution_id: ExecutionId,
    /// Execution on the downstream end of the edge.
    pub downstream_execution_id: ExecutionId,
    /// The `dependency_kind` literal (e.g. `success_only`) from
    /// `stage_dependency_edge`. Preserved as-is; FF does not interpret
    /// it on reads.
    pub dependency_kind: String,
    /// The satisfaction-condition literal stamped at staging time
    /// (e.g. `all_required`).
    pub satisfaction_condition: String,
    /// Optional opaque handle to a data-passing artifact. `None` when
    /// the stored field is empty (the most common case).
    pub data_passing_ref: Option<String>,
    /// Edge-state literal on the flow-scoped edge hash. Written once
    /// at staging as `pending`; this hash is immutable on the flow
    /// side. Per-execution resolution state is tracked separately on
    /// the child's `dep:<edge_id>` hash.
    pub edge_state: String,
    /// Creation timestamp of the edge record.
    pub created_at: TimestampMs,
    /// Origin of the edge (e.g. `engine`). Preserved as-is.
    pub created_by: String,
}
2249
/// Direction marker for [`crate::engine_backend::EngineBackend::list_edges`].
///
/// Carries the subject execution whose adjacency side the caller wants
/// to list — mirrors the internal `AdjacencySide + subject_eid` pair
/// the ff-sdk free-fn `list_edges_from_set` already uses. Keeping
/// direction + subject fused in one enum means the trait method has a
/// single `direction` parameter rather than a `(side, eid)` pair, and
/// the backend impl can't forget one of the two.
///
/// * `Outgoing { from_node }` — the caller wants every edge whose
///   `upstream_execution_id == from_node`. Corresponds to the
///   `out:<execution_id>` adjacency SET under the execution's flow
///   partition.
/// * `Incoming { to_node }` — the caller wants every edge whose
///   `downstream_execution_id == to_node`. Corresponds to the
///   `in:<execution_id>` adjacency SET under the execution's flow
///   partition.
///
/// Use [`EdgeDirection::subject`] to read the execution id without
/// matching on the direction.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EdgeDirection {
    /// Edges leaving `from_node` — `out:` adjacency SET.
    Outgoing {
        /// The subject execution whose outgoing edges to list.
        from_node: ExecutionId,
    },
    /// Edges landing on `to_node` — `in:` adjacency SET.
    Incoming {
        /// The subject execution whose incoming edges to list.
        to_node: ExecutionId,
    },
}
2280
2281impl EdgeDirection {
2282    /// Return the subject execution id regardless of direction. Shared
2283    /// helper for backend impls that need the execution id for the
2284    /// initial `HGET exec_core.flow_id` lookup (flow routing) before
2285    /// they know which adjacency SET to read.
2286    pub fn subject(&self) -> &ExecutionId {
2287        match self {
2288            Self::Outgoing { from_node } => from_node,
2289            Self::Incoming { to_node } => to_node,
2290        }
2291    }
2292}
2293
2294impl EdgeSnapshot {
2295    /// Construct an [`EdgeSnapshot`]. Present so downstream crates
2296    /// (ff-sdk's `describe_edge` / `list_*_edges`) can assemble the
2297    /// struct despite the `#[non_exhaustive]` marker.
2298    #[allow(clippy::too_many_arguments)]
2299    pub fn new(
2300        edge_id: EdgeId,
2301        flow_id: FlowId,
2302        upstream_execution_id: ExecutionId,
2303        downstream_execution_id: ExecutionId,
2304        dependency_kind: String,
2305        satisfaction_condition: String,
2306        data_passing_ref: Option<String>,
2307        edge_state: String,
2308        created_at: TimestampMs,
2309        created_by: String,
2310    ) -> Self {
2311        Self {
2312            edge_id,
2313            flow_id,
2314            upstream_execution_id,
2315            downstream_execution_id,
2316            dependency_kind,
2317            satisfaction_condition,
2318            data_passing_ref,
2319            edge_state,
2320            created_at,
2321            created_by,
2322        }
2323    }
2324}
2325
2326// ─── RFC-016 edge-group policy (Stage A) ───
2327
/// Policy controlling how an inbound edge group's satisfaction is
/// decided.
///
/// Stage A honours only [`EdgeDependencyPolicy::AllOf`] — the two
/// quorum variants exist so the wire/snapshot surface is stable for
/// Stage B/C/D's resolver extensions, but
/// [`crate::engine_backend::EngineBackend::set_edge_group_policy`]
/// rejects them with [`crate::engine_error::EngineError::Validation`]
/// until Stage B lands.
///
/// `#[non_exhaustive]` — future stages may add variants (e.g.
/// `Threshold` — see RFC-016 §10.3) without a semver break. Construct
/// via the [`EdgeDependencyPolicy::all_of`], [`EdgeDependencyPolicy::any_of`],
/// and [`EdgeDependencyPolicy::quorum`] helpers.
///
/// # Serialised form
///
/// Internally tagged: the variant name is written under a `kind` key
/// in snake_case (`all_of`, `any_of`, `quorum`), alongside the
/// variant's own fields.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum EdgeDependencyPolicy {
    /// Today's behavior: every edge in the inbound group must be
    /// satisfied (RFC-007 `all_required` + `success_only`).
    AllOf,
    /// k-of-n where k==1 — satisfied on the first upstream success.
    /// Stage A: rejected on
    /// [`crate::engine_backend::EngineBackend::set_edge_group_policy`];
    /// resolver emits nothing for this variant yet.
    AnyOf {
        /// Disposition for unfinished siblings. The explicit `rename`
        /// equals the field name, so the serialised key is
        /// `on_satisfied`.
        #[serde(rename = "on_satisfied")]
        on_satisfied: OnSatisfied,
    },
    /// k-of-n quorum. Stage A: rejected on
    /// [`crate::engine_backend::EngineBackend::set_edge_group_policy`].
    Quorum {
        /// The `k` in k-of-n.
        k: u32,
        /// Disposition for unfinished siblings. The explicit `rename`
        /// equals the field name, so the serialised key is
        /// `on_satisfied`.
        #[serde(rename = "on_satisfied")]
        on_satisfied: OnSatisfied,
    },
}
2365
2366impl EdgeDependencyPolicy {
2367    /// Construct the default all-of policy (RFC-007 behavior).
2368    pub fn all_of() -> Self {
2369        Self::AllOf
2370    }
2371
2372    /// Construct an any-of policy — reserved for Stage B.
2373    pub fn any_of(on_satisfied: OnSatisfied) -> Self {
2374        Self::AnyOf { on_satisfied }
2375    }
2376
2377    /// Construct a quorum policy — reserved for Stage B.
2378    pub fn quorum(k: u32, on_satisfied: OnSatisfied) -> Self {
2379        Self::Quorum { k, on_satisfied }
2380    }
2381
2382    /// Stable string label used for wire format + metric labels.
2383    /// `all_of` | `any_of` | `quorum`.
2384    pub fn variant_str(&self) -> &'static str {
2385        match self {
2386            Self::AllOf => "all_of",
2387            Self::AnyOf { .. } => "any_of",
2388            Self::Quorum { .. } => "quorum",
2389        }
2390    }
2391}
2392
/// Policy for unfinished sibling upstreams once the quorum is met.
///
/// `#[non_exhaustive]` — RFC-016 §10.5 rejects a third variant today
/// but keeps the door open. Construct via [`OnSatisfied::cancel_remaining`]
/// / [`OnSatisfied::let_run`].
///
/// Serialised as the snake_case variant name (`cancel_remaining`,
/// `let_run`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
#[serde(rename_all = "snake_case")]
pub enum OnSatisfied {
    /// Default. Cancel any still-running siblings once quorum met.
    CancelRemaining,
    /// Let stragglers finish; their terminals update counters for
    /// observability only (one-shot downstream).
    LetRun,
}
2408
2409impl OnSatisfied {
2410    /// Construct the default `cancel_remaining` disposition.
2411    pub fn cancel_remaining() -> Self {
2412        Self::CancelRemaining
2413    }
2414
2415    /// Construct the `let_run` disposition.
2416    pub fn let_run() -> Self {
2417        Self::LetRun
2418    }
2419
2420    /// Stable string label for wire format.
2421    pub fn variant_str(&self) -> &'static str {
2422        match self {
2423            Self::CancelRemaining => "cancel_remaining",
2424            Self::LetRun => "let_run",
2425        }
2426    }
2427}
2428
/// Edge-group lifecycle state (Stage A exposes only `pending` +
/// `satisfied` + `impossible`; `cancelled` reserved for Stage C).
///
/// Serialised as the snake_case variant name. `#[non_exhaustive]` —
/// downstream crates must match with a wildcard arm.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
#[serde(rename_all = "snake_case")]
pub enum EdgeGroupState {
    /// Literal `pending`.
    Pending,
    /// Literal `satisfied`.
    Satisfied,
    /// Literal `impossible`.
    Impossible,
    /// Literal `cancelled` (reserved for Stage C).
    Cancelled,
}
2440
2441impl EdgeGroupState {
2442    pub fn from_literal(s: &str) -> Self {
2443        match s {
2444            "satisfied" => Self::Satisfied,
2445            "impossible" => Self::Impossible,
2446            "cancelled" => Self::Cancelled,
2447            _ => Self::Pending,
2448        }
2449    }
2450
2451    pub fn as_str(&self) -> &'static str {
2452        match self {
2453            Self::Pending => "pending",
2454            Self::Satisfied => "satisfied",
2455            Self::Impossible => "impossible",
2456            Self::Cancelled => "cancelled",
2457        }
2458    }
2459}
2460
/// Snapshot of one inbound edge group (per downstream execution).
///
/// Exposed via [`FlowSnapshot::edge_groups`]. Stage A only populates
/// `AllOf` groups and their counters; Stage B/C add `failed` /
/// `skipped` / `satisfied_at` wiring for the quorum variants.
///
/// `#[non_exhaustive]` — future stages will add fields (`satisfied_at`,
/// `failed_count` write-path, `cancel_siblings_pending`). Match with
/// `..` or use [`EdgeGroupSnapshot::new`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct EdgeGroupSnapshot {
    /// Downstream execution this inbound group belongs to.
    pub downstream_execution_id: ExecutionId,
    /// Satisfaction policy reported for the group.
    pub policy: EdgeDependencyPolicy,
    /// Counter: total dependencies in the group.
    pub total_deps: u32,
    /// Counter: satisfied dependencies.
    pub satisfied_count: u32,
    /// Counter: failed dependencies.
    pub failed_count: u32,
    /// Counter: skipped dependencies.
    pub skipped_count: u32,
    /// Counter: dependencies still running.
    pub running_count: u32,
    /// Lifecycle state of the group.
    pub group_state: EdgeGroupState,
}
2482
2483impl EdgeGroupSnapshot {
2484    #[allow(clippy::too_many_arguments)]
2485    pub fn new(
2486        downstream_execution_id: ExecutionId,
2487        policy: EdgeDependencyPolicy,
2488        total_deps: u32,
2489        satisfied_count: u32,
2490        failed_count: u32,
2491        skipped_count: u32,
2492        running_count: u32,
2493        group_state: EdgeGroupState,
2494    ) -> Self {
2495        Self {
2496            downstream_execution_id,
2497            policy,
2498            total_deps,
2499            satisfied_count,
2500            failed_count,
2501            skipped_count,
2502            running_count,
2503            group_state,
2504        }
2505    }
2506}
2507
2508// ─── set_edge_group_policy (RFC-016 §6.1) ───
2509
2510#[derive(Clone, Debug, Serialize, Deserialize)]
2511pub struct SetEdgeGroupPolicyArgs {
2512    pub flow_id: FlowId,
2513    pub downstream_execution_id: ExecutionId,
2514    pub policy: EdgeDependencyPolicy,
2515    pub now: TimestampMs,
2516}
2517
2518#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2519pub enum SetEdgeGroupPolicyResult {
2520    /// Policy stored (fresh write).
2521    Set,
2522    /// Policy already stored with an identical value (idempotent).
2523    AlreadySet,
2524}
2525
2526// ─── list_flows (issue #185) ───
2527
/// Typed flow-lifecycle status carried on [`FlowSummary`].
///
/// This is a projection of the free-form `public_flow_state` literal
/// that the flow lifecycle writes onto `flow_core`. Known literals are
/// `open`, `running`, `blocked`, `cancelled`, `completed` and `failed`
/// (see [`FlowSnapshot`]). The three live runtime states — `open`,
/// `running`, `blocked` — are folded into [`FlowStatus::Active`];
/// callers that need the precise sub-state should read
/// [`FlowSnapshot::public_flow_state`] via
/// [`crate::engine_backend::EngineBackend::describe_flow`]. `failed`
/// becomes [`FlowStatus::Failed`].
///
/// Marked `#[non_exhaustive]` so that new lifecycle states can be added
/// later without a semver break.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
#[non_exhaustive]
pub enum FlowStatus {
    /// The flow is still live on the engine (`open` / `running` /
    /// `blocked`).
    Active,
    /// Terminal success — every member reached a successful terminal
    /// state and the projector set `public_flow_state` to `completed`.
    Completed,
    /// Terminal failure — at least one member failed and the projector
    /// set `public_flow_state` to `failed`.
    Failed,
    /// An operator cancelled the flow through `cancel_flow`.
    Cancelled,
    /// A `public_flow_state` literal exists but is not one of the known
    /// values. The raw text remains available on
    /// [`FlowSnapshot::public_flow_state`]; callers that must act on it
    /// should go through
    /// [`crate::engine_backend::EngineBackend::describe_flow`].
    Unknown,
}

impl FlowStatus {
    /// Convert the raw `public_flow_state` literal from `flow_core`
    /// into a [`FlowStatus`].
    ///
    /// Matching is exact (case-sensitive). Anything that is not a known
    /// literal maps to [`FlowStatus::Unknown`], which keeps the listing
    /// surface forwards-compatible with states added engine-side later.
    pub fn from_public_flow_state(raw: &str) -> Self {
        // The three live runtime sub-states collapse into one variant.
        if matches!(raw, "open" | "running" | "blocked") {
            return FlowStatus::Active;
        }
        match raw {
            "completed" => FlowStatus::Completed,
            "failed" => FlowStatus::Failed,
            "cancelled" => FlowStatus::Cancelled,
            _ => FlowStatus::Unknown,
        }
    }
}
2576
2577/// Lightweight per-flow projection returned by
2578/// [`crate::engine_backend::EngineBackend::list_flows`].
2579///
2580/// Deliberately narrower than [`FlowSnapshot`] — listing pages serve
2581/// dashboard-style enumerations where the caller does not want to pay
2582/// for the full `flow_core` hash on every row. Consumers that need
2583/// revision / node-count / tags / cancel metadata should fan out to
2584/// [`crate::engine_backend::EngineBackend::describe_flow`] for the
2585/// specific ids they care about.
2586///
2587/// `#[non_exhaustive]` — FF may add fields in minor releases without
2588/// a semver break. Match with `..` or use [`FlowSummary::new`].
2589#[derive(Clone, Debug, PartialEq, Eq)]
2590#[non_exhaustive]
2591pub struct FlowSummary {
2592    pub flow_id: FlowId,
2593    /// Timestamp (ms since unix epoch) `flow_core.created_at` was
2594    /// stamped. Mirrors [`FlowSnapshot::created_at`]; kept typed so
2595    /// callers that want raw millis can read `.0`.
2596    pub created_at: TimestampMs,
2597    /// Typed projection of `flow_core.public_flow_state`. See
2598    /// [`FlowStatus`] for the mapping.
2599    pub status: FlowStatus,
2600}
2601
2602impl FlowSummary {
2603    /// Construct a [`FlowSummary`]. Present so downstream crates can
2604    /// assemble the struct despite the `#[non_exhaustive]` marker.
2605    pub fn new(flow_id: FlowId, created_at: TimestampMs, status: FlowStatus) -> Self {
2606        Self {
2607            flow_id,
2608            created_at,
2609            status,
2610        }
2611    }
2612}
2613
2614/// One page of [`FlowSummary`] rows returned by
2615/// [`crate::engine_backend::EngineBackend::list_flows`].
2616///
2617/// `next_cursor` is `Some(last_flow_id)` when at least one more row
2618/// may exist on the partition — callers forward it verbatim as the
2619/// next call's `cursor` argument to continue iteration. `None` means
2620/// the listing is exhausted. Cursor semantics match the Postgres
2621/// `WHERE flow_id > $cursor ORDER BY flow_id LIMIT $limit` pattern
2622/// (see the trait method's rustdoc).
2623///
2624/// `#[non_exhaustive]` — FF may add summary-level fields (total count,
2625/// partition hint) in future minor releases without a semver break.
2626#[derive(Clone, Debug, PartialEq, Eq)]
2627#[non_exhaustive]
2628pub struct ListFlowsPage {
2629    pub flows: Vec<FlowSummary>,
2630    pub next_cursor: Option<FlowId>,
2631}
2632
2633impl ListFlowsPage {
2634    /// Construct a [`ListFlowsPage`]. Present so downstream crates can
2635    /// assemble the struct despite the `#[non_exhaustive]` marker.
2636    pub fn new(flows: Vec<FlowSummary>, next_cursor: Option<FlowId>) -> Self {
2637        Self { flows, next_cursor }
2638    }
2639}
2640
2641/// Summary of state after a mutation, returned by many functions.
2642#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2643pub struct StateSummary {
2644    pub state_vector: StateVector,
2645    pub current_attempt_index: AttemptIndex,
2646}
2647
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::FlowId;
    use std::str::FromStr;

    /// `CreateExecutionArgs` survives a JSON round trip (checked via
    /// the execution id).
    #[test]
    fn create_execution_args_serde() {
        let partitions = crate::partition::PartitionConfig::default();
        let original = CreateExecutionArgs {
            execution_id: ExecutionId::for_flow(&FlowId::new(), &partitions),
            namespace: Namespace::new("test"),
            lane_id: LaneId::new("default"),
            execution_kind: "llm_call".to_owned(),
            input_payload: b"hello".to_vec(),
            payload_encoding: Some("json".to_owned()),
            priority: 0,
            creator_identity: "test-user".to_owned(),
            idempotency_key: None,
            tags: HashMap::new(),
            policy: None,
            delay_until: None,
            execution_deadline_at: None,
            partition_id: 42,
            now: TimestampMs::now(),
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: CreateExecutionArgs = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original.execution_id, decoded.execution_id);
    }

    /// `ClaimExecutionResult::Claimed` survives a JSON round trip.
    #[test]
    fn claim_result_serde() {
        let partitions = crate::partition::PartitionConfig::default();
        let claimed = ClaimedExecution {
            execution_id: ExecutionId::for_flow(&FlowId::new(), &partitions),
            lease_id: LeaseId::new(),
            lease_epoch: LeaseEpoch::new(1),
            attempt_index: AttemptIndex::new(0),
            attempt_id: AttemptId::new(),
            attempt_type: AttemptType::Initial,
            lease_expires_at: TimestampMs::from_millis(1000),
        };
        let original = ClaimExecutionResult::Claimed(claimed);
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ClaimExecutionResult = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    // ── StreamCursor (issue #92) ──

    #[test]
    fn stream_cursor_display_matches_wire_tokens() {
        assert_eq!(StreamCursor::Start.to_string(), "start");
        assert_eq!(StreamCursor::End.to_string(), "end");
        assert_eq!(StreamCursor::At("123".into()).to_string(), "123");
        assert_eq!(StreamCursor::At("123-4".into()).to_string(), "123-4");
    }

    #[test]
    fn stream_cursor_to_wire_maps_to_valkey_markers() {
        assert_eq!(StreamCursor::Start.to_wire(), "-");
        assert_eq!(StreamCursor::End.to_wire(), "+");
        assert_eq!(StreamCursor::At("0-0".into()).to_wire(), "0-0");
        assert_eq!(StreamCursor::At("17-3".into()).to_wire(), "17-3");
    }

    #[test]
    fn stream_cursor_from_str_accepts_wire_tokens() {
        assert_eq!(
            StreamCursor::from_str("start").unwrap(),
            StreamCursor::Start
        );
        assert_eq!(StreamCursor::from_str("end").unwrap(), StreamCursor::End);
        assert_eq!(
            StreamCursor::from_str("123").unwrap(),
            StreamCursor::At("123".into())
        );
        assert_eq!(
            StreamCursor::from_str("0-0").unwrap(),
            StreamCursor::At("0-0".into())
        );
        assert_eq!(
            StreamCursor::from_str("1713100800150-0").unwrap(),
            StreamCursor::At("1713100800150-0".into())
        );
    }

    #[test]
    fn stream_cursor_from_str_rejects_bare_markers() {
        let minus = StreamCursor::from_str("-");
        assert!(matches!(
            minus,
            Err(StreamCursorParseError::BareMarkerRejected(s)) if s == "-"
        ));
        let plus = StreamCursor::from_str("+");
        assert!(matches!(
            plus,
            Err(StreamCursorParseError::BareMarkerRejected(s)) if s == "+"
        ));
    }

    #[test]
    fn stream_cursor_from_str_rejects_empty() {
        assert_eq!(
            StreamCursor::from_str(""),
            Err(StreamCursorParseError::Empty)
        );
    }

    #[test]
    fn stream_cursor_from_str_rejects_malformed() {
        let rejected = [
            "abc", "-1", "1-", "-1-2", "1-2-3", "1.2", "1 2", "Start", "END",
        ];
        for bad in rejected {
            let outcome = StreamCursor::from_str(bad);
            assert!(
                matches!(outcome, Err(StreamCursorParseError::Malformed(_))),
                "must reject {bad:?}",
            );
        }
    }

    #[test]
    fn stream_cursor_from_str_rejects_non_ascii() {
        // U+2013 (en dash) must not be accepted in place of an ASCII hyphen.
        let outcome = StreamCursor::from_str("1\u{2013}2");
        assert!(matches!(
            outcome,
            Err(StreamCursorParseError::Malformed(_))
        ));
    }

    #[test]
    fn stream_cursor_serde_round_trip() {
        let samples = [
            StreamCursor::Start,
            StreamCursor::End,
            StreamCursor::At("0-0".into()),
            StreamCursor::At("1713100800150-0".into()),
        ];
        for cursor in samples {
            let encoded = serde_json::to_string(&cursor).unwrap();
            let decoded: StreamCursor = serde_json::from_str(&encoded).unwrap();
            assert_eq!(decoded, cursor);
        }
    }

    #[test]
    fn stream_cursor_serializes_as_bare_string() {
        assert_eq!(
            serde_json::to_string(&StreamCursor::Start).unwrap(),
            r#""start""#
        );
        assert_eq!(
            serde_json::to_string(&StreamCursor::End).unwrap(),
            r#""end""#
        );
        assert_eq!(
            serde_json::to_string(&StreamCursor::At("123-0".into())).unwrap(),
            r#""123-0""#
        );
    }

    #[test]
    fn stream_cursor_deserialize_rejects_bare_markers() {
        let minus = serde_json::from_str::<StreamCursor>(r#""-""#);
        assert!(minus.is_err());
        let plus = serde_json::from_str::<StreamCursor>(r#""+""#);
        assert!(plus.is_err());
    }

    #[test]
    fn stream_cursor_from_beginning_is_zero_zero() {
        assert_eq!(
            StreamCursor::from_beginning(),
            StreamCursor::At("0-0".into())
        );
    }

    #[test]
    fn stream_cursor_is_concrete_classifies_variants() {
        assert!(!StreamCursor::Start.is_concrete());
        assert!(!StreamCursor::End.is_concrete());
        assert!(StreamCursor::At("0-0".into()).is_concrete());
        assert!(StreamCursor::At("123-0".into()).is_concrete());
        assert!(StreamCursor::from_beginning().is_concrete());
    }

    #[test]
    fn stream_cursor_into_wire_string_moves_without_cloning() {
        assert_eq!(StreamCursor::Start.into_wire_string(), "-");
        assert_eq!(StreamCursor::End.into_wire_string(), "+");
        assert_eq!(StreamCursor::At("17-3".into()).into_wire_string(), "17-3");
    }
}
2842
2843// ─── list_executions ───
2844
2845/// Summary of an execution for list views.
2846#[derive(Clone, Debug, Serialize, Deserialize)]
2847pub struct ExecutionSummary {
2848    pub execution_id: ExecutionId,
2849    pub namespace: String,
2850    pub lane_id: String,
2851    pub execution_kind: String,
2852    pub public_state: String,
2853    pub priority: i32,
2854    pub created_at: String,
2855}
2856
2857/// Result of a list_executions query.
2858#[derive(Clone, Debug, Serialize, Deserialize)]
2859pub struct ListExecutionsResult {
2860    pub executions: Vec<ExecutionSummary>,
2861    pub total_returned: usize,
2862}
2863
2864// ─── list_lanes (issue #184) ───
2865
2866/// One page of lane ids returned by
2867/// [`crate::engine_backend::EngineBackend::list_lanes`].
2868///
2869/// Lanes are global (not partition-scoped) — the backend enumerates
2870/// every registered lane, sorts by [`LaneId`] name, and returns a
2871/// `limit`-sized slice starting after `cursor` (exclusive).
2872///
2873/// `next_cursor` is `Some(last_lane_in_page)` when more pages remain
2874/// and `None` when the caller has read the final page. Callers that
2875/// want the full list loop until `next_cursor` is `None`, threading
2876/// the previous page's `next_cursor` into the next call's `cursor`
2877/// argument.
2878///
2879/// `#[non_exhaustive]` — FF may add fields (e.g. a `total` hint) in
2880/// minor releases without a semver break.
2881#[derive(Clone, Debug, PartialEq, Eq)]
2882#[non_exhaustive]
2883pub struct ListLanesPage {
2884    /// The lanes in this page, sorted by [`LaneId`] name.
2885    pub lanes: Vec<LaneId>,
2886    /// Cursor for the next page (exclusive). `None` ⇒ final page.
2887    pub next_cursor: Option<LaneId>,
2888}
2889
2890impl ListLanesPage {
2891    /// Construct a [`ListLanesPage`]. Present so downstream crates
2892    /// (ff-backend-valkey's `list_lanes` impl) can assemble the
2893    /// struct despite the `#[non_exhaustive]` marker.
2894    pub fn new(lanes: Vec<LaneId>, next_cursor: Option<LaneId>) -> Self {
2895        Self { lanes, next_cursor }
2896    }
2897}
2898
2899// ─── list_suspended ───
2900
2901/// One entry in a [`ListSuspendedPage`] — a suspended execution and
2902/// the reason it is blocked, answering an operator's "what's this
2903/// waiting on?" without a follow-up round-trip.
2904///
2905/// `reason` carries the free-form `reason_code` recorded by the
2906/// suspending worker at `lua/suspension.lua` (HSET `suspension:current
2907/// reason_code`). It is a `String`, not a closed enum: the suspension
2908/// pipeline accepts arbitrary caller-supplied codes (typical values
2909/// are `"signal"`, `"timer"`, `"children"`, `"join"`, but consumers
2910/// embed bespoke codes). A future enum projection can classify
2911/// known codes once the set is frozen; until then, callers that want
2912/// structured routing MUST match on the string explicitly.
2913#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2914#[non_exhaustive]
2915pub struct SuspendedExecutionEntry {
2916    /// Execution currently in `lifecycle_phase=suspended`.
2917    pub execution_id: ExecutionId,
2918    /// Score stored on the per-lane suspended ZSET — the scheduled
2919    /// `timeout_at` in milliseconds, or the `9999999999999` sentinel
2920    /// when no timeout was set (see `lua/suspension.lua`).
2921    pub suspended_at_ms: i64,
2922    /// Free-form reason code from `suspension:current.reason_code`.
2923    /// Empty string when the suspension hash is absent or does not
2924    /// carry a `reason_code` field (older records). See the struct
2925    /// rustdoc for the deliberate-String rationale.
2926    pub reason: String,
2927}
2928
2929impl SuspendedExecutionEntry {
2930    /// Construct a new entry. Preferred over direct field init for
2931    /// `#[non_exhaustive]` forward-compat.
2932    pub fn new(execution_id: ExecutionId, suspended_at_ms: i64, reason: String) -> Self {
2933        Self {
2934            execution_id,
2935            suspended_at_ms,
2936            reason,
2937        }
2938    }
2939}
2940
2941/// One cursor-paginated page of suspended executions.
2942///
2943/// Pagination is cursor-based (not offset/limit) so a Valkey backend
2944/// can resume a partition scan from the last seen execution id and a
2945/// future Postgres backend can do keyset pagination on
2946/// `executions WHERE state='suspended'`. The cursor is opaque to
2947/// callers: pass `next_cursor` back as the `cursor` argument to the
2948/// next [`EngineBackend::list_suspended`] call to fetch the next
2949/// page. `None` means the stream is exhausted.
2950///
2951/// [`EngineBackend::list_suspended`]: crate::engine_backend::EngineBackend::list_suspended
2952#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2953#[non_exhaustive]
2954pub struct ListSuspendedPage {
2955    /// Entries on this page, ordered by ascending `suspended_at_ms`
2956    /// (timeout order) with `execution_id` as a lex tiebreak.
2957    pub entries: Vec<SuspendedExecutionEntry>,
2958    /// Resume-point for the next page. `None` when no further
2959    /// entries remain in the partition.
2960    pub next_cursor: Option<ExecutionId>,
2961}
2962
2963impl ListSuspendedPage {
2964    /// Construct a new page. Preferred over direct field init for
2965    /// `#[non_exhaustive]` forward-compat.
2966    pub fn new(entries: Vec<SuspendedExecutionEntry>, next_cursor: Option<ExecutionId>) -> Self {
2967        Self {
2968            entries,
2969            next_cursor,
2970        }
2971    }
2972}
2973
2974// ─── list_executions ───
2975
2976/// One page of partition-scoped execution ids returned by
2977/// [`EngineBackend::list_executions`](crate::engine_backend::EngineBackend::list_executions).
2978///
2979/// Pagination is forward-only and cursor-based. `next_cursor` carries
2980/// the last `ExecutionId` emitted in `executions` iff another page is
2981/// available; callers pass that id back as the next call's `cursor`
2982/// (exclusive start). `next_cursor = None` signals end-of-stream.
2983///
2984/// `#[non_exhaustive]` — FF may add fields (e.g. `approximate_total`)
2985/// in minor releases without a semver break. Use
2986/// [`ListExecutionsPage::new`] for cross-crate construction.
2987#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2988#[non_exhaustive]
2989pub struct ListExecutionsPage {
2990    /// Execution ids on this page, in ascending lexicographic order.
2991    pub executions: Vec<ExecutionId>,
2992    /// Exclusive cursor to request the next page. `None` ⇒ no more
2993    /// results.
2994    pub next_cursor: Option<ExecutionId>,
2995}
2996
2997impl ListExecutionsPage {
2998    /// Construct a [`ListExecutionsPage`]. Present so downstream
2999    /// crates can assemble the struct despite the `#[non_exhaustive]`
3000    /// marker.
3001    pub fn new(executions: Vec<ExecutionId>, next_cursor: Option<ExecutionId>) -> Self {
3002        Self { executions, next_cursor }
3003    }
3004}
3005
3006// ─── rotate_waitpoint_hmac_secret ───
3007
/// Args for `ff_rotate_waitpoint_hmac_secret`. Rotates the HMAC signing
/// kid on ONE partition. Callers fan out across every partition themselves
/// (ff-server does the parallel fan-out in `rotate_waitpoint_secret`;
/// direct-Valkey consumers mirror the pattern).
///
/// "now" is derived server-side from `redis.call("TIME")` inside the FCALL
/// (consistency with `validate_waitpoint_token` and flow scanners).
/// `grace_ms` is a duration, not a clock value, so carrying it from the
/// caller is safe.
///
/// `Debug` is implemented by hand rather than derived so that
/// `new_secret_hex` is never written into logs or panic messages; the
/// field itself stays public and readable.
#[derive(Clone)]
pub struct RotateWaitpointHmacSecretArgs {
    /// Identifier of the kid being installed.
    pub new_kid: String,
    /// Secret for `new_kid` (hex-encoded, per the field name). Redacted
    /// from the `Debug` output.
    pub new_secret_hex: String,
    /// Grace window in ms. Tokens signed by the outgoing kid remain
    /// valid for `grace_ms` after this rotation. The type is unsigned,
    /// so a negative window cannot be expressed.
    pub grace_ms: u64,
}

impl std::fmt::Debug for RotateWaitpointHmacSecretArgs {
    /// Same layout as a derived `Debug`, except the secret is replaced
    /// by a `<redacted>` placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RotateWaitpointHmacSecretArgs")
            .field("new_kid", &self.new_kid)
            .field("new_secret_hex", &format_args!("<redacted>"))
            .field("grace_ms", &self.grace_ms)
            .finish()
    }
}
3025
/// Result of rotating the secret on a single partition.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum RotateWaitpointHmacSecretOutcome {
    /// The new kid was installed. On bootstrap (no prior
    /// `current_kid`) `previous_kid` is `None`. `gc_count` is the
    /// number of expired kids reaped as part of this rotation.
    Rotated {
        previous_kid: Option<String>,
        new_kid: String,
        gc_count: u32,
    },
    /// Exact replay: the same kid with the same secret was already
    /// installed. This is a safe operator retry and changes no state.
    Noop { kid: String },
}
3041
3042// ─── rotate_waitpoint_hmac_secret_all ───
3043
/// Args for [`EngineBackend::rotate_waitpoint_hmac_secret_all`] — the
/// cluster-wide / backend-native rotation of the waitpoint HMAC
/// signing kid.
///
/// **v0.7 migration-master Q4:** a single additive trait method
/// replaces the per-partition fan-out that direct-Valkey consumers
/// hand-rolled via
/// [`ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions`].
/// On Valkey it fans out N FCALLs (one per execution partition);
/// on Postgres (Wave 4) it resolves to a single INSERT against the
/// global `ff_waitpoint_hmac(kid, secret, rotated_at)` table (no
/// partition_id column). Consumers prefer this method for clarity —
/// the pre-existing free-fn + per-partition surface stays available
/// for backwards compat.
///
/// `#[non_exhaustive]` with a [`Self::new`] constructor per the
/// project memory-rule (unbuildable non_exhaustive types are a dead
/// API).
///
/// `Debug` is implemented by hand rather than derived so that
/// `new_secret_hex` is never written into logs or panic messages; the
/// field itself stays public and readable.
///
/// [`EngineBackend::rotate_waitpoint_hmac_secret_all`]: crate::engine_backend::EngineBackend::rotate_waitpoint_hmac_secret_all
#[derive(Clone)]
#[non_exhaustive]
pub struct RotateWaitpointHmacSecretAllArgs {
    /// Identifier of the kid being installed.
    pub new_kid: String,
    /// Secret for `new_kid` (hex-encoded, per the field name). Redacted
    /// from the `Debug` output.
    pub new_secret_hex: String,
    /// Grace window in ms for tokens signed by the outgoing kid.
    /// Duration (not a clock value), identical to
    /// [`RotateWaitpointHmacSecretArgs::grace_ms`].
    pub grace_ms: u64,
}

impl RotateWaitpointHmacSecretAllArgs {
    /// Build the args. Keeping the constructor so consumers don't
    /// struct-literal past the `#[non_exhaustive]` marker.
    ///
    /// `new_kid` and `new_secret_hex` accept anything convertible into
    /// `String`; `grace_ms` is stored as given.
    pub fn new(
        new_kid: impl Into<String>,
        new_secret_hex: impl Into<String>,
        grace_ms: u64,
    ) -> Self {
        Self {
            new_kid: new_kid.into(),
            new_secret_hex: new_secret_hex.into(),
            grace_ms,
        }
    }
}

impl std::fmt::Debug for RotateWaitpointHmacSecretAllArgs {
    /// Same layout as a derived `Debug`, except the secret is replaced
    /// by a `<redacted>` placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RotateWaitpointHmacSecretAllArgs")
            .field("new_kid", &self.new_kid)
            .field("new_secret_hex", &format_args!("<redacted>"))
            .field("grace_ms", &self.grace_ms)
            .finish()
    }
}
3088
3089/// Per-partition entry of [`RotateWaitpointHmacSecretAllResult`].
3090/// Mirrors [`ff_sdk::admin::PartitionRotationOutcome`] but typed at
3091/// the `ff-core` layer so both Valkey and Postgres backends return
3092/// the same shape without a Postgres→ferriskey dep.
3093///
3094/// On backends with no partition concept (Postgres) the entry list
3095/// has length 1 with `partition = 0` and the outcome of the global
3096/// row write.
3097#[derive(Debug)]
3098#[non_exhaustive]
3099pub struct RotateWaitpointHmacSecretAllEntry {
3100    pub partition: u16,
3101    /// The per-partition (or global) rotation outcome. Per-partition
3102    /// failures are surfaced as inner `Err` so the fan-out can report
3103    /// partial success — matching the existing SDK free-fn contract.
3104    pub result: Result<RotateWaitpointHmacSecretOutcome, crate::engine_error::EngineError>,
3105}
3106
3107impl RotateWaitpointHmacSecretAllEntry {
3108    pub fn new(
3109        partition: u16,
3110        result: Result<RotateWaitpointHmacSecretOutcome, crate::engine_error::EngineError>,
3111    ) -> Self {
3112        Self { partition, result }
3113    }
3114}
3115
3116/// Result of [`EngineBackend::rotate_waitpoint_hmac_secret_all`].
3117///
3118/// The Valkey backend returns one entry per execution partition. The
3119/// Postgres backend (Wave 4) will return a single-entry vec with
3120/// `partition = 0` since the Postgres schema stores one global row
3121/// per kid (Q4 §adjudication). Consumers that want a uniform "did
3122/// ALL rotations succeed?" view inspect each entry's `.result`.
3123#[derive(Debug)]
3124#[non_exhaustive]
3125pub struct RotateWaitpointHmacSecretAllResult {
3126    pub entries: Vec<RotateWaitpointHmacSecretAllEntry>,
3127}
3128
3129impl RotateWaitpointHmacSecretAllResult {
3130    pub fn new(entries: Vec<RotateWaitpointHmacSecretAllEntry>) -> Self {
3131        Self { entries }
3132    }
3133}
3134
3135// ─── seed_waitpoint_hmac_secret (issue #280) ───
3136
/// Args for [`EngineBackend::seed_waitpoint_hmac_secret`].
///
/// Two required fields and no optional knobs, so there is no fluent
/// builder — just `new(kid, secret_hex)`. `#[non_exhaustive]` is kept
/// (with the paired constructor, per the project memory rule) so
/// future additive knobs don't break callers.
///
/// Boot-time provisioning entry point for fresh deployments — see
/// issue #280 for why cairn needed this in addition to
/// [`RotateWaitpointHmacSecretAllArgs`]. Unlike rotate, seed is
/// idempotent: callers invoke it on every boot and the backend
/// decides whether to install.
///
/// `Debug` is implemented by hand rather than derived so that
/// `secret_hex` is never written into logs or panic messages; the
/// field itself stays public and readable.
///
/// [`EngineBackend::seed_waitpoint_hmac_secret`]: crate::engine_backend::EngineBackend::seed_waitpoint_hmac_secret
#[derive(Clone)]
#[non_exhaustive]
pub struct SeedWaitpointHmacSecretArgs {
    /// Identifier of the kid to seed.
    pub kid: String,
    /// Secret for `kid` (hex-encoded, per the field name). Redacted
    /// from the `Debug` output.
    pub secret_hex: String,
}

impl SeedWaitpointHmacSecretArgs {
    /// Build the args from anything convertible into `String`. This is
    /// the only way for other crates to construct the type, given the
    /// `#[non_exhaustive]` marker.
    pub fn new(kid: impl Into<String>, secret_hex: impl Into<String>) -> Self {
        Self {
            kid: kid.into(),
            secret_hex: secret_hex.into(),
        }
    }
}

impl std::fmt::Debug for SeedWaitpointHmacSecretArgs {
    /// Same layout as a derived `Debug`, except the secret is replaced
    /// by a `<redacted>` placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SeedWaitpointHmacSecretArgs")
            .field("kid", &self.kid)
            .field("secret_hex", &format_args!("<redacted>"))
            .finish()
    }
}
3164
/// What
/// [`seed_waitpoint_hmac_secret`](crate::engine_backend::EngineBackend::seed_waitpoint_hmac_secret)
/// returns.
///
/// * `Seeded` — there was no `current_kid` (or no row in the global
///   keystore), and `kid` has been installed as the active signing kid.
/// * `AlreadySeeded` — a row for `kid` already exists. `same_secret`
///   tells whether the stored secret bytes equal the hex supplied by
///   the caller. When it is `false`, the caller should choose a fresh
///   kid and rotate, rather than silently re-installing under the
///   existing kid.
#[derive(Debug, Clone, Eq, PartialEq)]
#[non_exhaustive]
pub enum SeedOutcome {
    Seeded { kid: String },
    AlreadySeeded { kid: String, same_secret: bool },
}
3180
3181// ─── list_waitpoint_hmac_kids ───
3182
/// Arguments for `list_waitpoint_hmac_kids`. The struct currently has
/// no fields.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ListWaitpointHmacKidsArgs {}
3185
3186/// Snapshot of the waitpoint HMAC keystore on ONE partition.
3187#[derive(Clone, Debug, PartialEq, Eq)]
3188pub struct WaitpointHmacKids {
3189    /// The currently-signing kid. `None` if uninitialized.
3190    pub current_kid: Option<String>,
3191    /// Kids that still validate existing tokens but no longer sign
3192    /// new ones. Order is Lua HGETALL traversal order — callers that
3193    /// need a stable sort should sort by `expires_at_ms`.
3194    pub verifying: Vec<VerifyingKid>,
3195}
3196
/// One element of [`WaitpointHmacKids::verifying`]: a kid paired with
/// its `expires_at_ms` value.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct VerifyingKid {
    pub kid: String,
    pub expires_at_ms: i64,
}
3202
3203// ═══════════════════════════════════════════════════════════════════════
3204// RFC-013 Stage 1d: EngineBackend::suspend typed args + outcome
3205// ═══════════════════════════════════════════════════════════════════════
3206//
3207// `SuspendExecutionArgs` / `SuspendExecutionResult` above remain the
3208// wire-level Lua-ARGV mirror used by the backend serializer. The types
3209// below are the public trait-surface shapes RFC-013 §2.2–§2.6 specifies.
3210//
3211// Every type in this block is `#[non_exhaustive]` per the RFC §2.2.1
3212// memory-rule compliance note; each gets a constructor so external-crate
3213// consumers can build them without struct-literal access.
3214
3215use crate::backend::WaitpointHmac;
3216
3217/// Partition-scoped idempotency key for retry-safe `EngineBackend::suspend`.
3218///
3219/// See RFC-013 §2.2 — when set on [`SuspendArgs::idempotency_key`], the
3220/// backend dedups the call on `(partition, execution_id, idempotency_key)`
3221/// and a second `suspend` with the same triple returns the first call's
3222/// [`SuspendOutcome`] verbatim. Absent a key, `suspend` is NOT retry-
3223/// idempotent; callers must describe-and-reconcile per §3.1.
3224///
3225/// Follows the `UsageDimensions::dedup_key` pattern — opaque to the
3226/// engine, byte-compared at the partition scope.
3227#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
3228#[serde(transparent)]
3229pub struct IdempotencyKey(String);
3230
3231impl IdempotencyKey {
3232    /// Construct from any stringy input. Empty strings are accepted;
3233    /// the backend treats an empty key as "no dedup" at the serialize
3234    /// step so `Some(IdempotencyKey::new(""))` is functionally the same
3235    /// as `None`.
3236    pub fn new(key: impl Into<String>) -> Self {
3237        Self(key.into())
3238    }
3239
3240    /// Borrow the underlying string.
3241    pub fn as_str(&self) -> &str {
3242        &self.0
3243    }
3244}
3245
3246impl std::fmt::Display for IdempotencyKey {
3247    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3248        f.write_str(&self.0)
3249    }
3250}
3251
3252/// v1 signal-match predicate inside [`ResumeCondition::Single`].
3253///
3254/// RFC-013 §2.4 — `ByName(String)` matches a single concrete signal
3255/// name; `Wildcard` matches any delivered signal. RFC-014 may extend
3256/// (payload predicates, pattern matching) — `#[non_exhaustive]`.
3257#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3258#[non_exhaustive]
3259pub enum SignalMatcher {
3260    /// Match by exact signal name.
3261    ByName(String),
3262    /// Match any signal delivered to the waitpoint.
3263    Wildcard,
3264}
3265
/// Maximum nesting depth accepted for a composite resume condition
/// (RFC-014 §5.4 invariant 4; rationale in §5.5).
///
/// The limit is enforced at validation time, but the value itself is
/// soft: raising it only requires changing this constant together with
/// the cap-rationale paragraph in RFC-014 §5.5 — the wire format is not
/// affected. Keep the two in sync.
pub const MAX_COMPOSITE_DEPTH: usize = 4;
3271
3272/// RFC-013 reserves this enum slot; RFC-014 populates it with the
3273/// concrete composition vocabulary (`AllOf` + `Count`). The enum is
3274/// `#[non_exhaustive]` so RFC-016 or later RFCs may add variants
3275/// (`AnyOf` has been explicitly rejected per RFC-014 §2.3 in favour of
3276/// `Count { n: 1, .. }`; the guard exists for orthogonal future work).
3277#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3278#[serde(tag = "kind")]
3279#[non_exhaustive]
3280pub enum CompositeBody {
3281    /// All listed sub-conditions must be satisfied. Order-independent.
3282    /// Once satisfied, further signals to member waitpoints are observed
3283    /// but do not re-open satisfaction. RFC-014 §2.1.
3284    AllOf {
3285        members: Vec<ResumeCondition>,
3286    },
3287    /// At least `n` distinct satisfiers (by [`CountKind`]) must match.
3288    /// `matcher` optionally constrains participating signals; `None`
3289    /// lets any signal on any of `waitpoints` count. RFC-014 §2.1.
3290    Count {
3291        n: u32,
3292        count_kind: CountKind,
3293        #[serde(default, skip_serializing_if = "Option::is_none")]
3294        matcher: Option<SignalMatcher>,
3295        waitpoints: Vec<String>,
3296    },
3297}
3298
3299/// How `Count` nodes distinguish satisfiers. RFC-014 §2.1 + §3.2.
3300#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3301#[non_exhaustive]
3302pub enum CountKind {
3303    /// n distinct `waitpoint_id`s in `waitpoints` must fire.
3304    DistinctWaitpoints,
3305    /// n distinct `signal_id`s across the waitpoint set.
3306    DistinctSignals,
3307    /// n distinct `source_type:source_identity` tuples.
3308    DistinctSources,
3309}
3310
3311impl CompositeBody {
3312    /// `AllOf { members }` constructor (RFC-014 §10.3 SDK surface).
3313    pub fn all_of(members: impl IntoIterator<Item = ResumeCondition>) -> Self {
3314        Self::AllOf {
3315            members: members.into_iter().collect(),
3316        }
3317    }
3318
3319    /// `Count` constructor with explicit kind + waitpoint set.
3320    pub fn count(
3321        n: u32,
3322        count_kind: CountKind,
3323        matcher: Option<SignalMatcher>,
3324        waitpoints: impl IntoIterator<Item = String>,
3325    ) -> Self {
3326        Self::Count {
3327            n,
3328            count_kind,
3329            matcher,
3330            waitpoints: waitpoints.into_iter().collect(),
3331        }
3332    }
3333}
3334
3335/// Declarative resume condition for [`SuspendArgs::resume_condition`].
3336///
3337/// RFC-013 §2.4 — typed replacement for the SDK's former
3338/// `ConditionMatcher` / `resume_condition_json` pair.
3339#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3340#[non_exhaustive]
3341pub enum ResumeCondition {
3342    /// Single waitpoint-key match with a predicate. `matcher` is
3343    /// evaluated against every signal delivered to `waitpoint_key`.
3344    Single {
3345        waitpoint_key: String,
3346        matcher: SignalMatcher,
3347    },
3348    /// Operator-only resume — no signal satisfies; only an explicit
3349    /// operator resume closes the waitpoint.
3350    OperatorOnly,
3351    /// Pure-timeout suspension. No signal satisfier; the waitpoint
3352    /// resolves only via `timeout_behavior` at `timeout_at`. Requires
3353    /// `SuspendArgs::timeout_at` to be `Some(_)` — otherwise the
3354    /// Rust-side validator rejects as `timeout_only_without_deadline`.
3355    TimeoutOnly,
3356    /// Multi-condition composition; RFC-014 defines the body.
3357    Composite(CompositeBody),
3358}
3359
/// Error reported when a composite resume condition breaks an RFC-014
/// §5.1 structural or cardinality invariant.
///
/// Produced by [`ResumeCondition::validate_composite`] at suspend-time,
/// before any Valkey call is made. `detail` holds the human-readable
/// description required by §5.1.1.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompositeValidationError {
    /// Human-readable description of the violated invariant.
    pub detail: String,
}
3368
3369impl CompositeValidationError {
3370    fn new(detail: impl Into<String>) -> Self {
3371        Self {
3372            detail: detail.into(),
3373        }
3374    }
3375}
3376
3377impl ResumeCondition {
3378    /// RFC-014 §10.3 builder — `AllOf` across N distinct waitpoints,
3379    /// each member a `Single { matcher: Wildcard }` leaf. Canonical
3380    /// Pattern 3 shape for heterogeneous-subsystem "all fired"
3381    /// semantics (e.g. `db-migration-complete` + `cache-warmed` +
3382    /// `feature-flag-set`).
3383    ///
3384    /// Callers that need per-waitpoint matchers should construct the
3385    /// tree directly via
3386    /// [`ResumeCondition::Composite(CompositeBody::all_of(..))`].
3387    pub fn all_of_waitpoints<I, S>(waitpoint_keys: I) -> Self
3388    where
3389        I: IntoIterator<Item = S>,
3390        S: Into<String>,
3391    {
3392        let members: Vec<ResumeCondition> = waitpoint_keys
3393            .into_iter()
3394            .map(|k| ResumeCondition::Single {
3395                waitpoint_key: k.into(),
3396                matcher: SignalMatcher::Wildcard,
3397            })
3398            .collect();
3399        ResumeCondition::Composite(CompositeBody::AllOf { members })
3400    }
3401
3402    /// Collect every distinct `waitpoint_key` the condition targets.
3403    /// Used at suspend-time to validate the condition's wp set against
3404    /// `SuspendArgs.waitpoints` (RFC-014 §5.1 multi-binding cross-
3405    /// check). Order follows tree DFS, de-duplicated preserving first
3406    /// occurrence.
3407    pub fn referenced_waitpoint_keys(&self) -> Vec<String> {
3408        let mut out: Vec<String> = Vec::new();
3409        let mut push = |k: &str| {
3410            if !out.iter().any(|e| e == k) {
3411                out.push(k.to_owned());
3412            }
3413        };
3414        fn walk(cond: &ResumeCondition, push: &mut dyn FnMut(&str)) {
3415            match cond {
3416                ResumeCondition::Single { waitpoint_key, .. } => push(waitpoint_key),
3417                ResumeCondition::Composite(body) => walk_body(body, push),
3418                _ => {}
3419            }
3420        }
3421        fn walk_body(body: &CompositeBody, push: &mut dyn FnMut(&str)) {
3422            match body {
3423                CompositeBody::AllOf { members } => {
3424                    for m in members {
3425                        walk(m, push);
3426                    }
3427                }
3428                CompositeBody::Count { waitpoints, .. } => {
3429                    for w in waitpoints {
3430                        push(w.as_str());
3431                    }
3432                }
3433            }
3434        }
3435        walk(self, &mut push);
3436        out
3437    }
3438
3439    /// Validate RFC-014 structural invariants on a composite condition.
3440    /// Single / OperatorOnly / TimeoutOnly return Ok — they carry no
3441    /// composite body. Checks cover:
3442    /// * `AllOf { members: [] }` — §5.1 `allof_empty_members`
3443    /// * `Count { n: 0 }` — §5.1 `count_n_zero`
3444    /// * `Count { waitpoints: [] }` — §5.1 `count_waitpoints_empty`
3445    /// * `Count { n > waitpoints.len(), DistinctWaitpoints }` — §5.1
3446    ///   `count_exceeds_waitpoint_set`
3447    /// * depth > [`MAX_COMPOSITE_DEPTH`] — §5.1 `condition_depth_exceeded`
3448    pub fn validate_composite(&self) -> Result<(), CompositeValidationError> {
3449        match self {
3450            ResumeCondition::Composite(body) => validate_body(body, 1, ""),
3451            _ => Ok(()),
3452        }
3453    }
3454}
3455
3456fn validate_body(
3457    body: &CompositeBody,
3458    depth: usize,
3459    path: &str,
3460) -> Result<(), CompositeValidationError> {
3461    if depth > MAX_COMPOSITE_DEPTH {
3462        return Err(CompositeValidationError::new(format!(
3463            "depth {} exceeds cap {} at path {}",
3464            depth,
3465            MAX_COMPOSITE_DEPTH,
3466            if path.is_empty() { "<root>" } else { path }
3467        )));
3468    }
3469    match body {
3470        CompositeBody::AllOf { members } => {
3471            if members.is_empty() {
3472                return Err(CompositeValidationError::new(format!(
3473                    "allof_empty_members at path {}",
3474                    if path.is_empty() { "<root>" } else { path }
3475                )));
3476            }
3477            for (i, m) in members.iter().enumerate() {
3478                let child_path = if path.is_empty() {
3479                    format!("members[{i}]")
3480                } else {
3481                    format!("{path}.members[{i}]")
3482                };
3483                if let ResumeCondition::Composite(inner) = m {
3484                    validate_body(inner, depth + 1, &child_path)?;
3485                }
3486                // Leaf `Single` / operator / timeout needs no further
3487                // structural checks — RFC-013 already constrains them.
3488            }
3489            Ok(())
3490        }
3491        CompositeBody::Count {
3492            n,
3493            count_kind,
3494            waitpoints,
3495            ..
3496        } => {
3497            if *n == 0 {
3498                return Err(CompositeValidationError::new(format!(
3499                    "count_n_zero at path {}",
3500                    if path.is_empty() { "<root>" } else { path }
3501                )));
3502            }
3503            if waitpoints.is_empty() {
3504                return Err(CompositeValidationError::new(format!(
3505                    "count_waitpoints_empty at path {}",
3506                    if path.is_empty() { "<root>" } else { path }
3507                )));
3508            }
3509            if matches!(count_kind, CountKind::DistinctWaitpoints)
3510                && (*n as usize) > waitpoints.len()
3511            {
3512                return Err(CompositeValidationError::new(format!(
3513                    "count_exceeds_waitpoint_set: n={} > waitpoints.len()={} at path {}",
3514                    n,
3515                    waitpoints.len(),
3516                    if path.is_empty() { "<root>" } else { path }
3517                )));
3518            }
3519            Ok(())
3520        }
3521    }
3522}
3523
3524/// Where a satisfied suspension routes back to.
3525///
3526/// v1 ships only [`ResumeTarget::Runnable`] — execution returns to
3527/// `runnable` and goes through normal scheduling.
3528#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3529#[non_exhaustive]
3530pub enum ResumeTarget {
3531    Runnable,
3532}
3533
3534/// Resume-side policy carried alongside [`ResumeCondition`].
3535///
3536/// RFC-013 §2.5 — what happens when the condition is satisfied. Fields
3537/// mirror the `resume_policy_json` the backend serializer writes to Lua
3538/// (RFC-004 §Resume policy fields).
3539#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3540#[non_exhaustive]
3541pub struct ResumePolicy {
3542    pub resume_target: ResumeTarget,
3543    pub consume_matched_signals: bool,
3544    pub retain_signal_buffer_until_closed: bool,
3545    #[serde(default, skip_serializing_if = "Option::is_none")]
3546    pub resume_delay_ms: Option<u64>,
3547    pub close_waitpoint_on_resume: bool,
3548}
3549
3550impl ResumePolicy {
3551    /// Canonical v1 defaults (RFC-013 §2.2.1):
3552    /// * `resume_target = Runnable`
3553    /// * `consume_matched_signals = true`
3554    /// * `retain_signal_buffer_until_closed = false`
3555    /// * `resume_delay_ms = None`
3556    /// * `close_waitpoint_on_resume = true`
3557    pub fn normal() -> Self {
3558        Self {
3559            resume_target: ResumeTarget::Runnable,
3560            consume_matched_signals: true,
3561            retain_signal_buffer_until_closed: false,
3562            resume_delay_ms: None,
3563            close_waitpoint_on_resume: true,
3564        }
3565    }
3566}
3567
3568/// Timeout behavior at the suspension deadline (RFC-004 §Timeout Behavior).
3569#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3570#[non_exhaustive]
3571pub enum TimeoutBehavior {
3572    Fail,
3573    Cancel,
3574    Expire,
3575    AutoResumeWithTimeoutSignal,
3576    /// v2 per RFC-004 Implementation Notes; enum slot present for
3577    /// additive RFC-014/RFC-015 landing.
3578    Escalate,
3579}
3580
3581impl TimeoutBehavior {
3582    /// Lua-side string encoding. Matches the wire values Lua's
3583    /// `ff_expire_suspension` matches on.
3584    pub fn as_wire_str(self) -> &'static str {
3585        match self {
3586            Self::Fail => "fail",
3587            Self::Cancel => "cancel",
3588            Self::Expire => "expire",
3589            Self::AutoResumeWithTimeoutSignal => "auto_resume_with_timeout_signal",
3590            Self::Escalate => "escalate",
3591        }
3592    }
3593}
3594
3595/// Reason category for a suspension (RFC-004 §Suspension Reason Categories).
3596#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3597#[non_exhaustive]
3598pub enum SuspensionReasonCode {
3599    WaitingForSignal,
3600    WaitingForApproval,
3601    WaitingForCallback,
3602    WaitingForToolResult,
3603    WaitingForOperatorReview,
3604    PausedByPolicy,
3605    PausedByBudget,
3606    StepBoundary,
3607    ManualPause,
3608}
3609
3610impl SuspensionReasonCode {
3611    pub fn as_wire_str(self) -> &'static str {
3612        match self {
3613            Self::WaitingForSignal => "waiting_for_signal",
3614            Self::WaitingForApproval => "waiting_for_approval",
3615            Self::WaitingForCallback => "waiting_for_callback",
3616            Self::WaitingForToolResult => "waiting_for_tool_result",
3617            Self::WaitingForOperatorReview => "waiting_for_operator_review",
3618            Self::PausedByPolicy => "paused_by_policy",
3619            Self::PausedByBudget => "paused_by_budget",
3620            Self::StepBoundary => "step_boundary",
3621            Self::ManualPause => "manual_pause",
3622        }
3623    }
3624}
3625
3626/// Who requested the suspension.
3627#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3628#[non_exhaustive]
3629pub enum SuspensionRequester {
3630    Worker,
3631    Operator,
3632    Policy,
3633    SystemTimeoutPolicy,
3634}
3635
3636impl SuspensionRequester {
3637    pub fn as_wire_str(self) -> &'static str {
3638        match self {
3639            Self::Worker => "worker",
3640            Self::Operator => "operator",
3641            Self::Policy => "policy",
3642            Self::SystemTimeoutPolicy => "system_timeout_policy",
3643        }
3644    }
3645}
3646
3647/// How the waitpoint resource backing a [`SuspendArgs`] is obtained.
3648///
3649/// RFC-013 §2.2 — `Fresh` mints a new waitpoint as part of `suspend`;
3650/// `UsePending` activates a waitpoint previously issued via
3651/// `EngineBackend::create_waitpoint`.
3652#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3653#[non_exhaustive]
3654pub enum WaitpointBinding {
3655    Fresh {
3656        waitpoint_id: WaitpointId,
3657        waitpoint_key: String,
3658    },
3659    UsePending {
3660        waitpoint_id: WaitpointId,
3661    },
3662}
3663
3664impl WaitpointBinding {
3665    /// Mint a fresh binding with a random `waitpoint_id` (UUID v4) and
3666    /// `waitpoint_key = "wpk:<uuid>"`.
3667    pub fn fresh() -> Self {
3668        let wp_id = WaitpointId::new();
3669        let key = format!("wpk:{wp_id}");
3670        Self::Fresh {
3671            waitpoint_id: wp_id,
3672            waitpoint_key: key,
3673        }
3674    }
3675
3676    /// Construct a `UsePending` binding from a pending waitpoint
3677    /// previously issued by `create_waitpoint`. The HMAC token is
3678    /// resolved Lua-side from the partition's waitpoint hash at
3679    /// `suspend` time (RFC-013 §5.1).
3680    pub fn use_pending(pending: &crate::backend::PendingWaitpoint) -> Self {
3681        Self::UsePending {
3682            waitpoint_id: pending.waitpoint_id.clone(),
3683        }
3684    }
3685}
3686
3687/// Trait-surface input to [`EngineBackend::suspend`] (RFC-013 §2.2 +
3688/// RFC-014 Pattern 3 widening).
3689///
3690/// Built via [`SuspendArgs::new`] + `with_*` setters; direct struct-
3691/// literal construction across crate boundaries is not possible
3692/// (`#[non_exhaustive]`).
3693///
3694/// ## Waitpoints
3695///
3696/// `waitpoints` is a non-empty `Vec<WaitpointBinding>`. The first entry
3697/// is the "primary" binding (accessible via [`primary`](Self::primary))
3698/// and carries the `current_waitpoint_id` written onto `exec_core` for
3699/// operator visibility. Additional entries land in Valkey as their own
3700/// waitpoint hashes / signal streams / HMAC tokens, enabling RFC-014
3701/// Pattern 3 `AllOf { members: [Single{wp1}, Single{wp2}, ...] }` across
3702/// distinct heterogeneous subsystems.
3703///
3704/// [`SuspendArgs::new`] takes exactly the primary binding; call
3705/// [`with_waitpoint`](Self::with_waitpoint) to append further bindings
3706/// (the RFC-014 builder API).
3707#[derive(Clone, Debug, Serialize, Deserialize)]
3708#[non_exhaustive]
3709pub struct SuspendArgs {
3710    pub suspension_id: SuspensionId,
3711    /// RFC-014 Pattern 3: all waitpoint bindings for this suspension.
3712    /// Guaranteed non-empty; `waitpoints[0]` is the primary.
3713    pub waitpoints: Vec<WaitpointBinding>,
3714    pub resume_condition: ResumeCondition,
3715    pub resume_policy: ResumePolicy,
3716    pub reason_code: SuspensionReasonCode,
3717    pub requested_by: SuspensionRequester,
3718    #[serde(default, skip_serializing_if = "Option::is_none")]
3719    pub timeout_at: Option<TimestampMs>,
3720    pub timeout_behavior: TimeoutBehavior,
3721    #[serde(default, skip_serializing_if = "Option::is_none")]
3722    pub continuation_metadata_pointer: Option<String>,
3723    pub now: TimestampMs,
3724    #[serde(default, skip_serializing_if = "Option::is_none")]
3725    pub idempotency_key: Option<IdempotencyKey>,
3726}
3727
3728impl SuspendArgs {
3729    /// Build a minimal `SuspendArgs` for a worker-originated suspension.
3730    ///
3731    /// `waitpoint` becomes the primary binding. Append additional
3732    /// bindings with [`with_waitpoint`](Self::with_waitpoint) (RFC-014
3733    /// Pattern 3) or replace the set with
3734    /// [`with_waitpoints`](Self::with_waitpoints).
3735    ///
3736    /// Defaults: `requested_by = Worker`, `timeout_at = None`,
3737    /// `timeout_behavior = Fail`, `continuation_metadata_pointer = None`,
3738    /// `idempotency_key = None`.
3739    pub fn new(
3740        suspension_id: SuspensionId,
3741        waitpoint: WaitpointBinding,
3742        resume_condition: ResumeCondition,
3743        resume_policy: ResumePolicy,
3744        reason_code: SuspensionReasonCode,
3745        now: TimestampMs,
3746    ) -> Self {
3747        Self {
3748            suspension_id,
3749            waitpoints: vec![waitpoint],
3750            resume_condition,
3751            resume_policy,
3752            reason_code,
3753            requested_by: SuspensionRequester::Worker,
3754            timeout_at: None,
3755            timeout_behavior: TimeoutBehavior::Fail,
3756            continuation_metadata_pointer: None,
3757            now,
3758            idempotency_key: None,
3759        }
3760    }
3761
3762    /// Primary binding — `waitpoints[0]`. Guaranteed present by
3763    /// construction.
3764    pub fn primary(&self) -> &WaitpointBinding {
3765        &self.waitpoints[0]
3766    }
3767
3768    pub fn with_timeout(mut self, at: TimestampMs, behavior: TimeoutBehavior) -> Self {
3769        self.timeout_at = Some(at);
3770        self.timeout_behavior = behavior;
3771        self
3772    }
3773
3774    pub fn with_requester(mut self, requester: SuspensionRequester) -> Self {
3775        self.requested_by = requester;
3776        self
3777    }
3778
3779    pub fn with_continuation_metadata_pointer(mut self, p: String) -> Self {
3780        self.continuation_metadata_pointer = Some(p);
3781        self
3782    }
3783
3784    pub fn with_idempotency_key(mut self, key: IdempotencyKey) -> Self {
3785        self.idempotency_key = Some(key);
3786        self
3787    }
3788
3789    /// RFC-014 Pattern 3 — append a further waitpoint binding to this
3790    /// suspension. Each additional binding yields its own waitpoint
3791    /// hash, signal stream, condition hash and HMAC token in Valkey,
3792    /// but all share the suspension record and composite evaluator
3793    /// under one `suspension:current`.
3794    ///
3795    /// Ordering: the primary (from [`SuspendArgs::new`]) stays at
3796    /// `waitpoints[0]`; subsequent `with_waitpoint` calls append at the
3797    /// tail.
3798    pub fn with_waitpoint(mut self, binding: WaitpointBinding) -> Self {
3799        self.waitpoints.push(binding);
3800        self
3801    }
3802
3803    /// RFC-014 Pattern 3 — replace the full binding vector in one call.
3804    /// Must be non-empty; an empty Vec is a programmer error and will
3805    /// be rejected by the backend's `validate_suspend_args` with
3806    /// `waitpoints_empty`.
3807    pub fn with_waitpoints(mut self, bindings: Vec<WaitpointBinding>) -> Self {
3808        self.waitpoints = bindings;
3809        self
3810    }
3811}
3812
3813/// Shared "what happened on the waitpoint" payload carried in both
3814/// [`SuspendOutcome`] variants.
3815///
3816/// For Pattern 3 (RFC-014) — multi-waitpoint suspensions — the primary
3817/// binding's identity lives at the top level (`waitpoint_id` /
3818/// `waitpoint_key` / `waitpoint_token`) and remaining bindings are
3819/// exposed via `additional_waitpoints`, each carrying its own minted
3820/// HMAC token so external signallers can deliver to any of the N
3821/// waitpoints the suspension is listening on.
3822#[derive(Clone, Debug, PartialEq, Eq)]
3823#[non_exhaustive]
3824pub struct SuspendOutcomeDetails {
3825    pub suspension_id: SuspensionId,
3826    pub waitpoint_id: WaitpointId,
3827    pub waitpoint_key: String,
3828    pub waitpoint_token: WaitpointHmac,
3829    /// RFC-014 Pattern 3 extras (beyond the primary). Empty for
3830    /// single-waitpoint suspensions (patterns 1 + 2); carries one
3831    /// entry per additional binding for Pattern 3.
3832    pub additional_waitpoints: Vec<AdditionalWaitpointBinding>,
3833}
3834
3835/// RFC-014 Pattern 3 — per-binding identity + HMAC token for
3836/// waitpoints beyond the primary. Structure mirrors the top-level
3837/// fields on [`SuspendOutcomeDetails`].
3838#[derive(Clone, Debug, PartialEq, Eq)]
3839#[non_exhaustive]
3840pub struct AdditionalWaitpointBinding {
3841    pub waitpoint_id: WaitpointId,
3842    pub waitpoint_key: String,
3843    pub waitpoint_token: WaitpointHmac,
3844}
3845
3846impl AdditionalWaitpointBinding {
3847    pub fn new(
3848        waitpoint_id: WaitpointId,
3849        waitpoint_key: String,
3850        waitpoint_token: WaitpointHmac,
3851    ) -> Self {
3852        Self {
3853            waitpoint_id,
3854            waitpoint_key,
3855            waitpoint_token,
3856        }
3857    }
3858}
3859
3860impl SuspendOutcomeDetails {
3861    pub fn new(
3862        suspension_id: SuspensionId,
3863        waitpoint_id: WaitpointId,
3864        waitpoint_key: String,
3865        waitpoint_token: WaitpointHmac,
3866    ) -> Self {
3867        Self {
3868            suspension_id,
3869            waitpoint_id,
3870            waitpoint_key,
3871            waitpoint_token,
3872            additional_waitpoints: Vec::new(),
3873        }
3874    }
3875
3876    /// Attach RFC-014 Pattern 3 additional-waitpoint bindings. The
3877    /// primary binding stays at the top-level fields; `extras` lands
3878    /// in [`additional_waitpoints`](Self::additional_waitpoints).
3879    pub fn with_additional_waitpoints(
3880        mut self,
3881        extras: Vec<AdditionalWaitpointBinding>,
3882    ) -> Self {
3883        self.additional_waitpoints = extras;
3884        self
3885    }
3886}
3887
3888/// Trait-surface output from [`EngineBackend::suspend`] (RFC-013 §2.3).
3889///
3890/// Two variants encode the "lease released" vs "lease retained" split.
3891/// See §2.3 for the runtime-enforcement semantics.
3892#[derive(Clone, Debug, PartialEq, Eq)]
3893#[non_exhaustive]
3894pub enum SuspendOutcome {
3895    /// The worker's pre-suspend handle is no longer lease-bearing; a
3896    /// fresh `HandleKind::Suspended` handle supersedes it.
3897    Suspended {
3898        details: SuspendOutcomeDetails,
3899        handle: crate::backend::Handle,
3900    },
3901    /// Buffered signals on a pending waitpoint already satisfied the
3902    /// condition at suspension time; the lease is retained and the
3903    /// caller's pre-suspend handle remains valid.
3904    AlreadySatisfied { details: SuspendOutcomeDetails },
3905}
3906
3907impl SuspendOutcome {
3908    /// Borrow the shared details regardless of variant.
3909    pub fn details(&self) -> &SuspendOutcomeDetails {
3910        match self {
3911            Self::Suspended { details, .. } => details,
3912            Self::AlreadySatisfied { details } => details,
3913        }
3914    }
3915}
3916
3917// `EngineBackend::suspend` type re-exports for `ff_core::backend::*`
3918// consumers. The `backend` module re-exports these below so external
3919// crates can reach them via the idiomatic `ff_core::backend` path that
3920// already sources the other trait-surface types (RFC-013 §9.1).
3921
3922// ─── RFC-017 Stage A — trait-expansion Args/Result types ─────────────
3923//
3924// Per RFC-017 §5.1.1: every struct/enum introduced here is
3925// `#[non_exhaustive]` and ships with a `pub fn new(...)` constructor so
3926// additive field growth post-v0.8 does not force cross-crate churn.
3927
3928// ─── claim_for_worker ───
3929
3930/// Inputs to `EngineBackend::claim_for_worker` (RFC-017 §5, §7). The
3931/// Valkey impl forwards to `ff_scheduler::Scheduler::claim_for_worker`;
3932/// the Postgres impl forwards to its own scheduler module. The trait
3933/// method hides the backend-specific dispatch behind one shape.
3934#[non_exhaustive]
3935#[derive(Clone, Debug)]
3936pub struct ClaimForWorkerArgs {
3937    pub lane_id: LaneId,
3938    pub worker_id: WorkerId,
3939    pub worker_instance_id: WorkerInstanceId,
3940    pub worker_capabilities: std::collections::BTreeSet<String>,
3941    pub grant_ttl_ms: u64,
3942}
3943
3944impl ClaimForWorkerArgs {
3945    /// Required-field constructor. Optional fields today: none — kept
3946    /// for forward-compat so a future optional (e.g. `deadline_ms`)
3947    /// does not break callers using the builder pattern.
3948    pub fn new(
3949        lane_id: LaneId,
3950        worker_id: WorkerId,
3951        worker_instance_id: WorkerInstanceId,
3952        worker_capabilities: std::collections::BTreeSet<String>,
3953        grant_ttl_ms: u64,
3954    ) -> Self {
3955        Self {
3956            lane_id,
3957            worker_id,
3958            worker_instance_id,
3959            worker_capabilities,
3960            grant_ttl_ms,
3961        }
3962    }
3963}
3964
3965/// Outcome of `EngineBackend::claim_for_worker`. `None`-like shape
3966/// modelled as an enum so additive variants (e.g. `BackPressured {
3967/// retry_after_ms }`) do not force a wire break.
3968#[non_exhaustive]
3969#[derive(Clone, Debug, PartialEq, Eq)]
3970pub enum ClaimForWorkerOutcome {
3971    /// No eligible execution on this lane at this scan cycle.
3972    NoWork,
3973    /// Grant issued — worker proceeds to `claim_from_grant`.
3974    Granted(ClaimGrant),
3975}
3976
3977impl ClaimForWorkerOutcome {
3978    /// Build the `NoWork` variant.
3979    pub fn no_work() -> Self {
3980        Self::NoWork
3981    }
3982    /// Build the `Granted` variant.
3983    pub fn granted(grant: ClaimGrant) -> Self {
3984        Self::Granted(grant)
3985    }
3986}
3987
3988// ─── list_pending_waitpoints ───
3989
3990/// Inputs to `EngineBackend::list_pending_waitpoints` (RFC-017 §5, §8).
3991/// Pagination is part of the signature so a flow with 10k pending
3992/// waitpoints cannot force a single-round-trip read regardless of
3993/// backend.
3994#[non_exhaustive]
3995#[derive(Clone, Debug)]
3996pub struct ListPendingWaitpointsArgs {
3997    pub execution_id: ExecutionId,
3998    /// Exclusive cursor — `None` starts from the beginning.
3999    pub after: Option<WaitpointId>,
4000    /// Max page size. `None` → backend default (100). Backend-enforced
4001    /// cap: 1000.
4002    pub limit: Option<u32>,
4003}
4004
4005impl ListPendingWaitpointsArgs {
4006    pub fn new(execution_id: ExecutionId) -> Self {
4007        Self {
4008            execution_id,
4009            after: None,
4010            limit: None,
4011        }
4012    }
4013    pub fn with_after(mut self, after: WaitpointId) -> Self {
4014        self.after = Some(after);
4015        self
4016    }
4017    pub fn with_limit(mut self, limit: u32) -> Self {
4018        self.limit = Some(limit);
4019        self
4020    }
4021}
4022
4023/// Page of pending-waitpoint entries. Stage A preserves the existing
4024/// `PendingWaitpointInfo` shape; the §8 schema rewrite (HMAC
4025/// sanitisation + `(token_kid, token_fingerprint)` additive fields)
4026/// ships in Stage D alongside the HTTP wire-format deprecation.
4027#[non_exhaustive]
4028#[derive(Clone, Debug)]
4029pub struct ListPendingWaitpointsResult {
4030    pub entries: Vec<PendingWaitpointInfo>,
4031    /// Forward-only continuation cursor — `None` signals end-of-stream.
4032    pub next_cursor: Option<WaitpointId>,
4033}
4034
4035impl ListPendingWaitpointsResult {
4036    pub fn new(entries: Vec<PendingWaitpointInfo>) -> Self {
4037        Self {
4038            entries,
4039            next_cursor: None,
4040        }
4041    }
4042    pub fn with_next_cursor(mut self, cursor: WaitpointId) -> Self {
4043        self.next_cursor = Some(cursor);
4044        self
4045    }
4046}
4047
4048// ─── report_usage_admin ───
4049
4050/// Inputs to `EngineBackend::report_usage_admin` (RFC-017 §5 budget+
4051/// quota admin §5, round-1 F4). Admin-path peer of `report_usage` —
4052/// both wrap `ff_report_usage_and_check` on the Valkey side but the
4053/// admin call is worker-less, so it cannot reuse the lease-bound
4054/// `report_usage(&Handle, ...)` signature. `ReportUsageAdminArgs`
4055/// carries the same fields as [`ReportUsageArgs`] without a worker
4056/// handle — kept as a distinct type so future admin-only fields (e.g.
4057/// `actor_identity`, `audit_reason`) don't pollute the worker path.
4058#[non_exhaustive]
4059#[derive(Clone, Debug)]
4060pub struct ReportUsageAdminArgs {
4061    pub dimensions: Vec<String>,
4062    pub deltas: Vec<u64>,
4063    pub dedup_key: Option<String>,
4064    pub now: TimestampMs,
4065}
4066
4067impl ReportUsageAdminArgs {
4068    pub fn new(dimensions: Vec<String>, deltas: Vec<u64>, now: TimestampMs) -> Self {
4069        Self {
4070            dimensions,
4071            deltas,
4072            dedup_key: None,
4073            now,
4074        }
4075    }
4076    pub fn with_dedup_key(mut self, key: String) -> Self {
4077        self.dedup_key = Some(key);
4078        self
4079    }
4080}
4081
#[cfg(test)]
mod rfc_014_validation_tests {
    use super::*;

    /// A `Single` condition on waitpoint `wp`, matched by signal name "x".
    fn single(wp: &str) -> ResumeCondition {
        let waitpoint_key = wp.to_owned();
        let matcher = SignalMatcher::ByName("x".to_owned());
        ResumeCondition::Single {
            waitpoint_key,
            matcher,
        }
    }

    /// An `AllOf` composite over the given members.
    fn all_of(members: Vec<ResumeCondition>) -> ResumeCondition {
        ResumeCondition::Composite(CompositeBody::AllOf { members })
    }

    /// Asserts that `cond` fails `validate_composite` and that the error
    /// detail mentions `needle`.
    #[track_caller]
    fn expect_rejection(cond: &ResumeCondition, needle: &str) {
        let err = cond.validate_composite().unwrap_err();
        assert!(err.detail.contains(needle), "{}", err.detail);
    }

    #[test]
    fn single_passes_validate() {
        let cond = single("wpk:a");
        assert!(cond.validate_composite().is_ok());
    }

    #[test]
    fn allof_empty_members_rejected() {
        expect_rejection(&all_of(Vec::new()), "allof_empty_members");
    }

    #[test]
    fn count_n_zero_rejected() {
        let body = CompositeBody::Count {
            n: 0,
            count_kind: CountKind::DistinctWaitpoints,
            matcher: None,
            waitpoints: vec![String::from("wpk:a")],
        };
        expect_rejection(&ResumeCondition::Composite(body), "count_n_zero");
    }

    #[test]
    fn count_waitpoints_empty_rejected() {
        let body = CompositeBody::Count {
            n: 1,
            count_kind: CountKind::DistinctSources,
            matcher: None,
            waitpoints: Vec::new(),
        };
        expect_rejection(&ResumeCondition::Composite(body), "count_waitpoints_empty");
    }

    #[test]
    fn count_exceeds_waitpoint_set_rejected_only_for_distinct_waitpoints() {
        // Asking for 3 distinct waitpoints out of a set of 2 can never
        // be satisfied → reject.
        let too_many = ResumeCondition::Composite(CompositeBody::Count {
            n: 3,
            count_kind: CountKind::DistinctWaitpoints,
            matcher: None,
            waitpoints: vec![String::from("a"), String::from("b")],
        });
        expect_rejection(&too_many, "count_exceeds_waitpoint_set");

        // Same n and waitpoint set, but counting DistinctSignals → allowed
        // (no upper bound applies).
        let by_signals = ResumeCondition::Composite(CompositeBody::Count {
            n: 3,
            count_kind: CountKind::DistinctSignals,
            matcher: None,
            waitpoints: vec![String::from("a"), String::from("b")],
        });
        assert!(by_signals.validate_composite().is_ok());
    }

    #[test]
    fn depth_4_accepted_depth_5_rejected() {
        // Wrap a Single leaf in four AllOf layers:
        // AllOf { AllOf { AllOf { AllOf { Single } } } }
        let d4 = (0..4).fold(single("wpk:leaf"), |inner, _| all_of(vec![inner]));
        assert!(d4.validate_composite().is_ok());

        // One more layer gives depth 5 → reject.
        let d5 = all_of(vec![d4]);
        expect_rejection(&d5, "exceeds cap");
    }
}