Skip to main content

ff_core/contracts/
mod.rs

1//! Phase 1 function contracts — Args + Result types for each FCALL.
2//!
3//! Each Args struct defines the typed inputs to a Valkey Function.
4//! Each Result enum defines the possible outcomes (success variants + error codes).
5
6pub mod decode;
7
8use crate::policy::ExecutionPolicy;
9use crate::state::{AttemptType, PublicState, StateVector};
10use crate::types::{
11    AttemptId, AttemptIndex, CancelSource, EdgeId, ExecutionId, FlowId, LaneId, LeaseEpoch,
12    LeaseFence, LeaseId, Namespace, SignalId, SuspensionId, TimestampMs, WaitpointId,
13    WaitpointToken, WorkerId, WorkerInstanceId,
14};
15use serde::{Deserialize, Serialize};
16use std::collections::{BTreeMap, BTreeSet, HashMap};
17
18// ─── create_execution ───
19
20#[derive(Clone, Debug, Serialize, Deserialize)]
21pub struct CreateExecutionArgs {
22    pub execution_id: ExecutionId,
23    pub namespace: Namespace,
24    pub lane_id: LaneId,
25    pub execution_kind: String,
26    pub input_payload: Vec<u8>,
27    #[serde(default)]
28    pub payload_encoding: Option<String>,
29    pub priority: i32,
30    pub creator_identity: String,
31    #[serde(default)]
32    pub idempotency_key: Option<String>,
33    #[serde(default)]
34    pub tags: HashMap<String, String>,
35    /// Execution policy (retry, timeout, suspension, routing, etc.).
36    #[serde(default)]
37    pub policy: Option<ExecutionPolicy>,
38    /// If set and in the future, execution starts delayed.
39    #[serde(default)]
40    pub delay_until: Option<TimestampMs>,
41    /// Absolute deadline timestamp (ms). Execution expires if not complete by this time.
42    #[serde(default)]
43    pub execution_deadline_at: Option<TimestampMs>,
44    /// Partition ID (pre-computed).
45    pub partition_id: u16,
46    pub now: TimestampMs,
47}
48
49#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
50pub enum CreateExecutionResult {
51    /// Execution created successfully.
52    Created {
53        execution_id: ExecutionId,
54        public_state: PublicState,
55    },
56    /// Idempotent duplicate — existing execution returned.
57    Duplicate { execution_id: ExecutionId },
58}
59
60// ─── issue_claim_grant ───
61
62#[derive(Clone, Debug, Serialize, Deserialize)]
63pub struct IssueClaimGrantArgs {
64    pub execution_id: ExecutionId,
65    pub lane_id: LaneId,
66    pub worker_id: WorkerId,
67    pub worker_instance_id: WorkerInstanceId,
68    #[serde(default)]
69    pub capability_hash: Option<String>,
70    #[serde(default)]
71    pub route_snapshot_json: Option<String>,
72    #[serde(default)]
73    pub admission_summary: Option<String>,
74    /// Capabilities this worker advertises. Serialized as a sorted,
75    /// comma-separated string to the Lua FCALL (see scheduling.lua
76    /// ff_issue_claim_grant). An empty set matches only executions whose
77    /// `required_capabilities` is also empty.
78    #[serde(default)]
79    pub worker_capabilities: BTreeSet<String>,
80    pub grant_ttl_ms: u64,
81    /// Caller-side timestamp for bookkeeping. NOT passed to the Lua FCALL —
82    /// ff_issue_claim_grant uses `redis.call("TIME")` for grant_expires_at.
83    pub now: TimestampMs,
84}
85
86#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
87pub enum IssueClaimGrantResult {
88    /// Grant issued.
89    Granted { execution_id: ExecutionId },
90}
91
92/// A claim grant issued by the scheduler for a specific execution.
93///
94/// The worker uses this to call `ff_claim_execution` (or
95/// `ff_acquire_lease`), which atomically consumes the grant and
96/// creates the lease.
97///
98/// Shared wire-level type between `ff-scheduler` (issuer) and
99/// `ff-sdk` (consumer, via `FlowFabricWorker::claim_from_grant`).
100/// Lives in `ff-core` so neither crate needs a dep on the other.
101///
102/// **Lane asymmetry with [`ReclaimGrant`]:** `ClaimGrant` does NOT
103/// carry `lane_id`. The issuing scheduler's caller already picked
104/// a lane (that's how admission reached this grant) and passes it
105/// through to `claim_from_grant` as a separate argument. The grant
106/// handle stays narrow to what uniquely identifies the admission
107/// decision. The matching field on [`ReclaimGrant`] is an
108/// intentional divergence — see the note on that type.
109#[derive(Clone, Debug, PartialEq, Eq)]
110pub struct ClaimGrant {
111    /// The execution that was granted.
112    pub execution_id: ExecutionId,
113    /// Opaque partition handle for this execution's hash-tag slot.
114    ///
115    /// Public wire type: consumers pass it back to FlowFabric but
116    /// must not parse the interior hash tag for routing decisions.
117    /// Internal consumers that need the typed
118    /// [`crate::partition::Partition`] call [`Self::partition`].
119    pub partition_key: crate::partition::PartitionKey,
120    /// The Valkey key holding the grant hash (for the worker to
121    /// reference).
122    pub grant_key: String,
123    /// When the grant expires if not consumed.
124    pub expires_at_ms: u64,
125}
126
127impl ClaimGrant {
128    /// Parse `partition_key` into a typed
129    /// [`crate::partition::Partition`]. Intended for internal
130    /// consumers (scheduler emitter, SDK worker claim path) that
131    /// need the family/index pair. Fails only on malformed keys
132    /// (which indicates a producer bug).
133    ///
134    /// Alias collapse applies: a grant issued against `Execution`
135    /// family round-trips to `Flow` (see [`crate::partition::PartitionKey`]
136    /// for the rationale — routing is preserved, only the metadata
137    /// family label normalises).
138    pub fn partition(
139        &self,
140    ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
141        self.partition_key.parse()
142    }
143}
144
145/// A reclaim grant issued for a resumed (attempt_interrupted) execution.
146///
147/// Issued by a producer (typically `ff-scheduler` once a Batch-C
148/// reclaim scanner is in place; test fixtures in the interim — no
149/// production Rust caller exists in-tree today). Consumed by
150/// [`FlowFabricWorker::claim_from_reclaim_grant`], which calls
151/// `ff_claim_resumed_execution` atomically: that FCALL validates the
152/// grant, consumes it, and transitions `attempt_interrupted` →
153/// `started` while preserving the existing `attempt_index` +
154/// `attempt_id` (a resumed execution re-uses its attempt; it does
155/// not start a new one).
156///
157/// Mirrors [`ClaimGrant`] for the resume path. Differences:
158///
159///   * [`ClaimGrant`] is issued against a freshly-eligible
160///     execution and `ff_claim_execution` creates a new attempt.
161///   * [`ReclaimGrant`] is issued against an `attempt_interrupted`
162///     execution; `ff_claim_resumed_execution` re-uses the existing
163///     attempt and bumps the lease epoch.
164///
165/// The grant itself is written to the same `claim_grant` Valkey key
166/// that [`ClaimGrant`] uses; the distinction is which Lua FCALL
167/// consumes it (`ff_claim_execution` for new attempts,
168/// `ff_claim_resumed_execution` for resumes).
169///
170/// **Lane asymmetry with [`ClaimGrant`]:** `ReclaimGrant` CARRIES
171/// `lane_id` as a field. The issuing path already knows the lane
172/// (it's read from `exec_core` at grant time); carrying it here
173/// spares the consumer a `HGET exec_core lane_id` round trip on
174/// the hot claim path. The asymmetry is intentional — prefer
175/// one-fewer-HGET on a type that already lives with the resumer's
176/// lifecycle over strict handle symmetry with `ClaimGrant`.
177///
178/// Shared wire-level type between the eventual `ff-scheduler`
179/// producer (Batch-C reclaim scanner — not yet in-tree; test
180/// fixtures construct this type today) and `ff-sdk` (consumer, via
181/// `FlowFabricWorker::claim_from_reclaim_grant`). Lives in
182/// `ff-core` so neither crate needs a dep on the other.
183///
184/// [`FlowFabricWorker::claim_from_reclaim_grant`]: https://docs.rs/ff-sdk
185#[derive(Clone, Debug, PartialEq, Eq)]
186pub struct ReclaimGrant {
187    /// The execution granted for resumption.
188    pub execution_id: ExecutionId,
189    /// Opaque partition handle for this execution's hash-tag slot.
190    ///
191    /// Same wire-opacity contract as [`ClaimGrant::partition_key`].
192    /// Internal consumers call [`Self::partition`] for the parsed
193    /// form.
194    pub partition_key: crate::partition::PartitionKey,
195    /// Valkey key of the grant hash — same key shape as
196    /// [`ClaimGrant`].
197    pub grant_key: String,
198    /// Monotonic ms when the grant expires; unconsumed grants
199    /// vanish.
200    pub expires_at_ms: u64,
201    /// Lane the execution belongs to. Needed by
202    /// `ff_claim_resumed_execution` for `KEYS[3]` (eligible_zset)
203    /// and `KEYS[9]` (active_index).
204    pub lane_id: LaneId,
205}
206
207impl ReclaimGrant {
208    /// Parse `partition_key` into a typed
209    /// [`crate::partition::Partition`]. See [`ClaimGrant::partition`]
210    /// for the alias-collapse note.
211    pub fn partition(
212        &self,
213    ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
214        self.partition_key.parse()
215    }
216}
217
218// ─── claim_execution ───
219
220#[derive(Clone, Debug, Serialize, Deserialize)]
221pub struct ClaimExecutionArgs {
222    pub execution_id: ExecutionId,
223    pub worker_id: WorkerId,
224    pub worker_instance_id: WorkerInstanceId,
225    pub lane_id: LaneId,
226    pub lease_id: LeaseId,
227    pub lease_ttl_ms: u64,
228    pub attempt_id: AttemptId,
229    /// Expected attempt index (pre-read from exec_core.total_attempt_count).
230    /// Used for KEYS construction — must match what the Lua computes.
231    pub expected_attempt_index: AttemptIndex,
232    /// JSON-encoded attempt policy snapshot.
233    #[serde(default)]
234    pub attempt_policy_json: String,
235    /// Per-attempt timeout in ms.
236    #[serde(default)]
237    pub attempt_timeout_ms: Option<u64>,
238    /// Total execution deadline (absolute timestamp ms).
239    #[serde(default)]
240    pub execution_deadline_at: Option<i64>,
241    pub now: TimestampMs,
242}
243
244#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
245pub struct ClaimedExecution {
246    pub execution_id: ExecutionId,
247    pub lease_id: LeaseId,
248    pub lease_epoch: LeaseEpoch,
249    pub attempt_index: AttemptIndex,
250    pub attempt_id: AttemptId,
251    pub attempt_type: AttemptType,
252    pub lease_expires_at: TimestampMs,
253}
254
255#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
256pub enum ClaimExecutionResult {
257    /// Successfully claimed.
258    Claimed(ClaimedExecution),
259}
260
261// ─── complete_execution ───
262
263#[derive(Clone, Debug, Serialize, Deserialize)]
264pub struct CompleteExecutionArgs {
265    pub execution_id: ExecutionId,
266    /// RFC #58.5 — fence triple. `Some` for SDK worker paths (standard
267    /// stale-lease fence). `None` for operator overrides, in which case
268    /// `source` must be `CancelSource::OperatorOverride` or the Lua
269    /// returns `fence_required`.
270    #[serde(default)]
271    pub fence: Option<LeaseFence>,
272    pub attempt_index: AttemptIndex,
273    #[serde(default)]
274    pub result_payload: Option<Vec<u8>>,
275    #[serde(default)]
276    pub result_encoding: Option<String>,
277    /// RFC #58.5 — unfenced-call gate. Ignored when `fence` is `Some`.
278    #[serde(default)]
279    pub source: CancelSource,
280    pub now: TimestampMs,
281}
282
283#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
284pub enum CompleteExecutionResult {
285    /// Execution completed successfully.
286    Completed {
287        execution_id: ExecutionId,
288        public_state: PublicState,
289    },
290}
291
292// ─── renew_lease ───
293
294#[derive(Clone, Debug, Serialize, Deserialize)]
295pub struct RenewLeaseArgs {
296    pub execution_id: ExecutionId,
297    pub attempt_index: AttemptIndex,
298    /// RFC #58.5 — fence triple. Required (no operator override path for
299    /// renew). `None` returns `fence_required`.
300    pub fence: Option<LeaseFence>,
301    /// How long to extend the lease (milliseconds).
302    pub lease_ttl_ms: u64,
303    /// Grace period after lease_expires_at before the lease_current key is auto-deleted.
304    #[serde(default = "default_lease_history_grace_ms")]
305    pub lease_history_grace_ms: u64,
306}
307
308fn default_lease_history_grace_ms() -> u64 {
309    60_000
310}
311
312#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
313pub enum RenewLeaseResult {
314    /// Lease renewed.
315    Renewed { expires_at: TimestampMs },
316}
317
318// ─── mark_lease_expired_if_due ───
319
320#[derive(Clone, Debug, Serialize, Deserialize)]
321pub struct MarkLeaseExpiredArgs {
322    pub execution_id: ExecutionId,
323}
324
325#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
326pub enum MarkLeaseExpiredResult {
327    /// Lease was marked as expired.
328    MarkedExpired,
329    /// No action needed (already expired, not yet due, not active, etc.).
330    AlreadySatisfied { reason: String },
331}
332
333// ─── cancel_execution ───
334
335#[derive(Clone, Debug, Serialize, Deserialize)]
336pub struct CancelExecutionArgs {
337    pub execution_id: ExecutionId,
338    pub reason: String,
339    #[serde(default)]
340    pub source: CancelSource,
341    /// Required if not operator_override and execution is active.
342    #[serde(default)]
343    pub lease_id: Option<LeaseId>,
344    #[serde(default)]
345    pub lease_epoch: Option<LeaseEpoch>,
346    /// Required if not operator_override and execution is active.
347    #[serde(default)]
348    pub attempt_id: Option<AttemptId>,
349    pub now: TimestampMs,
350}
351
352#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
353pub enum CancelExecutionResult {
354    /// Execution cancelled.
355    Cancelled {
356        execution_id: ExecutionId,
357        public_state: PublicState,
358    },
359}
360
361// ─── revoke_lease ───
362
363#[derive(Clone, Debug, Serialize, Deserialize)]
364pub struct RevokeLeaseArgs {
365    pub execution_id: ExecutionId,
366    /// If set, only revoke if this matches the current lease. Empty string skips check.
367    #[serde(default)]
368    pub expected_lease_id: Option<String>,
369    /// Worker instance whose lease set to clean up. Read from exec_core before calling.
370    pub worker_instance_id: WorkerInstanceId,
371    pub reason: String,
372}
373
374#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
375pub enum RevokeLeaseResult {
376    /// Lease revoked.
377    Revoked {
378        lease_id: String,
379        lease_epoch: String,
380    },
381    /// Already revoked or expired — no action needed.
382    AlreadySatisfied { reason: String },
383}
384
385// ─── delay_execution ───
386
387#[derive(Clone, Debug, Serialize, Deserialize)]
388pub struct DelayExecutionArgs {
389    pub execution_id: ExecutionId,
390    /// RFC #58.5 — fence triple. `None` requires `source ==
391    /// CancelSource::OperatorOverride`.
392    #[serde(default)]
393    pub fence: Option<LeaseFence>,
394    pub attempt_index: AttemptIndex,
395    pub delay_until: TimestampMs,
396    #[serde(default)]
397    pub source: CancelSource,
398    pub now: TimestampMs,
399}
400
401#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
402pub enum DelayExecutionResult {
403    /// Execution delayed.
404    Delayed {
405        execution_id: ExecutionId,
406        public_state: PublicState,
407    },
408}
409
410// ─── move_to_waiting_children ───
411
412#[derive(Clone, Debug, Serialize, Deserialize)]
413pub struct MoveToWaitingChildrenArgs {
414    pub execution_id: ExecutionId,
415    /// RFC #58.5 — fence triple. `None` requires `source ==
416    /// CancelSource::OperatorOverride`.
417    #[serde(default)]
418    pub fence: Option<LeaseFence>,
419    pub attempt_index: AttemptIndex,
420    #[serde(default)]
421    pub source: CancelSource,
422    pub now: TimestampMs,
423}
424
425#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
426pub enum MoveToWaitingChildrenResult {
427    /// Moved to waiting children.
428    Moved {
429        execution_id: ExecutionId,
430        public_state: PublicState,
431    },
432}
433
434// ─── change_priority ───
435
436#[derive(Clone, Debug, Serialize, Deserialize)]
437pub struct ChangePriorityArgs {
438    pub execution_id: ExecutionId,
439    pub new_priority: i32,
440    pub lane_id: LaneId,
441    pub now: TimestampMs,
442}
443
444#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
445pub enum ChangePriorityResult {
446    /// Priority changed and re-scored.
447    Changed { execution_id: ExecutionId },
448}
449
450// ─── update_progress ───
451
452#[derive(Clone, Debug, Serialize, Deserialize)]
453pub struct UpdateProgressArgs {
454    pub execution_id: ExecutionId,
455    pub lease_id: LeaseId,
456    pub lease_epoch: LeaseEpoch,
457    pub attempt_id: AttemptId,
458    #[serde(default)]
459    pub progress_pct: Option<u8>,
460    #[serde(default)]
461    pub progress_message: Option<String>,
462    pub now: TimestampMs,
463}
464
465#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
466pub enum UpdateProgressResult {
467    /// Progress updated.
468    Updated,
469}
470
471// ═══════════════════════════════════════════════════════════════════════
472// Phase 2 contracts: fail, reclaim, expire
473// ═══════════════════════════════════════════════════════════════════════
474
475// ─── fail_execution ───
476
477#[derive(Clone, Debug, Serialize, Deserialize)]
478pub struct FailExecutionArgs {
479    pub execution_id: ExecutionId,
480    /// RFC #58.5 — fence triple. `None` requires `source ==
481    /// CancelSource::OperatorOverride`.
482    #[serde(default)]
483    pub fence: Option<LeaseFence>,
484    pub attempt_index: AttemptIndex,
485    pub failure_reason: String,
486    pub failure_category: String,
487    /// JSON-encoded retry policy (from execution policy). Empty = no retries.
488    #[serde(default)]
489    pub retry_policy_json: String,
490    /// JSON-encoded attempt policy for the next retry attempt.
491    #[serde(default)]
492    pub next_attempt_policy_json: String,
493    #[serde(default)]
494    pub source: CancelSource,
495}
496
497/// Outcome of a fail_execution call.
498#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
499pub enum FailExecutionResult {
500    /// Retry was scheduled — execution is delayed with backoff.
501    RetryScheduled {
502        delay_until: TimestampMs,
503        next_attempt_index: AttemptIndex,
504    },
505    /// No retries left — execution is terminal failed.
506    TerminalFailed,
507}
508
509// ─── issue_reclaim_grant ───
510
511#[derive(Clone, Debug, Serialize, Deserialize)]
512pub struct IssueReclaimGrantArgs {
513    pub execution_id: ExecutionId,
514    pub worker_id: WorkerId,
515    pub worker_instance_id: WorkerInstanceId,
516    pub lane_id: LaneId,
517    #[serde(default)]
518    pub capability_hash: Option<String>,
519    pub grant_ttl_ms: u64,
520    #[serde(default)]
521    pub route_snapshot_json: Option<String>,
522    #[serde(default)]
523    pub admission_summary: Option<String>,
524    /// Caller-side timestamp for bookkeeping. NOT passed to the Lua FCALL —
525    /// ff_issue_reclaim_grant uses `redis.call("TIME")` for grant_expires_at
526    /// (same as ff_issue_claim_grant). Kept for contract symmetry with
527    /// IssueClaimGrantArgs and scheduler audit logging.
528    pub now: TimestampMs,
529}
530
531#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
532pub enum IssueReclaimGrantResult {
533    /// Reclaim grant issued.
534    Granted { expires_at_ms: TimestampMs },
535}
536
537// ─── reclaim_execution ───
538
539#[derive(Clone, Debug, Serialize, Deserialize)]
540pub struct ReclaimExecutionArgs {
541    pub execution_id: ExecutionId,
542    pub worker_id: WorkerId,
543    pub worker_instance_id: WorkerInstanceId,
544    pub lane_id: LaneId,
545    #[serde(default)]
546    pub capability_hash: Option<String>,
547    pub lease_id: LeaseId,
548    pub lease_ttl_ms: u64,
549    pub attempt_id: AttemptId,
550    /// JSON-encoded attempt policy for the reclaim attempt.
551    #[serde(default)]
552    pub attempt_policy_json: String,
553    /// Maximum reclaim count before terminal failure. Default: 100.
554    #[serde(default = "default_max_reclaim_count")]
555    pub max_reclaim_count: u32,
556    /// Old worker instance (for old_worker_leases key construction).
557    pub old_worker_instance_id: WorkerInstanceId,
558    /// Current attempt index (for old_attempt/old_stream_meta key construction).
559    pub current_attempt_index: AttemptIndex,
560}
561
562fn default_max_reclaim_count() -> u32 {
563    100
564}
565
566#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
567pub enum ReclaimExecutionResult {
568    /// Execution reclaimed — new attempt + new lease.
569    Reclaimed {
570        new_attempt_index: AttemptIndex,
571        new_attempt_id: AttemptId,
572        new_lease_id: LeaseId,
573        new_lease_epoch: LeaseEpoch,
574        lease_expires_at: TimestampMs,
575    },
576    /// Max reclaims exceeded — execution moved to terminal.
577    MaxReclaimsExceeded,
578}
579
580// ─── expire_execution ───
581
582#[derive(Clone, Debug, Serialize, Deserialize)]
583pub struct ExpireExecutionArgs {
584    pub execution_id: ExecutionId,
585    /// "attempt_timeout" or "execution_deadline"
586    pub expire_reason: String,
587}
588
589#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
590pub enum ExpireExecutionResult {
591    /// Execution expired.
592    Expired { execution_id: ExecutionId },
593    /// Already terminal — no-op.
594    AlreadyTerminal,
595}
596
597// ═══════════════════════════════════════════════════════════════════════
598// Phase 3 contracts: suspend, signal, resume, waitpoint
599// ═══════════════════════════════════════════════════════════════════════
600
601// ─── suspend_execution ───
602
603#[derive(Clone, Debug, Serialize, Deserialize)]
604pub struct SuspendExecutionArgs {
605    pub execution_id: ExecutionId,
606    /// RFC #58.5 — fence triple. Required (no operator override path for
607    /// suspend). `None` returns `fence_required`.
608    pub fence: Option<LeaseFence>,
609    pub attempt_index: AttemptIndex,
610    pub suspension_id: SuspensionId,
611    pub waitpoint_id: WaitpointId,
612    pub waitpoint_key: String,
613    pub reason_code: String,
614    pub requested_by: String,
615    pub resume_condition_json: String,
616    pub resume_policy_json: String,
617    #[serde(default)]
618    pub continuation_metadata_pointer: Option<String>,
619    #[serde(default)]
620    pub timeout_at: Option<TimestampMs>,
621    /// true to activate a pending waitpoint, false to create new.
622    #[serde(default)]
623    pub use_pending_waitpoint: bool,
624    /// Timeout behavior: "fail", "cancel", "expire", "auto_resume", "escalate".
625    #[serde(default = "default_timeout_behavior")]
626    pub timeout_behavior: String,
627}
628
629fn default_timeout_behavior() -> String {
630    "fail".to_owned()
631}
632
633#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
634pub enum SuspendExecutionResult {
635    /// Execution suspended, waitpoint active.
636    Suspended {
637        suspension_id: SuspensionId,
638        waitpoint_id: WaitpointId,
639        waitpoint_key: String,
640        /// HMAC-SHA1 token bound to (waitpoint_id, waitpoint_key, created_at).
641        /// Required by signal-delivery callers to authenticate against this
642        /// waitpoint (RFC-004 §Waitpoint Security).
643        waitpoint_token: WaitpointToken,
644    },
645    /// Buffered signals already satisfied the condition — suspension skipped.
646    /// Lease is still held. Token comes from the pending waitpoint record.
647    AlreadySatisfied {
648        suspension_id: SuspensionId,
649        waitpoint_id: WaitpointId,
650        waitpoint_key: String,
651        waitpoint_token: WaitpointToken,
652    },
653}
654
655// ─── resume_execution ───
656
657#[derive(Clone, Debug, Serialize, Deserialize)]
658pub struct ResumeExecutionArgs {
659    pub execution_id: ExecutionId,
660    /// "signal", "operator", "auto_resume"
661    #[serde(default = "default_trigger_type")]
662    pub trigger_type: String,
663    /// Optional delay before becoming eligible (ms).
664    #[serde(default)]
665    pub resume_delay_ms: u64,
666}
667
668fn default_trigger_type() -> String {
669    "signal".to_owned()
670}
671
672#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
673pub enum ResumeExecutionResult {
674    /// Execution resumed to runnable.
675    Resumed { public_state: PublicState },
676}
677
678// ─── create_pending_waitpoint ───
679
680#[derive(Clone, Debug, Serialize, Deserialize)]
681pub struct CreatePendingWaitpointArgs {
682    pub execution_id: ExecutionId,
683    pub lease_id: LeaseId,
684    pub lease_epoch: LeaseEpoch,
685    pub attempt_index: AttemptIndex,
686    pub attempt_id: AttemptId,
687    pub waitpoint_id: WaitpointId,
688    pub waitpoint_key: String,
689    /// Short expiry for the pending waitpoint (ms).
690    pub expires_in_ms: u64,
691}
692
693#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
694pub enum CreatePendingWaitpointResult {
695    /// Pending waitpoint created.
696    Created {
697        waitpoint_id: WaitpointId,
698        waitpoint_key: String,
699        /// HMAC-SHA1 token bound to the pending waitpoint. Required for
700        /// `buffer_signal_for_pending_waitpoint` and carried forward when
701        /// the waitpoint is activated by `suspend_execution`.
702        waitpoint_token: WaitpointToken,
703    },
704}
705
706// ─── close_waitpoint ───
707
708#[derive(Clone, Debug, Serialize, Deserialize)]
709pub struct CloseWaitpointArgs {
710    pub waitpoint_id: WaitpointId,
711    pub reason: String,
712}
713
714#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
715pub enum CloseWaitpointResult {
716    /// Waitpoint closed.
717    Closed,
718}
719
720// ─── deliver_signal ───
721
722#[derive(Clone, Debug, Serialize, Deserialize)]
723pub struct DeliverSignalArgs {
724    pub execution_id: ExecutionId,
725    pub waitpoint_id: WaitpointId,
726    pub signal_id: SignalId,
727    pub signal_name: String,
728    pub signal_category: String,
729    pub source_type: String,
730    pub source_identity: String,
731    #[serde(default)]
732    pub payload: Option<Vec<u8>>,
733    #[serde(default)]
734    pub payload_encoding: Option<String>,
735    #[serde(default)]
736    pub correlation_id: Option<String>,
737    #[serde(default)]
738    pub idempotency_key: Option<String>,
739    pub target_scope: String,
740    #[serde(default)]
741    pub created_at: Option<TimestampMs>,
742    /// Dedup TTL for idempotency key (ms).
743    #[serde(default)]
744    pub dedup_ttl_ms: Option<u64>,
745    /// Resume delay after signal satisfaction (ms).
746    #[serde(default)]
747    pub resume_delay_ms: Option<u64>,
748    /// Max signals per execution (default 10000).
749    #[serde(default)]
750    pub max_signals_per_execution: Option<u64>,
751    /// MAXLEN for the waitpoint signal stream.
752    #[serde(default)]
753    pub signal_maxlen: Option<u64>,
754    /// HMAC-SHA1 token issued when the waitpoint was created. Required for
755    /// signal delivery; missing/tampered/rotated-past-grace tokens are
756    /// rejected with `invalid_token` or `token_expired` (RFC-004).
757    ///
758    /// Defense-in-depth: `WaitpointToken` is a transparent string newtype,
759    /// so an empty string deserializes successfully from JSON. The
760    /// validation boundary is in Lua (`validate_waitpoint_token` returns
761    /// `missing_token` on empty input); this type intentionally does NOT
762    /// pre-reject at the Rust layer so callers get a consistent typed
763    /// error regardless of how they constructed the args.
764    pub waitpoint_token: WaitpointToken,
765    pub now: TimestampMs,
766}
767
768#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
769pub enum DeliverSignalResult {
770    /// Signal accepted with the given effect.
771    Accepted { signal_id: SignalId, effect: String },
772    /// Duplicate signal (idempotency key matched).
773    Duplicate { existing_signal_id: SignalId },
774}
775
776// ─── buffer_signal_for_pending_waitpoint ───
777
778#[derive(Clone, Debug, Serialize, Deserialize)]
779pub struct BufferSignalArgs {
780    pub execution_id: ExecutionId,
781    pub waitpoint_id: WaitpointId,
782    pub signal_id: SignalId,
783    pub signal_name: String,
784    pub signal_category: String,
785    pub source_type: String,
786    pub source_identity: String,
787    #[serde(default)]
788    pub payload: Option<Vec<u8>>,
789    #[serde(default)]
790    pub payload_encoding: Option<String>,
791    #[serde(default)]
792    pub idempotency_key: Option<String>,
793    pub target_scope: String,
794    /// HMAC-SHA1 token issued when `create_pending_waitpoint` ran. Required
795    /// to authenticate early signals targeting the pending waitpoint.
796    pub waitpoint_token: WaitpointToken,
797    pub now: TimestampMs,
798}
799
800#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
801pub enum BufferSignalResult {
802    /// Signal buffered for pending waitpoint.
803    Buffered { signal_id: SignalId },
804    /// Duplicate signal.
805    Duplicate { existing_signal_id: SignalId },
806}
807
808// ─── list_pending_waitpoints ───
809
810/// One entry in the read-only view of an execution's active waitpoints.
811///
812/// Returned by `Server::list_pending_waitpoints` (and the
813/// `GET /v1/executions/{id}/pending-waitpoints` REST endpoint). The
814/// `waitpoint_token` is the same HMAC-SHA1 credential a suspending worker
815/// receives in `SuspendOutcome::Suspended` — a reviewer that needs to
816/// deliver a signal against this waitpoint must present it in
817/// `DeliverSignalArgs::waitpoint_token`.
818///
819/// Exposing the token here is a deliberate API gap closure: a
820/// human-in-the-loop reviewer has no other path to the token, since only
821/// the suspending worker sees the `SuspendOutcome`. Access is gated by
822/// the same bearer-auth middleware as every other REST endpoint.
823#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
824pub struct PendingWaitpointInfo {
825    pub waitpoint_id: WaitpointId,
826    pub waitpoint_key: String,
827    /// Current waitpoint state: `pending`, `active`, `closed`. Callers
828    /// typically filter to `pending` or `active`.
829    pub state: String,
830    /// HMAC-SHA1 token minted at create time; required by
831    /// `ff_deliver_signal` and `ff_buffer_signal_for_pending_waitpoint`.
832    pub waitpoint_token: WaitpointToken,
833    /// Signal names the resume condition is waiting for. Reviewers that
834    /// need to drive a specific waitpoint — particularly when multiple
835    /// concurrent waitpoints exist on one execution — filter on this to
836    /// pick the right target.
837    ///
838    /// An EMPTY vec means the condition matches any signal (wildcard, per
839    /// `lua/helpers.lua` `initialize_condition`). Callers must not infer
840    /// "no waitpoint" from empty; check `state` / length of the outer
841    /// list for that.
842    #[serde(default)]
843    pub required_signal_names: Vec<String>,
844    /// Timestamp when the waitpoint record was first written.
845    pub created_at: TimestampMs,
846    /// Timestamp when the waitpoint was activated (suspension landed).
847    /// `None` while the waitpoint is still `pending`.
848    #[serde(default, skip_serializing_if = "Option::is_none")]
849    pub activated_at: Option<TimestampMs>,
850    /// Scheduled expiration timestamp. `None` if no timeout configured.
851    #[serde(default, skip_serializing_if = "Option::is_none")]
852    pub expires_at: Option<TimestampMs>,
853}
854
855// ─── expire_suspension ───
856
857#[derive(Clone, Debug, Serialize, Deserialize)]
858pub struct ExpireSuspensionArgs {
859    pub execution_id: ExecutionId,
860}
861
862#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
863pub enum ExpireSuspensionResult {
864    /// Suspension expired with the given behavior applied.
865    Expired { behavior_applied: String },
866    /// Already resolved — no action needed.
867    AlreadySatisfied { reason: String },
868}
869
870// ─── claim_resumed_execution ───
871
872#[derive(Clone, Debug, Serialize, Deserialize)]
873pub struct ClaimResumedExecutionArgs {
874    pub execution_id: ExecutionId,
875    pub worker_id: WorkerId,
876    pub worker_instance_id: WorkerInstanceId,
877    pub lane_id: LaneId,
878    pub lease_id: LeaseId,
879    pub lease_ttl_ms: u64,
880    /// Current attempt index (for KEYS construction — from exec_core).
881    pub current_attempt_index: AttemptIndex,
882    /// Remaining attempt timeout from before suspension (ms). 0 = no timeout.
883    #[serde(default)]
884    pub remaining_attempt_timeout_ms: Option<u64>,
885    pub now: TimestampMs,
886}
887
888#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
889pub struct ClaimedResumedExecution {
890    pub execution_id: ExecutionId,
891    pub lease_id: LeaseId,
892    pub lease_epoch: LeaseEpoch,
893    pub attempt_index: AttemptIndex,
894    pub attempt_id: AttemptId,
895    pub lease_expires_at: TimestampMs,
896}
897
898#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
899pub enum ClaimResumedExecutionResult {
900    /// Successfully claimed resumed execution (same attempt continues).
901    Claimed(ClaimedResumedExecution),
902}
903
904// ═══════════════════════════════════════════════════════════════════════
905// Phase 4 contracts: stream
906// ═══════════════════════════════════════════════════════════════════════
907
908// ─── append_frame ───
909
910#[derive(Clone, Debug, Serialize, Deserialize)]
911pub struct AppendFrameArgs {
912    pub execution_id: ExecutionId,
913    pub attempt_index: AttemptIndex,
914    pub lease_id: LeaseId,
915    pub lease_epoch: LeaseEpoch,
916    pub attempt_id: AttemptId,
917    pub frame_type: String,
918    pub timestamp: TimestampMs,
919    pub payload: Vec<u8>,
920    #[serde(default)]
921    pub encoding: Option<String>,
922    /// Optional structured metadata for the frame (JSON blob).
923    #[serde(default)]
924    pub metadata_json: Option<String>,
925    #[serde(default)]
926    pub correlation_id: Option<String>,
927    #[serde(default)]
928    pub source: Option<String>,
929    /// MAXLEN for the stream. 0 = no trim.
930    #[serde(default)]
931    pub retention_maxlen: Option<u32>,
932    /// Max payload bytes per frame. Default: 65536.
933    #[serde(default)]
934    pub max_payload_bytes: Option<u32>,
935}
936
937#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
938pub enum AppendFrameResult {
939    /// Frame appended successfully.
940    Appended {
941        /// Valkey Stream entry ID (e.g. "1713100800150-0").
942        entry_id: String,
943        /// Total frame count after this append.
944        frame_count: u64,
945    },
946}
947
948// ─── StreamCursor (issue #92) ───
949
950/// Opaque cursor for attempt-stream reads/tails.
951///
952/// Replaces the bare `&str` / `String` stream-id parameters previously
953/// carried on `read_stream` / `tail_stream` / `ReadStreamParams` /
954/// `TailStreamParams`. The wire form is a flat string — serde is
955/// transparent via `try_from`/`into` — so `?from=start&to=end` and
956/// `?after=123-0` continue to work for REST clients.
957///
958/// # Public wire grammar
959///
960/// The ONLY accepted tokens are:
961///
962/// * `"start"` — first entry in the stream (XRANGE `-` equivalent).
963///   Valid in `read_stream` / `ReadStreamParams`.
964/// * `"end"` — latest entry in the stream (XRANGE `+` equivalent).
965///   Valid in `read_stream` / `ReadStreamParams`.
966/// * `"<ms>"` or `"<ms>-<seq>"` — a concrete Valkey Stream entry id.
967///   Valid everywhere.
968///
969/// The bare XRANGE/XREAD markers `"-"` and `"+"` are **NOT** accepted
970/// on the wire. The opaque `StreamCursor` grammar is the public
971/// contract; the Valkey `-`/`+` markers are an internal implementation
972/// detail carried only inside the Lua-adjacent [`ReadFramesArgs`] /
973/// `xread_block` path via [`StreamCursor::to_wire`].
974///
975/// For XREAD (tail), the documented "from the beginning" convention is
976/// `StreamCursor::At("0-0".into())` — use the convenience constructor
977/// [`StreamCursor::from_beginning`] which returns exactly that value.
978/// `Start` / `End` are rejected by the SDK's `tail_stream` boundary
979/// because XREAD does not accept `-` / `+` as cursors. The
980/// [`StreamCursor::is_concrete`] helper centralises this
981/// Start/End-vs-At decision for boundary-validation call sites.
982///
983/// # Why an enum instead of a string
984///
985/// A string parameter lets malformed ids escape to the Lua/Valkey
986/// layer, surfacing as a script error and HTTP 500. An enum with
987/// fallible `FromStr` / `TryFrom<String>` catches every malformed input
988/// at the wire boundary with a structured error, and prevents bare `-`
989/// / `+` from leaking into consumer code as tacit extensions of the
990/// public API.
991#[derive(Clone, Debug, PartialEq, Eq, Hash)]
992pub enum StreamCursor {
993    /// First entry in the stream (XRANGE start marker).
994    Start,
995    /// Latest entry in the stream (XRANGE end marker).
996    End,
997    /// A concrete Valkey Stream entry id (`<ms>` or `<ms>-<seq>`).
998    ///
999    /// For XREAD-style tails, the documented "from the beginning"
1000    /// convention is `At("0-0".to_owned())` — see
1001    /// [`StreamCursor::from_beginning`].
1002    At(String),
1003}
1004
1005impl StreamCursor {
1006    /// Convenience constructor for the XREAD-from-beginning convention
1007    /// (`"0-0"`). XREAD's `last_id` is exclusive, so passing this as
1008    /// the `after` cursor returns every entry in the stream.
1009    pub fn from_beginning() -> Self {
1010        Self::At("0-0".to_owned())
1011    }
1012
1013    /// Serde default helper — emits `StreamCursor::Start`. Used as
1014    /// `#[serde(default = "StreamCursor::start")]` on REST query
1015    /// structs.
1016    pub fn start() -> Self {
1017        Self::Start
1018    }
1019
1020    /// Serde default helper — emits `StreamCursor::End`.
1021    pub fn end() -> Self {
1022        Self::End
1023    }
1024
1025    /// Serde default helper — emits
1026    /// `StreamCursor::from_beginning()`. Used as the default for
1027    /// `TailStreamParams::after`.
1028    pub fn beginning() -> Self {
1029        Self::from_beginning()
1030    }
1031
1032    /// Internal-only: lower the cursor to the XRANGE/XREAD marker
1033    /// string Valkey expects. `Start → "-"`, `End → "+"`,
1034    /// `At(s) → s`.
1035    ///
1036    /// Used at the ff-script adapter edge (right before constructing
1037    /// `ReadFramesArgs` or calling `xread_block`) to translate the
1038    /// opaque wire grammar into the Lua-ABI form. NOT part of the
1039    /// public wire — do not emit these raw characters to consumers.
1040    /// Hidden from the generated docs to discourage external use;
1041    /// external consumers should never need to see the raw `-` / `+`.
1042    #[doc(hidden)]
1043    pub fn to_wire(&self) -> &str {
1044        match self {
1045            Self::Start => "-",
1046            Self::End => "+",
1047            Self::At(s) => s.as_str(),
1048        }
1049    }
1050
1051    /// Internal-only owned variant of [`Self::to_wire`] — moves the
1052    /// inner `String` out of `At(s)` without cloning. Use at adapter
1053    /// edges that construct an owned wire string (e.g.
1054    /// `ReadFramesArgs.from_id`) from a `StreamCursor` that is about
1055    /// to be dropped.
1056    #[doc(hidden)]
1057    pub fn into_wire_string(self) -> String {
1058        match self {
1059            Self::Start => "-".to_owned(),
1060            Self::End => "+".to_owned(),
1061            Self::At(s) => s,
1062        }
1063    }
1064
1065    /// True iff this cursor is a concrete entry id
1066    /// (`"<ms>"` / `"<ms>-<seq>"`). False for the open markers
1067    /// `Start` / `End`.
1068    ///
1069    /// Used by boundaries like XREAD (tailing) that do not accept
1070    /// open markers — rejecting a cursor is equivalent to
1071    /// `!cursor.is_concrete()`. Centralised here to keep the SDK and
1072    /// REST guards in lock-step.
1073    pub fn is_concrete(&self) -> bool {
1074        matches!(self, Self::At(_))
1075    }
1076}
1077
1078impl std::fmt::Display for StreamCursor {
1079    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1080        match self {
1081            Self::Start => f.write_str("start"),
1082            Self::End => f.write_str("end"),
1083            Self::At(s) => f.write_str(s),
1084        }
1085    }
1086}
1087
1088/// Error produced when parsing a [`StreamCursor`] from a string.
1089#[derive(Clone, Debug, PartialEq, Eq)]
1090pub enum StreamCursorParseError {
1091    /// Empty input.
1092    Empty,
1093    /// Input matched a rejected bare-marker alias (`"-"`, `"+"`).
1094    /// The public wire requires `"start"` / `"end"`; the raw Valkey
1095    /// markers are internal-only.
1096    BareMarkerRejected(String),
1097    /// Input was neither a recognized keyword nor a well-formed
1098    /// Stream entry id. Entry ids must match `^\d+(?:-\d+)?$`.
1099    Malformed(String),
1100}
1101
1102impl std::fmt::Display for StreamCursorParseError {
1103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1104        match self {
1105            Self::Empty => f.write_str("stream cursor must not be empty"),
1106            Self::BareMarkerRejected(s) => write!(
1107                f,
1108                "bare marker '{s}' is not a valid stream cursor; use 'start' or 'end'"
1109            ),
1110            Self::Malformed(s) => write!(
1111                f,
1112                "invalid stream cursor '{s}' (expected 'start', 'end', '<ms>', or '<ms>-<seq>')"
1113            ),
1114        }
1115    }
1116}
1117
1118impl std::error::Error for StreamCursorParseError {}
1119
1120/// Shared grammar check — classifies `s` as `Start` / `End` / a
1121/// concrete-id shape / malformed / empty, WITHOUT allocating. The
1122/// owned vs borrowed entry points ([`StreamCursor::from_str`],
1123/// [`StreamCursor::try_from`]) consume this classification and move
1124/// the owned `String` into `At` when applicable, avoiding a
1125/// round-trip `String → &str → String::to_owned` for the common
1126/// REST-query path.
1127enum StreamCursorClass {
1128    Start,
1129    End,
1130    Concrete,
1131    BareMarker,
1132    Empty,
1133    Malformed,
1134}
1135
1136fn classify_stream_cursor(s: &str) -> StreamCursorClass {
1137    if s.is_empty() {
1138        return StreamCursorClass::Empty;
1139    }
1140    if s == "-" || s == "+" {
1141        return StreamCursorClass::BareMarker;
1142    }
1143    if s == "start" {
1144        return StreamCursorClass::Start;
1145    }
1146    if s == "end" {
1147        return StreamCursorClass::End;
1148    }
1149    if !s.is_ascii() {
1150        return StreamCursorClass::Malformed;
1151    }
1152    let (ms_part, seq_part) = match s.split_once('-') {
1153        Some((ms, seq)) => (ms, Some(seq)),
1154        None => (s, None),
1155    };
1156    let ms_ok = !ms_part.is_empty() && ms_part.bytes().all(|b| b.is_ascii_digit());
1157    let seq_ok = seq_part
1158        .map(|p| !p.is_empty() && p.bytes().all(|b| b.is_ascii_digit()))
1159        .unwrap_or(true);
1160    if ms_ok && seq_ok {
1161        StreamCursorClass::Concrete
1162    } else {
1163        StreamCursorClass::Malformed
1164    }
1165}
1166
1167impl std::str::FromStr for StreamCursor {
1168    type Err = StreamCursorParseError;
1169
1170    fn from_str(s: &str) -> Result<Self, Self::Err> {
1171        match classify_stream_cursor(s) {
1172            StreamCursorClass::Start => Ok(Self::Start),
1173            StreamCursorClass::End => Ok(Self::End),
1174            StreamCursorClass::Concrete => Ok(Self::At(s.to_owned())),
1175            StreamCursorClass::BareMarker => {
1176                Err(StreamCursorParseError::BareMarkerRejected(s.to_owned()))
1177            }
1178            StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
1179            StreamCursorClass::Malformed => Err(StreamCursorParseError::Malformed(s.to_owned())),
1180        }
1181    }
1182}
1183
1184impl TryFrom<String> for StreamCursor {
1185    type Error = StreamCursorParseError;
1186
1187    fn try_from(s: String) -> Result<Self, Self::Error> {
1188        // Owned parsing path — the `At` variant moves `s` in directly,
1189        // avoiding the `&str → String::to_owned` re-allocation that a
1190        // blind forward to `FromStr::from_str(&s)` would force. Error
1191        // paths still pay one allocation to describe the offending
1192        // input.
1193        match classify_stream_cursor(&s) {
1194            StreamCursorClass::Start => Ok(Self::Start),
1195            StreamCursorClass::End => Ok(Self::End),
1196            StreamCursorClass::Concrete => Ok(Self::At(s)),
1197            StreamCursorClass::BareMarker => Err(StreamCursorParseError::BareMarkerRejected(s)),
1198            StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
1199            StreamCursorClass::Malformed => Err(StreamCursorParseError::Malformed(s)),
1200        }
1201    }
1202}
1203
1204impl From<StreamCursor> for String {
1205    fn from(c: StreamCursor) -> Self {
1206        c.to_string()
1207    }
1208}
1209
1210impl Serialize for StreamCursor {
1211    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1212        serializer.collect_str(self)
1213    }
1214}
1215
1216impl<'de> Deserialize<'de> for StreamCursor {
1217    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
1218        let s = String::deserialize(deserializer)?;
1219        Self::try_from(s).map_err(serde::de::Error::custom)
1220    }
1221}
1222
1223// ─── read_attempt_stream / tail_attempt_stream ───
1224
1225/// Hard cap on the number of frames returned by a single read/tail call.
1226///
1227/// Single source of truth across the Rust layer (ff-script, ff-server,
1228/// ff-sdk). The Lua side in `lua/stream.lua` keeps a matching literal with
1229/// an inline reference back here; bump both together if you ever need to
1230/// lift the cap.
1231pub const STREAM_READ_HARD_CAP: u64 = 10_000;
1232
1233/// A single frame read from an attempt-scoped stream.
1234///
1235/// Field set mirrors what `ff_append_frame` writes: `frame_type`, `ts`,
1236/// `payload`, `encoding`, `source`, and optionally `correlation_id`. Stored
1237/// as an ordered map so field order is deterministic across read calls.
1238#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1239pub struct StreamFrame {
1240    /// Valkey Stream entry ID, e.g. "1713100800150-0".
1241    pub id: String,
1242    /// Frame fields in sorted order.
1243    pub fields: std::collections::BTreeMap<String, String>,
1244}
1245
1246/// Inputs to `ff_read_attempt_stream` (XRANGE wrapper).
1247#[derive(Clone, Debug, Serialize, Deserialize)]
1248pub struct ReadFramesArgs {
1249    pub execution_id: ExecutionId,
1250    pub attempt_index: AttemptIndex,
1251    /// XRANGE start ID. Use "-" for earliest.
1252    pub from_id: String,
1253    /// XRANGE end ID. Use "+" for latest.
1254    pub to_id: String,
1255    /// XRANGE COUNT limit. MUST be `>= 1`. The REST and SDK layers reject
1256    /// `0` at the boundary; the Lua side rejects it too. `STREAM_READ_HARD_CAP`
1257    /// is the upper bound.
1258    pub count_limit: u64,
1259}
1260
1261/// Result of reading frames from an attempt stream — frames plus terminal
1262/// signal so consumers can stop polling without a timeout fallback.
1263#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1264pub struct StreamFrames {
1265    /// Entries in the requested range (possibly empty).
1266    pub frames: Vec<StreamFrame>,
1267    /// Timestamp when the upstream writer closed the stream. `None` if the
1268    /// stream is still open (or has never been written).
1269    #[serde(default, skip_serializing_if = "Option::is_none")]
1270    pub closed_at: Option<TimestampMs>,
1271    /// Reason from the closing writer. Current values:
1272    /// `attempt_success`, `attempt_failure`, `attempt_cancelled`,
1273    /// `attempt_interrupted`. `None` iff the stream is still open.
1274    #[serde(default, skip_serializing_if = "Option::is_none")]
1275    pub closed_reason: Option<String>,
1276}
1277
1278impl StreamFrames {
1279    /// Construct an empty open-stream result (no frames, no terminal
1280    /// markers). Useful for fast-path peek helpers.
1281    pub fn empty_open() -> Self {
1282        Self {
1283            frames: Vec::new(),
1284            closed_at: None,
1285            closed_reason: None,
1286        }
1287    }
1288
1289    /// True iff the producer has closed this stream. Consumers should stop
1290    /// polling and drain once this returns true.
1291    pub fn is_closed(&self) -> bool {
1292        self.closed_at.is_some()
1293    }
1294}
1295
1296#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1297pub enum ReadFramesResult {
1298    /// Frames returned (possibly empty) plus optional closed markers.
1299    Frames(StreamFrames),
1300}
1301
1302// ═══════════════════════════════════════════════════════════════════════
1303// Phase 5 contracts: budget, quota, block/unblock
1304// ═══════════════════════════════════════════════════════════════════════
1305
1306// ─── create_budget ───
1307
1308#[derive(Clone, Debug, Serialize, Deserialize)]
1309pub struct CreateBudgetArgs {
1310    pub budget_id: crate::types::BudgetId,
1311    pub scope_type: String,
1312    pub scope_id: String,
1313    pub enforcement_mode: String,
1314    pub on_hard_limit: String,
1315    pub on_soft_limit: String,
1316    pub reset_interval_ms: u64,
1317    /// Dimension names.
1318    pub dimensions: Vec<String>,
1319    /// Hard limits per dimension (parallel with dimensions).
1320    pub hard_limits: Vec<u64>,
1321    /// Soft limits per dimension (parallel with dimensions).
1322    pub soft_limits: Vec<u64>,
1323    pub now: TimestampMs,
1324}
1325
1326#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1327pub enum CreateBudgetResult {
1328    /// Budget created.
1329    Created { budget_id: crate::types::BudgetId },
1330    /// Already exists (idempotent).
1331    AlreadySatisfied { budget_id: crate::types::BudgetId },
1332}
1333
1334// ─── create_quota_policy ───
1335
1336#[derive(Clone, Debug, Serialize, Deserialize)]
1337pub struct CreateQuotaPolicyArgs {
1338    pub quota_policy_id: crate::types::QuotaPolicyId,
1339    pub window_seconds: u64,
1340    pub max_requests_per_window: u64,
1341    pub max_concurrent: u64,
1342    pub now: TimestampMs,
1343}
1344
1345#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1346pub enum CreateQuotaPolicyResult {
1347    /// Quota policy created.
1348    Created {
1349        quota_policy_id: crate::types::QuotaPolicyId,
1350    },
1351    /// Already exists (idempotent).
1352    AlreadySatisfied {
1353        quota_policy_id: crate::types::QuotaPolicyId,
1354    },
1355}
1356
1357// ─── budget_status (read-only) ───
1358
1359/// Operator-facing budget status snapshot (not an FCALL — direct HGETALL reads).
1360#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1361pub struct BudgetStatus {
1362    pub budget_id: String,
1363    pub scope_type: String,
1364    pub scope_id: String,
1365    pub enforcement_mode: String,
1366    /// Current usage per dimension: {dimension_name: current_value}.
1367    pub usage: HashMap<String, u64>,
1368    /// Hard limits per dimension: {dimension_name: limit}.
1369    pub hard_limits: HashMap<String, u64>,
1370    /// Soft limits per dimension: {dimension_name: limit}.
1371    pub soft_limits: HashMap<String, u64>,
1372    pub breach_count: u64,
1373    pub soft_breach_count: u64,
1374    pub last_breach_at: Option<String>,
1375    pub last_breach_dim: Option<String>,
1376    pub next_reset_at: Option<String>,
1377    pub created_at: Option<String>,
1378}
1379
1380// ─── report_usage_and_check ───
1381
1382#[derive(Clone, Debug, Serialize, Deserialize)]
1383pub struct ReportUsageArgs {
1384    /// Dimension names to increment.
1385    pub dimensions: Vec<String>,
1386    /// Increment values (parallel with dimensions).
1387    pub deltas: Vec<u64>,
1388    pub now: TimestampMs,
1389    /// Optional idempotency key to prevent double-counting on retries.
1390    /// Pass the raw dedup id (e.g. `"retry-42"`); the typed FCALL wrapper
1391    /// wraps it into `ff:usagededup:{b:M}:<id>` using the budget
1392    /// partition's hash tag so it co-locates with the other budget keys
1393    /// (#108).
1394    #[serde(default)]
1395    pub dedup_key: Option<String>,
1396}
1397
1398#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1399#[non_exhaustive]
1400pub enum ReportUsageResult {
1401    /// All increments applied, no breach.
1402    Ok,
1403    /// Soft limit breached on a dimension (advisory, increments applied).
1404    SoftBreach {
1405        dimension: String,
1406        current_usage: u64,
1407        soft_limit: u64,
1408    },
1409    /// Hard limit breached (increments NOT applied).
1410    HardBreach {
1411        dimension: String,
1412        current_usage: u64,
1413        hard_limit: u64,
1414    },
1415    /// Dedup key matched — usage already applied in a prior call.
1416    AlreadyApplied,
1417}
1418
1419// ─── reset_budget ───
1420
1421#[derive(Clone, Debug, Serialize, Deserialize)]
1422pub struct ResetBudgetArgs {
1423    pub budget_id: crate::types::BudgetId,
1424    pub now: TimestampMs,
1425}
1426
1427#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1428pub enum ResetBudgetResult {
1429    /// Budget reset successfully.
1430    Reset { next_reset_at: TimestampMs },
1431}
1432
1433// ─── check_admission_and_record ───
1434
1435#[derive(Clone, Debug, Serialize, Deserialize)]
1436pub struct CheckAdmissionArgs {
1437    pub execution_id: ExecutionId,
1438    pub now: TimestampMs,
1439    pub window_seconds: u64,
1440    pub rate_limit: u64,
1441    pub concurrency_cap: u64,
1442    #[serde(default)]
1443    pub jitter_ms: Option<u64>,
1444}
1445
1446#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1447pub enum CheckAdmissionResult {
1448    /// Admitted — execution may proceed.
1449    Admitted,
1450    /// Already admitted in this window (idempotent).
1451    AlreadyAdmitted,
1452    /// Rate limit exceeded.
1453    RateExceeded { retry_after_ms: u64 },
1454    /// Concurrency cap hit.
1455    ConcurrencyExceeded,
1456}
1457
1458// ─── release_admission ───
1459
1460#[derive(Clone, Debug, Serialize, Deserialize)]
1461pub struct ReleaseAdmissionArgs {
1462    pub execution_id: ExecutionId,
1463}
1464
1465#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1466pub enum ReleaseAdmissionResult {
1467    Released,
1468}
1469
1470// ─── block_execution_for_admission ───
1471
1472#[derive(Clone, Debug, Serialize, Deserialize)]
1473pub struct BlockExecutionArgs {
1474    pub execution_id: ExecutionId,
1475    pub blocking_reason: String,
1476    #[serde(default)]
1477    pub blocking_detail: Option<String>,
1478    pub now: TimestampMs,
1479}
1480
1481#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1482pub enum BlockExecutionResult {
1483    /// Execution blocked.
1484    Blocked,
1485}
1486
1487// ─── unblock_execution ───
1488
1489#[derive(Clone, Debug, Serialize, Deserialize)]
1490pub struct UnblockExecutionArgs {
1491    pub execution_id: ExecutionId,
1492    pub now: TimestampMs,
1493    /// Expected blocking reason (prevents stale unblock).
1494    #[serde(default)]
1495    pub expected_blocking_reason: Option<String>,
1496}
1497
1498#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1499pub enum UnblockExecutionResult {
1500    /// Execution unblocked and moved to eligible.
1501    Unblocked,
1502}
1503
1504// ═══════════════════════════════════════════════════════════════════════
1505// Phase 6 contracts: flow coordination and dependencies
1506// ═══════════════════════════════════════════════════════════════════════
1507
1508// ─── create_flow ───
1509
1510#[derive(Clone, Debug, Serialize, Deserialize)]
1511pub struct CreateFlowArgs {
1512    pub flow_id: crate::types::FlowId,
1513    pub flow_kind: String,
1514    pub namespace: Namespace,
1515    pub now: TimestampMs,
1516}
1517
1518#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1519pub enum CreateFlowResult {
1520    /// Flow created successfully.
1521    Created { flow_id: crate::types::FlowId },
1522    /// Flow already exists (idempotent).
1523    AlreadySatisfied { flow_id: crate::types::FlowId },
1524}
1525
1526// ─── add_execution_to_flow ───
1527
1528#[derive(Clone, Debug, Serialize, Deserialize)]
1529pub struct AddExecutionToFlowArgs {
1530    pub flow_id: crate::types::FlowId,
1531    pub execution_id: ExecutionId,
1532    pub now: TimestampMs,
1533}
1534
1535#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1536pub enum AddExecutionToFlowResult {
1537    /// Execution added to flow.
1538    Added {
1539        execution_id: ExecutionId,
1540        new_node_count: u32,
1541    },
1542    /// Already a member (idempotent).
1543    AlreadyMember {
1544        execution_id: ExecutionId,
1545        node_count: u32,
1546    },
1547}
1548
1549// ─── cancel_flow ───
1550
1551#[derive(Clone, Debug, Serialize, Deserialize)]
1552pub struct CancelFlowArgs {
1553    pub flow_id: crate::types::FlowId,
1554    pub reason: String,
1555    pub cancellation_policy: String,
1556    pub now: TimestampMs,
1557}
1558
1559#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1560pub enum CancelFlowResult {
1561    /// Flow cancelled and all member cancellations (if any) have completed
1562    /// synchronously. Used when `cancellation_policy != "cancel_all"`, when
1563    /// the flow has no members, when the caller opted into synchronous
1564    /// dispatch (e.g. `?wait=true`), or when the flow was already in a
1565    /// terminal state (idempotent retry).
1566    ///
1567    /// On the idempotent-retry path `member_execution_ids` may be *capped*
1568    /// at the server (default 1000 entries) to bound response bandwidth on
1569    /// flows with very large membership. The first (non-idempotent) call
1570    /// always returns the full list, so clients that need every member id
1571    /// should persist the initial response.
1572    Cancelled {
1573        cancellation_policy: String,
1574        member_execution_ids: Vec<String>,
1575    },
1576    /// Flow state was flipped to cancelled atomically, but member
1577    /// cancellations are dispatched asynchronously in the background.
1578    /// Clients may poll `GET /v1/executions/{id}/state` for each member
1579    /// execution id to track terminal state.
1580    CancellationScheduled {
1581        cancellation_policy: String,
1582        member_count: u32,
1583        member_execution_ids: Vec<String>,
1584    },
1585    /// `?wait=true` dispatch completed but one or more member cancellations
1586    /// failed mid-loop (e.g. ghost member, Lua error, transport fault after
1587    /// retries exhausted). The flow itself is still flipped to cancelled
1588    /// (atomic Lua already ran); callers SHOULD inspect
1589    /// `failed_member_execution_ids` and either retry those ids directly
1590    /// via `cancel_execution` or wait for the cancel-backlog reconciler
1591    /// to retry them in the background.
1592    ///
1593    /// Only emitted by the synchronous wait path
1594    /// ([`crate::CancelFlowArgs`] via `?wait=true`). The async path returns
1595    /// [`CancelFlowResult::CancellationScheduled`] and delegates retries
1596    /// to the reconciler — there is no visible "partial" state on the
1597    /// async path because the dispatch result is not observed inline.
1598    PartiallyCancelled {
1599        cancellation_policy: String,
1600        /// All member execution ids that the cancel_flow FCALL returned
1601        /// (i.e. the full membership at the moment of cancellation).
1602        member_execution_ids: Vec<String>,
1603        /// Strict subset of `member_execution_ids` whose per-member cancel
1604        /// FCALL returned an error. Order is deterministic (matches the
1605        /// iteration order over `member_execution_ids`).
1606        failed_member_execution_ids: Vec<String>,
1607    },
1608}
1609
1610// ─── stage_dependency_edge ───
1611
1612#[derive(Clone, Debug, Serialize, Deserialize)]
1613pub struct StageDependencyEdgeArgs {
1614    pub flow_id: crate::types::FlowId,
1615    pub edge_id: crate::types::EdgeId,
1616    pub upstream_execution_id: ExecutionId,
1617    pub downstream_execution_id: ExecutionId,
1618    #[serde(default = "default_dependency_kind")]
1619    pub dependency_kind: String,
1620    #[serde(default)]
1621    pub data_passing_ref: Option<String>,
1622    pub expected_graph_revision: u64,
1623    pub now: TimestampMs,
1624}
1625
1626fn default_dependency_kind() -> String {
1627    "success_only".to_owned()
1628}
1629
1630#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1631pub enum StageDependencyEdgeResult {
1632    /// Edge staged, new graph revision.
1633    Staged {
1634        edge_id: crate::types::EdgeId,
1635        new_graph_revision: u64,
1636    },
1637}
1638
1639// ─── apply_dependency_to_child ───
1640
1641#[derive(Clone, Debug, Serialize, Deserialize)]
1642pub struct ApplyDependencyToChildArgs {
1643    pub flow_id: crate::types::FlowId,
1644    pub edge_id: crate::types::EdgeId,
1645    /// The child execution that receives the dependency.
1646    pub downstream_execution_id: ExecutionId,
1647    pub upstream_execution_id: ExecutionId,
1648    pub graph_revision: u64,
1649    #[serde(default = "default_dependency_kind")]
1650    pub dependency_kind: String,
1651    #[serde(default)]
1652    pub data_passing_ref: Option<String>,
1653    pub now: TimestampMs,
1654}
1655
1656#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1657pub enum ApplyDependencyToChildResult {
1658    /// Dependency applied, N unsatisfied deps remaining.
1659    Applied { unsatisfied_count: u32 },
1660    /// Already applied (idempotent).
1661    AlreadyApplied,
1662}
1663
1664// ─── resolve_dependency ───
1665
1666#[derive(Clone, Debug, Serialize, Deserialize)]
1667pub struct ResolveDependencyArgs {
1668    pub edge_id: crate::types::EdgeId,
1669    /// "success", "failed", "cancelled", "expired", "skipped"
1670    pub upstream_outcome: String,
1671    pub now: TimestampMs,
1672}
1673
1674#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1675pub enum ResolveDependencyResult {
1676    /// Edge satisfied — downstream may become eligible.
1677    Satisfied,
1678    /// Edge made impossible — downstream becomes skipped.
1679    Impossible,
1680    /// Already resolved (idempotent).
1681    AlreadyResolved,
1682}
1683
1684// ─── promote_blocked_to_eligible ───
1685
1686#[derive(Clone, Debug, Serialize, Deserialize)]
1687pub struct PromoteBlockedToEligibleArgs {
1688    pub execution_id: ExecutionId,
1689    pub now: TimestampMs,
1690}
1691
1692#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1693pub enum PromoteBlockedToEligibleResult {
1694    Promoted,
1695}
1696
1697// ─── evaluate_flow_eligibility ───
1698
1699#[derive(Clone, Debug, Serialize, Deserialize)]
1700pub struct EvaluateFlowEligibilityArgs {
1701    pub execution_id: ExecutionId,
1702}
1703
1704#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1705pub enum EvaluateFlowEligibilityResult {
1706    /// Execution eligibility status.
1707    Status { status: String },
1708}
1709
1710// ─── replay_execution ───
1711
1712#[derive(Clone, Debug, Serialize, Deserialize)]
1713pub struct ReplayExecutionArgs {
1714    pub execution_id: ExecutionId,
1715    pub now: TimestampMs,
1716}
1717
1718#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1719pub enum ReplayExecutionResult {
1720    /// Replayed to runnable.
1721    Replayed { public_state: PublicState },
1722}
1723
1724// ─── get_execution (full read) ───
1725
1726/// Full execution info returned by `Server::get_execution`.
1727#[derive(Clone, Debug, Serialize, Deserialize)]
1728pub struct ExecutionInfo {
1729    pub execution_id: ExecutionId,
1730    pub namespace: String,
1731    pub lane_id: String,
1732    pub priority: i32,
1733    pub execution_kind: String,
1734    pub state_vector: StateVector,
1735    pub public_state: PublicState,
1736    pub created_at: String,
1737    /// TimestampMs (ms since epoch) when the execution's first attempt
1738    /// was started by a worker claim. Empty string until the first
1739    /// claim lands. Serialised as `Option<String>` so pre-claim reads
1740    /// deserialise cleanly even if the field is absent from the wire.
1741    #[serde(default, skip_serializing_if = "Option::is_none")]
1742    pub started_at: Option<String>,
1743    /// TimestampMs when the execution reached a terminal
1744    /// `completed`/`failed`/`cancelled`/`expired` state. Empty /
1745    /// absent while still in flight.
1746    #[serde(default, skip_serializing_if = "Option::is_none")]
1747    pub completed_at: Option<String>,
1748    pub current_attempt_index: u32,
1749    pub flow_id: Option<String>,
1750    pub blocking_detail: String,
1751}
1752
1753// ─── set_execution_tags / set_flow_tags (issue #58.4) ───
1754
1755/// Args for `ff_set_execution_tags`. Tag keys MUST match
1756/// `^[a-z][a-z0-9_]*\.` — the caller-namespace rule — or the FCALL
1757/// returns `invalid_tag_key`. Values are arbitrary strings. The map is
1758/// ordered (`BTreeMap`) so two callers submitting the same logical set
1759/// of tags produce identical ARGV.
1760#[derive(Clone, Debug, Serialize, Deserialize)]
1761pub struct SetExecutionTagsArgs {
1762    pub execution_id: ExecutionId,
1763    pub tags: BTreeMap<String, String>,
1764}
1765
1766/// Result of `ff_set_execution_tags`.
1767#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1768pub enum SetExecutionTagsResult {
1769    /// Tags written. `count` is the number of key-value pairs applied.
1770    Ok { count: u32 },
1771}
1772
1773/// Args for `ff_set_flow_tags`. Same namespace rule as
1774/// [`SetExecutionTagsArgs`]. The Lua function also lazy-migrates any
1775/// pre-58.4 reserved-namespace fields stashed inline on `flow_core` into
1776/// the new tags key.
1777#[derive(Clone, Debug, Serialize, Deserialize)]
1778pub struct SetFlowTagsArgs {
1779    pub flow_id: FlowId,
1780    pub tags: BTreeMap<String, String>,
1781}
1782
1783/// Result of `ff_set_flow_tags`.
1784#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1785pub enum SetFlowTagsResult {
1786    /// Tags written. `count` is the number of key-value pairs applied.
1787    Ok { count: u32 },
1788}
1789
1790// ─── describe_execution (issue #58.1) ───
1791
1792/// Engine-decoupled read-model for one execution.
1793///
1794/// Returned by `ff_sdk::FlowFabricWorker::describe_execution`. Consumers
1795/// consult this struct instead of reaching into Valkey's exec_core hash
1796/// directly — the engine is free to rename fields or restructure storage
1797/// under this surface.
1798///
1799/// `#[non_exhaustive]` — FF may add fields in minor releases without a
1800/// semver break. Match with `..` or use field-by-field construction.
1801#[derive(Clone, Debug, PartialEq, Eq)]
1802#[non_exhaustive]
1803pub struct ExecutionSnapshot {
1804    pub execution_id: ExecutionId,
1805    pub flow_id: Option<FlowId>,
1806    pub lane_id: LaneId,
1807    pub namespace: Namespace,
1808    pub public_state: PublicState,
1809    /// Blocking reason string (e.g. `"waiting_for_worker"`,
1810    /// `"waiting_for_delay"`, `"waiting_for_dependencies"`). `None` when
1811    /// the exec_core field is empty.
1812    pub blocking_reason: Option<String>,
1813    /// Free-form operator-readable detail explaining `blocking_reason`.
1814    /// `None` when the exec_core field is empty.
1815    pub blocking_detail: Option<String>,
1816    /// Summary of the execution's currently-active attempt. `None` when
1817    /// no attempt has been started (pre-claim) or when the exec_core
1818    /// attempt fields are all empty.
1819    pub current_attempt: Option<AttemptSummary>,
1820    /// Summary of the execution's currently-held lease. `None` when the
1821    /// execution is not held by a worker.
1822    pub current_lease: Option<LeaseSummary>,
1823    /// The waitpoint this execution is currently suspended on, if any.
1824    pub current_waitpoint: Option<WaitpointId>,
1825    pub created_at: TimestampMs,
1826    /// Timestamp of the last write that mutated exec_core. Engine-maintained.
1827    pub last_mutation_at: TimestampMs,
1828    pub total_attempt_count: u32,
1829    /// Caller-owned labels. The prefix `^[a-z][a-z0-9_]*\.` is reserved for
1830    /// consumer metadata (e.g. `cairn.task_id`); FF guarantees it will not
1831    /// write keys matching that shape. FF's own fields stay in snake_case
1832    /// without dots. Empty when no tags are set.
1833    pub tags: BTreeMap<String, String>,
1834}
1835
1836impl ExecutionSnapshot {
1837    /// Construct an [`ExecutionSnapshot`]. Present so downstream crates
1838    /// (ff-sdk's `describe_execution`) can assemble the struct despite
1839    /// the `#[non_exhaustive]` marker. Prefer adding builder-style
1840    /// helpers here over loosening `non_exhaustive`.
1841    #[allow(clippy::too_many_arguments)]
1842    pub fn new(
1843        execution_id: ExecutionId,
1844        flow_id: Option<FlowId>,
1845        lane_id: LaneId,
1846        namespace: Namespace,
1847        public_state: PublicState,
1848        blocking_reason: Option<String>,
1849        blocking_detail: Option<String>,
1850        current_attempt: Option<AttemptSummary>,
1851        current_lease: Option<LeaseSummary>,
1852        current_waitpoint: Option<WaitpointId>,
1853        created_at: TimestampMs,
1854        last_mutation_at: TimestampMs,
1855        total_attempt_count: u32,
1856        tags: BTreeMap<String, String>,
1857    ) -> Self {
1858        Self {
1859            execution_id,
1860            flow_id,
1861            lane_id,
1862            namespace,
1863            public_state,
1864            blocking_reason,
1865            blocking_detail,
1866            current_attempt,
1867            current_lease,
1868            current_waitpoint,
1869            created_at,
1870            last_mutation_at,
1871            total_attempt_count,
1872            tags,
1873        }
1874    }
1875}
1876
1877/// Currently-active attempt summary inside an [`ExecutionSnapshot`].
1878///
1879/// `#[non_exhaustive]`.
1880#[derive(Clone, Debug, PartialEq, Eq)]
1881#[non_exhaustive]
1882pub struct AttemptSummary {
1883    pub attempt_id: AttemptId,
1884    pub attempt_index: AttemptIndex,
1885}
1886
1887impl AttemptSummary {
1888    /// Construct an [`AttemptSummary`]. See [`ExecutionSnapshot::new`]
1889    /// for the rationale — `#[non_exhaustive]` blocks cross-crate
1890    /// struct-literal construction.
1891    pub fn new(attempt_id: AttemptId, attempt_index: AttemptIndex) -> Self {
1892        Self {
1893            attempt_id,
1894            attempt_index,
1895        }
1896    }
1897}
1898
1899/// Currently-held lease summary inside an [`ExecutionSnapshot`].
1900///
1901/// `#[non_exhaustive]`.
1902#[derive(Clone, Debug, PartialEq, Eq)]
1903#[non_exhaustive]
1904pub struct LeaseSummary {
1905    pub lease_epoch: LeaseEpoch,
1906    pub worker_instance_id: WorkerInstanceId,
1907    pub expires_at: TimestampMs,
1908}
1909
1910impl LeaseSummary {
1911    /// Construct a [`LeaseSummary`]. See [`ExecutionSnapshot::new`]
1912    /// for the rationale.
1913    pub fn new(
1914        lease_epoch: LeaseEpoch,
1915        worker_instance_id: WorkerInstanceId,
1916        expires_at: TimestampMs,
1917    ) -> Self {
1918        Self {
1919            lease_epoch,
1920            worker_instance_id,
1921            expires_at,
1922        }
1923    }
1924}
1925
1926// ─── Common sub-types ───
1927
1928// ─── describe_flow (issue #58.2) ───
1929
1930/// Engine-decoupled read-model for one flow.
1931///
1932/// Returned by `ff_sdk::FlowFabricWorker::describe_flow`. Consumers
1933/// consult this struct instead of reaching into Valkey's flow_core hash
1934/// directly — the engine is free to rename fields or restructure storage
1935/// under this surface.
1936///
1937/// `#[non_exhaustive]` — FF may add fields in minor releases without a
1938/// semver break. Match with `..` or use [`FlowSnapshot::new`].
1939///
1940/// # `public_flow_state`
1941///
1942/// Stored as an engine-written string literal on `flow_core`. Known
1943/// values today: `open`, `running`, `blocked`, `cancelled`, `completed`,
1944/// `failed`. Surfaced as `String` (not a typed enum) because FF does
1945/// not yet expose a `PublicFlowState` type — callers that need to act
1946/// on specific values should match on the literal. The flow_projector
1947/// writes a parallel `public_flow_state` into the flow's summary hash;
1948/// this field reflects the authoritative value on `flow_core`, which
1949/// is what mutation guards (cancel/add-member) consult.
1950///
1951/// # `tags`
1952///
1953/// Unlike [`ExecutionSnapshot::tags`] (which has a dedicated tags
1954/// hash), flow tags live inline on `flow_core`. FF's own fields are
1955/// snake_case without a `.`; any field whose name starts with
1956/// `<lowercase>.` (e.g. `cairn.task_id`) is treated as consumer-owned
1957/// metadata and routed here. An empty map means no namespaced tags
1958/// were written. The prefix convention mirrors
1959/// [`ExecutionSnapshot::tags`] — consumers should keep tag keys
1960/// namespaced (`cairn.*`, `operator.*`, etc.) so future FF field
1961/// additions don't collide.
1962#[derive(Clone, Debug, PartialEq, Eq)]
1963#[non_exhaustive]
1964pub struct FlowSnapshot {
1965    pub flow_id: FlowId,
1966    /// The `flow_kind` literal passed to `create_flow` (e.g. `dag`,
1967    /// `pipeline`). Preserved as-is; FF does not interpret it.
1968    pub flow_kind: String,
1969    pub namespace: Namespace,
1970    /// Authoritative flow state on `flow_core`. See the struct-level
1971    /// docs for the set of known values.
1972    pub public_flow_state: String,
1973    /// Monotonically increasing revision bumped on every structural
1974    /// mutation (add-member, stage-edge). Used by optimistic-concurrency
1975    /// writers via `expected_graph_revision`.
1976    pub graph_revision: u64,
1977    /// Number of member executions added so far. Never decremented.
1978    pub node_count: u32,
1979    /// Number of dependency edges staged so far. Never decremented.
1980    pub edge_count: u32,
1981    pub created_at: TimestampMs,
1982    /// Timestamp of the last write that mutated `flow_core`.
1983    /// Engine-maintained.
1984    pub last_mutation_at: TimestampMs,
1985    /// When the flow reached a terminal state via `cancel_flow`. `None`
1986    /// while the flow is live. Only written by the cancel path today;
1987    /// `completed`/`failed` terminal states do not populate this field
1988    /// (the flow_projector derives them from membership).
1989    pub cancelled_at: Option<TimestampMs>,
1990    /// Operator-supplied reason from the `cancel_flow` call. `None`
1991    /// when the flow has not been cancelled.
1992    pub cancel_reason: Option<String>,
1993    /// The `cancellation_policy` value persisted by `cancel_flow`
1994    /// (e.g. `cancel_all`, `cancel_flow_only`). `None` for flows
1995    /// cancelled before this field was persisted, or not yet cancelled.
1996    pub cancellation_policy: Option<String>,
1997    /// Consumer-owned namespaced metadata (e.g. `cairn.task_id`). See
1998    /// the struct-level docs for the routing rule.
1999    pub tags: BTreeMap<String, String>,
2000    /// RFC-016 Stage A: inbound edge groups known on this flow.
2001    ///
2002    /// One entry per downstream execution that has at least one staged
2003    /// inbound dependency edge. Populated from the
2004    /// `ff:flow:{fp:N}:<flow_id>:edgegroup:<downstream_eid>` hash —
2005    /// when that hash is absent (existing flows created before Stage A),
2006    /// the backend falls back to reading the legacy
2007    /// `deps_meta.unsatisfied_required_count` counter on the
2008    /// downstream's exec partition and reports the group as
2009    /// [`EdgeDependencyPolicy::AllOf`] with the derived counters
2010    /// (backward-compat shim — see RFC-016 §11 Stage A).
2011    ///
2012    /// Every entry in Stage A reports `policy = AllOf`; Stage B/C/D/E
2013    /// extend the variants and wire the quorum counters.
2014    pub edge_groups: Vec<EdgeGroupSnapshot>,
2015}
2016
2017impl FlowSnapshot {
2018    /// Construct a [`FlowSnapshot`]. Present so downstream crates
2019    /// (ff-sdk's `describe_flow`) can assemble the struct despite the
2020    /// `#[non_exhaustive]` marker.
2021    #[allow(clippy::too_many_arguments)]
2022    pub fn new(
2023        flow_id: FlowId,
2024        flow_kind: String,
2025        namespace: Namespace,
2026        public_flow_state: String,
2027        graph_revision: u64,
2028        node_count: u32,
2029        edge_count: u32,
2030        created_at: TimestampMs,
2031        last_mutation_at: TimestampMs,
2032        cancelled_at: Option<TimestampMs>,
2033        cancel_reason: Option<String>,
2034        cancellation_policy: Option<String>,
2035        tags: BTreeMap<String, String>,
2036        edge_groups: Vec<EdgeGroupSnapshot>,
2037    ) -> Self {
2038        Self {
2039            flow_id,
2040            flow_kind,
2041            namespace,
2042            public_flow_state,
2043            graph_revision,
2044            node_count,
2045            edge_count,
2046            created_at,
2047            last_mutation_at,
2048            cancelled_at,
2049            cancel_reason,
2050            cancellation_policy,
2051            tags,
2052            edge_groups,
2053        }
2054    }
2055}
2056
2057// ─── describe_edge / list_*_edges (issue #58.3) ───
2058
2059/// Engine-decoupled read-model for one dependency edge.
2060///
2061/// Returned by `ff_sdk::FlowFabricWorker::describe_edge`,
2062/// `list_incoming_edges`, and `list_outgoing_edges`. Consumers consult
2063/// this struct instead of reaching into Valkey's per-flow `edge:` hash
2064/// directly — the engine is free to rename hash fields or restructure
2065/// key layout under this surface.
2066///
2067/// `#[non_exhaustive]` — FF may add fields in minor releases without a
2068/// semver break. Match with `..` or use [`EdgeSnapshot::new`].
2069///
2070/// # Fields
2071///
2072/// The struct mirrors the immutable edge record written by
2073/// `ff_stage_dependency_edge` (see `lua/flow.lua`). The flow-scoped
2074/// edge hash is only ever written once, at staging time; per-execution
2075/// resolution state lives on a separate `dep:<edge_id>` hash and is not
2076/// surfaced here. The `edge_state` field therefore reflects the
2077/// staging-time literal (currently `pending`), not the downstream
2078/// execution's dep-edge state.
2079#[derive(Clone, Debug, PartialEq, Eq)]
2080#[non_exhaustive]
2081pub struct EdgeSnapshot {
2082    pub edge_id: EdgeId,
2083    pub flow_id: FlowId,
2084    pub upstream_execution_id: ExecutionId,
2085    pub downstream_execution_id: ExecutionId,
2086    /// The `dependency_kind` literal (e.g. `success_only`) from
2087    /// `stage_dependency_edge`. Preserved as-is; FF does not interpret
2088    /// it on reads.
2089    pub dependency_kind: String,
2090    /// The satisfaction-condition literal stamped at staging time
2091    /// (e.g. `all_required`).
2092    pub satisfaction_condition: String,
2093    /// Optional opaque handle to a data-passing artifact. `None` when
2094    /// the stored field is empty (the most common case).
2095    pub data_passing_ref: Option<String>,
2096    /// Edge-state literal on the flow-scoped edge hash. Written once
2097    /// at staging as `pending`; this hash is immutable on the flow
2098    /// side. Per-execution resolution state is tracked separately on
2099    /// the child's `dep:<edge_id>` hash.
2100    pub edge_state: String,
2101    pub created_at: TimestampMs,
2102    /// Origin of the edge (e.g. `engine`). Preserved as-is.
2103    pub created_by: String,
2104}
2105
2106/// Direction marker for [`crate::engine_backend::EngineBackend::list_edges`].
2107///
2108/// Carries the subject execution whose adjacency side the caller wants
2109/// to list — mirrors the internal `AdjacencySide + subject_eid` pair
2110/// the ff-sdk free-fn `list_edges_from_set` already uses. Keeping
2111/// direction + subject fused in one enum means the trait method has a
2112/// single `direction` parameter rather than a `(side, eid)` pair, and
2113/// the backend impl can't forget one of the two.
2114///
2115/// * `Outgoing { from_node }` — the caller wants every edge whose
2116///   `upstream_execution_id == from_node`. Corresponds to the
2117///   `out:<execution_id>` adjacency SET under the execution's flow
2118///   partition.
2119/// * `Incoming { to_node }` — the caller wants every edge whose
2120///   `downstream_execution_id == to_node`. Corresponds to the
2121///   `in:<execution_id>` adjacency SET under the execution's flow
2122///   partition.
2123#[derive(Clone, Debug, PartialEq, Eq)]
2124pub enum EdgeDirection {
2125    /// Edges leaving `from_node` — `out:` adjacency SET.
2126    Outgoing {
2127        /// The subject execution whose outgoing edges to list.
2128        from_node: ExecutionId,
2129    },
2130    /// Edges landing on `to_node` — `in:` adjacency SET.
2131    Incoming {
2132        /// The subject execution whose incoming edges to list.
2133        to_node: ExecutionId,
2134    },
2135}
2136
2137impl EdgeDirection {
2138    /// Return the subject execution id regardless of direction. Shared
2139    /// helper for backend impls that need the execution id for the
2140    /// initial `HGET exec_core.flow_id` lookup (flow routing) before
2141    /// they know which adjacency SET to read.
2142    pub fn subject(&self) -> &ExecutionId {
2143        match self {
2144            Self::Outgoing { from_node } => from_node,
2145            Self::Incoming { to_node } => to_node,
2146        }
2147    }
2148}
2149
2150impl EdgeSnapshot {
2151    /// Construct an [`EdgeSnapshot`]. Present so downstream crates
2152    /// (ff-sdk's `describe_edge` / `list_*_edges`) can assemble the
2153    /// struct despite the `#[non_exhaustive]` marker.
2154    #[allow(clippy::too_many_arguments)]
2155    pub fn new(
2156        edge_id: EdgeId,
2157        flow_id: FlowId,
2158        upstream_execution_id: ExecutionId,
2159        downstream_execution_id: ExecutionId,
2160        dependency_kind: String,
2161        satisfaction_condition: String,
2162        data_passing_ref: Option<String>,
2163        edge_state: String,
2164        created_at: TimestampMs,
2165        created_by: String,
2166    ) -> Self {
2167        Self {
2168            edge_id,
2169            flow_id,
2170            upstream_execution_id,
2171            downstream_execution_id,
2172            dependency_kind,
2173            satisfaction_condition,
2174            data_passing_ref,
2175            edge_state,
2176            created_at,
2177            created_by,
2178        }
2179    }
2180}
2181
2182// ─── RFC-016 edge-group policy (Stage A) ───
2183
2184/// Policy controlling how an inbound edge group's satisfaction is
2185/// decided.
2186///
2187/// Stage A honours only [`EdgeDependencyPolicy::AllOf`] — the two
2188/// quorum variants exist so the wire/snapshot surface is stable for
2189/// Stage B/C/D's resolver extensions, but
2190/// [`crate::engine_backend::EngineBackend::set_edge_group_policy`]
2191/// rejects them with [`crate::engine_error::EngineError::Validation`]
2192/// until Stage B lands.
2193///
2194/// `#[non_exhaustive]` — future stages may add variants (e.g.
2195/// `Threshold` — see RFC-016 §10.3) without a semver break. Construct
2196/// via the [`EdgeDependencyPolicy::all_of`], [`EdgeDependencyPolicy::any_of`],
2197/// and [`EdgeDependencyPolicy::quorum`] helpers.
2198#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2199#[non_exhaustive]
2200#[serde(tag = "kind", rename_all = "snake_case")]
2201pub enum EdgeDependencyPolicy {
2202    /// Today's behavior: every edge in the inbound group must be
2203    /// satisfied (RFC-007 `all_required` + `success_only`).
2204    AllOf,
2205    /// k-of-n where k==1 — satisfied on the first upstream success.
2206    /// Stage A: rejected on
2207    /// [`crate::engine_backend::EngineBackend::set_edge_group_policy`];
2208    /// resolver emits nothing for this variant yet.
2209    AnyOf {
2210        #[serde(rename = "on_satisfied")]
2211        on_satisfied: OnSatisfied,
2212    },
2213    /// k-of-n quorum. Stage A: rejected on
2214    /// [`crate::engine_backend::EngineBackend::set_edge_group_policy`].
2215    Quorum {
2216        k: u32,
2217        #[serde(rename = "on_satisfied")]
2218        on_satisfied: OnSatisfied,
2219    },
2220}
2221
2222impl EdgeDependencyPolicy {
2223    /// Construct the default all-of policy (RFC-007 behavior).
2224    pub fn all_of() -> Self {
2225        Self::AllOf
2226    }
2227
2228    /// Construct an any-of policy — reserved for Stage B.
2229    pub fn any_of(on_satisfied: OnSatisfied) -> Self {
2230        Self::AnyOf { on_satisfied }
2231    }
2232
2233    /// Construct a quorum policy — reserved for Stage B.
2234    pub fn quorum(k: u32, on_satisfied: OnSatisfied) -> Self {
2235        Self::Quorum { k, on_satisfied }
2236    }
2237
2238    /// Stable string label used for wire format + metric labels.
2239    /// `all_of` | `any_of` | `quorum`.
2240    pub fn variant_str(&self) -> &'static str {
2241        match self {
2242            Self::AllOf => "all_of",
2243            Self::AnyOf { .. } => "any_of",
2244            Self::Quorum { .. } => "quorum",
2245        }
2246    }
2247}
2248
2249/// Policy for unfinished sibling upstreams once the quorum is met.
2250///
2251/// `#[non_exhaustive]` — RFC-016 §10.5 rejects a third variant today
2252/// but keeps the door open. Construct via [`OnSatisfied::cancel_remaining`]
2253/// / [`OnSatisfied::let_run`].
2254#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2255#[non_exhaustive]
2256#[serde(rename_all = "snake_case")]
2257pub enum OnSatisfied {
2258    /// Default. Cancel any still-running siblings once quorum met.
2259    CancelRemaining,
2260    /// Let stragglers finish; their terminals update counters for
2261    /// observability only (one-shot downstream).
2262    LetRun,
2263}
2264
2265impl OnSatisfied {
2266    /// Construct the default `cancel_remaining` disposition.
2267    pub fn cancel_remaining() -> Self {
2268        Self::CancelRemaining
2269    }
2270
2271    /// Construct the `let_run` disposition.
2272    pub fn let_run() -> Self {
2273        Self::LetRun
2274    }
2275
2276    /// Stable string label for wire format.
2277    pub fn variant_str(&self) -> &'static str {
2278        match self {
2279            Self::CancelRemaining => "cancel_remaining",
2280            Self::LetRun => "let_run",
2281        }
2282    }
2283}
2284
2285/// Edge-group lifecycle state (Stage A exposes only `pending` +
2286/// `satisfied` + `impossible`; `cancelled` reserved for Stage C).
2287#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
2288#[non_exhaustive]
2289#[serde(rename_all = "snake_case")]
2290pub enum EdgeGroupState {
2291    Pending,
2292    Satisfied,
2293    Impossible,
2294    Cancelled,
2295}
2296
2297impl EdgeGroupState {
2298    pub fn from_literal(s: &str) -> Self {
2299        match s {
2300            "satisfied" => Self::Satisfied,
2301            "impossible" => Self::Impossible,
2302            "cancelled" => Self::Cancelled,
2303            _ => Self::Pending,
2304        }
2305    }
2306
2307    pub fn as_str(&self) -> &'static str {
2308        match self {
2309            Self::Pending => "pending",
2310            Self::Satisfied => "satisfied",
2311            Self::Impossible => "impossible",
2312            Self::Cancelled => "cancelled",
2313        }
2314    }
2315}
2316
2317/// Snapshot of one inbound edge group (per downstream execution).
2318///
2319/// Exposed via [`FlowSnapshot::edge_groups`]. Stage A only populates
2320/// `AllOf` groups and their counters; Stage B/C add `failed` /
2321/// `skipped` / `satisfied_at` wiring for the quorum variants.
2322///
2323/// `#[non_exhaustive]` — future stages will add fields (`satisfied_at`,
2324/// `failed_count` write-path, `cancel_siblings_pending`). Match with
2325/// `..` or use [`EdgeGroupSnapshot::new`].
2326#[derive(Clone, Debug, PartialEq, Eq)]
2327#[non_exhaustive]
2328pub struct EdgeGroupSnapshot {
2329    pub downstream_execution_id: ExecutionId,
2330    pub policy: EdgeDependencyPolicy,
2331    pub total_deps: u32,
2332    pub satisfied_count: u32,
2333    pub failed_count: u32,
2334    pub skipped_count: u32,
2335    pub running_count: u32,
2336    pub group_state: EdgeGroupState,
2337}
2338
2339impl EdgeGroupSnapshot {
2340    #[allow(clippy::too_many_arguments)]
2341    pub fn new(
2342        downstream_execution_id: ExecutionId,
2343        policy: EdgeDependencyPolicy,
2344        total_deps: u32,
2345        satisfied_count: u32,
2346        failed_count: u32,
2347        skipped_count: u32,
2348        running_count: u32,
2349        group_state: EdgeGroupState,
2350    ) -> Self {
2351        Self {
2352            downstream_execution_id,
2353            policy,
2354            total_deps,
2355            satisfied_count,
2356            failed_count,
2357            skipped_count,
2358            running_count,
2359            group_state,
2360        }
2361    }
2362}
2363
2364// ─── set_edge_group_policy (RFC-016 §6.1) ───
2365
2366#[derive(Clone, Debug, Serialize, Deserialize)]
2367pub struct SetEdgeGroupPolicyArgs {
2368    pub flow_id: FlowId,
2369    pub downstream_execution_id: ExecutionId,
2370    pub policy: EdgeDependencyPolicy,
2371    pub now: TimestampMs,
2372}
2373
2374#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2375pub enum SetEdgeGroupPolicyResult {
2376    /// Policy stored (fresh write).
2377    Set,
2378    /// Policy already stored with an identical value (idempotent).
2379    AlreadySet,
2380}
2381
2382// ─── list_flows (issue #185) ───
2383
2384/// Typed flow-lifecycle status surfaced on [`FlowSummary`].
2385///
2386/// Mirrors the free-form `public_flow_state` literal that FF's flow
2387/// lifecycle writes onto `flow_core` (known values: `open`, `running`,
2388/// `blocked`, `cancelled`, `completed`, `failed` — see [`FlowSnapshot`]).
2389/// The three "active" runtime states (`open`, `running`, `blocked`)
2390/// collapse to [`FlowStatus::Active`] here — callers that need the
2391/// exact runtime sub-state should use [`FlowSnapshot::public_flow_state`]
2392/// via [`crate::engine_backend::EngineBackend::describe_flow`]. `failed`
2393/// maps to [`FlowStatus::Failed`].
2394///
2395/// `#[non_exhaustive]` so future lifecycle states (if FF introduces
2396/// any) can be added without a semver break.
2397#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
2398#[non_exhaustive]
2399pub enum FlowStatus {
2400    /// `open` / `running` / `blocked` — flow is still live on the engine.
2401    Active,
2402    /// Terminal success: all members reached a successful terminal state
2403    /// and the flow projector flipped `public_flow_state` to `completed`.
2404    Completed,
2405    /// Terminal failure: one or more members failed and the flow
2406    /// projector flipped `public_flow_state` to `failed`.
2407    Failed,
2408    /// Cancelled by an operator via `cancel_flow`.
2409    Cancelled,
2410    /// The stored `public_flow_state` literal is present but not a
2411    /// known value. The raw literal is preserved on
2412    /// [`FlowSnapshot::public_flow_state`] — callers that need to act
2413    /// on it should fall back to [`crate::engine_backend::EngineBackend::describe_flow`].
2414    Unknown,
2415}
2416
2417impl FlowStatus {
2418    /// Map the raw `public_flow_state` literal stored on `flow_core`
2419    /// to a typed [`FlowStatus`]. Unknown literals surface as
2420    /// [`FlowStatus::Unknown`] so the list surface stays forwards-
2421    /// compatible with future engine-side state additions.
2422    pub fn from_public_flow_state(raw: &str) -> Self {
2423        match raw {
2424            "open" | "running" | "blocked" => Self::Active,
2425            "completed" => Self::Completed,
2426            "failed" => Self::Failed,
2427            "cancelled" => Self::Cancelled,
2428            _ => Self::Unknown,
2429        }
2430    }
2431}
2432
2433/// Lightweight per-flow projection returned by
2434/// [`crate::engine_backend::EngineBackend::list_flows`].
2435///
2436/// Deliberately narrower than [`FlowSnapshot`] — listing pages serve
2437/// dashboard-style enumerations where the caller does not want to pay
2438/// for the full `flow_core` hash on every row. Consumers that need
2439/// revision / node-count / tags / cancel metadata should fan out to
2440/// [`crate::engine_backend::EngineBackend::describe_flow`] for the
2441/// specific ids they care about.
2442///
2443/// `#[non_exhaustive]` — FF may add fields in minor releases without
2444/// a semver break. Match with `..` or use [`FlowSummary::new`].
2445#[derive(Clone, Debug, PartialEq, Eq)]
2446#[non_exhaustive]
2447pub struct FlowSummary {
2448    pub flow_id: FlowId,
2449    /// Timestamp (ms since unix epoch) `flow_core.created_at` was
2450    /// stamped. Mirrors [`FlowSnapshot::created_at`]; kept typed so
2451    /// callers that want raw millis can read `.0`.
2452    pub created_at: TimestampMs,
2453    /// Typed projection of `flow_core.public_flow_state`. See
2454    /// [`FlowStatus`] for the mapping.
2455    pub status: FlowStatus,
2456}
2457
2458impl FlowSummary {
2459    /// Construct a [`FlowSummary`]. Present so downstream crates can
2460    /// assemble the struct despite the `#[non_exhaustive]` marker.
2461    pub fn new(flow_id: FlowId, created_at: TimestampMs, status: FlowStatus) -> Self {
2462        Self {
2463            flow_id,
2464            created_at,
2465            status,
2466        }
2467    }
2468}
2469
2470/// One page of [`FlowSummary`] rows returned by
2471/// [`crate::engine_backend::EngineBackend::list_flows`].
2472///
2473/// `next_cursor` is `Some(last_flow_id)` when at least one more row
2474/// may exist on the partition — callers forward it verbatim as the
2475/// next call's `cursor` argument to continue iteration. `None` means
2476/// the listing is exhausted. Cursor semantics match the Postgres
2477/// `WHERE flow_id > $cursor ORDER BY flow_id LIMIT $limit` pattern
2478/// (see the trait method's rustdoc).
2479///
2480/// `#[non_exhaustive]` — FF may add summary-level fields (total count,
2481/// partition hint) in future minor releases without a semver break.
2482#[derive(Clone, Debug, PartialEq, Eq)]
2483#[non_exhaustive]
2484pub struct ListFlowsPage {
2485    pub flows: Vec<FlowSummary>,
2486    pub next_cursor: Option<FlowId>,
2487}
2488
2489impl ListFlowsPage {
2490    /// Construct a [`ListFlowsPage`]. Present so downstream crates can
2491    /// assemble the struct despite the `#[non_exhaustive]` marker.
2492    pub fn new(flows: Vec<FlowSummary>, next_cursor: Option<FlowId>) -> Self {
2493        Self { flows, next_cursor }
2494    }
2495}
2496
2497/// Summary of state after a mutation, returned by many functions.
2498#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2499pub struct StateSummary {
2500    pub state_vector: StateVector,
2501    pub current_attempt_index: AttemptIndex,
2502}
2503
2504#[cfg(test)]
2505mod tests {
2506    use super::*;
2507    use crate::types::FlowId;
2508
2509    #[test]
2510    fn create_execution_args_serde() {
2511        let config = crate::partition::PartitionConfig::default();
2512        let args = CreateExecutionArgs {
2513            execution_id: ExecutionId::for_flow(&FlowId::new(), &config),
2514            namespace: Namespace::new("test"),
2515            lane_id: LaneId::new("default"),
2516            execution_kind: "llm_call".to_owned(),
2517            input_payload: b"hello".to_vec(),
2518            payload_encoding: Some("json".to_owned()),
2519            priority: 0,
2520            creator_identity: "test-user".to_owned(),
2521            idempotency_key: None,
2522            tags: HashMap::new(),
2523            policy: None,
2524            delay_until: None,
2525            execution_deadline_at: None,
2526            partition_id: 42,
2527            now: TimestampMs::now(),
2528        };
2529        let json = serde_json::to_string(&args).unwrap();
2530        let parsed: CreateExecutionArgs = serde_json::from_str(&json).unwrap();
2531        assert_eq!(args.execution_id, parsed.execution_id);
2532    }
2533
2534    #[test]
2535    fn claim_result_serde() {
2536        let config = crate::partition::PartitionConfig::default();
2537        let result = ClaimExecutionResult::Claimed(ClaimedExecution {
2538            execution_id: ExecutionId::for_flow(&FlowId::new(), &config),
2539            lease_id: LeaseId::new(),
2540            lease_epoch: LeaseEpoch::new(1),
2541            attempt_index: AttemptIndex::new(0),
2542            attempt_id: AttemptId::new(),
2543            attempt_type: AttemptType::Initial,
2544            lease_expires_at: TimestampMs::from_millis(1000),
2545        });
2546        let json = serde_json::to_string(&result).unwrap();
2547        let parsed: ClaimExecutionResult = serde_json::from_str(&json).unwrap();
2548        assert_eq!(result, parsed);
2549    }
2550
2551    // ── StreamCursor (issue #92) ──
2552
2553    #[test]
2554    fn stream_cursor_display_matches_wire_tokens() {
2555        assert_eq!(StreamCursor::Start.to_string(), "start");
2556        assert_eq!(StreamCursor::End.to_string(), "end");
2557        assert_eq!(StreamCursor::At("123".into()).to_string(), "123");
2558        assert_eq!(StreamCursor::At("123-4".into()).to_string(), "123-4");
2559    }
2560
2561    #[test]
2562    fn stream_cursor_to_wire_maps_to_valkey_markers() {
2563        assert_eq!(StreamCursor::Start.to_wire(), "-");
2564        assert_eq!(StreamCursor::End.to_wire(), "+");
2565        assert_eq!(StreamCursor::At("0-0".into()).to_wire(), "0-0");
2566        assert_eq!(StreamCursor::At("17-3".into()).to_wire(), "17-3");
2567    }
2568
2569    #[test]
2570    fn stream_cursor_from_str_accepts_wire_tokens() {
2571        use std::str::FromStr;
2572        assert_eq!(
2573            StreamCursor::from_str("start").unwrap(),
2574            StreamCursor::Start
2575        );
2576        assert_eq!(StreamCursor::from_str("end").unwrap(), StreamCursor::End);
2577        assert_eq!(
2578            StreamCursor::from_str("123").unwrap(),
2579            StreamCursor::At("123".into())
2580        );
2581        assert_eq!(
2582            StreamCursor::from_str("0-0").unwrap(),
2583            StreamCursor::At("0-0".into())
2584        );
2585        assert_eq!(
2586            StreamCursor::from_str("1713100800150-0").unwrap(),
2587            StreamCursor::At("1713100800150-0".into())
2588        );
2589    }
2590
2591    #[test]
2592    fn stream_cursor_from_str_rejects_bare_markers() {
2593        use std::str::FromStr;
2594        assert!(matches!(
2595            StreamCursor::from_str("-"),
2596            Err(StreamCursorParseError::BareMarkerRejected(s)) if s == "-"
2597        ));
2598        assert!(matches!(
2599            StreamCursor::from_str("+"),
2600            Err(StreamCursorParseError::BareMarkerRejected(s)) if s == "+"
2601        ));
2602    }
2603
2604    #[test]
2605    fn stream_cursor_from_str_rejects_empty() {
2606        use std::str::FromStr;
2607        assert_eq!(
2608            StreamCursor::from_str(""),
2609            Err(StreamCursorParseError::Empty)
2610        );
2611    }
2612
2613    #[test]
2614    fn stream_cursor_from_str_rejects_malformed() {
2615        use std::str::FromStr;
2616        for bad in [
2617            "abc", "-1", "1-", "-1-2", "1-2-3", "1.2", "1 2", "Start", "END",
2618        ] {
2619            assert!(
2620                matches!(
2621                    StreamCursor::from_str(bad),
2622                    Err(StreamCursorParseError::Malformed(_))
2623                ),
2624                "must reject {bad:?}",
2625            );
2626        }
2627    }
2628
2629    #[test]
2630    fn stream_cursor_from_str_rejects_non_ascii() {
2631        use std::str::FromStr;
2632        assert!(matches!(
2633            StreamCursor::from_str("1\u{2013}2"),
2634            Err(StreamCursorParseError::Malformed(_))
2635        ));
2636    }
2637
2638    #[test]
2639    fn stream_cursor_serde_round_trip() {
2640        for c in [
2641            StreamCursor::Start,
2642            StreamCursor::End,
2643            StreamCursor::At("0-0".into()),
2644            StreamCursor::At("1713100800150-0".into()),
2645        ] {
2646            let json = serde_json::to_string(&c).unwrap();
2647            let back: StreamCursor = serde_json::from_str(&json).unwrap();
2648            assert_eq!(back, c);
2649        }
2650    }
2651
2652    #[test]
2653    fn stream_cursor_serializes_as_bare_string() {
2654        assert_eq!(
2655            serde_json::to_string(&StreamCursor::Start).unwrap(),
2656            r#""start""#
2657        );
2658        assert_eq!(
2659            serde_json::to_string(&StreamCursor::End).unwrap(),
2660            r#""end""#
2661        );
2662        assert_eq!(
2663            serde_json::to_string(&StreamCursor::At("123-0".into())).unwrap(),
2664            r#""123-0""#
2665        );
2666    }
2667
2668    #[test]
2669    fn stream_cursor_deserialize_rejects_bare_markers() {
2670        assert!(serde_json::from_str::<StreamCursor>(r#""-""#).is_err());
2671        assert!(serde_json::from_str::<StreamCursor>(r#""+""#).is_err());
2672    }
2673
2674    #[test]
2675    fn stream_cursor_from_beginning_is_zero_zero() {
2676        assert_eq!(
2677            StreamCursor::from_beginning(),
2678            StreamCursor::At("0-0".into())
2679        );
2680    }
2681
2682    #[test]
2683    fn stream_cursor_is_concrete_classifies_variants() {
2684        assert!(!StreamCursor::Start.is_concrete());
2685        assert!(!StreamCursor::End.is_concrete());
2686        assert!(StreamCursor::At("0-0".into()).is_concrete());
2687        assert!(StreamCursor::At("123-0".into()).is_concrete());
2688        assert!(StreamCursor::from_beginning().is_concrete());
2689    }
2690
2691    #[test]
2692    fn stream_cursor_into_wire_string_moves_without_cloning() {
2693        assert_eq!(StreamCursor::Start.into_wire_string(), "-");
2694        assert_eq!(StreamCursor::End.into_wire_string(), "+");
2695        assert_eq!(StreamCursor::At("17-3".into()).into_wire_string(), "17-3");
2696    }
2697}
2698
2699// ─── list_executions ───
2700
2701/// Summary of an execution for list views.
2702#[derive(Clone, Debug, Serialize, Deserialize)]
2703pub struct ExecutionSummary {
2704    pub execution_id: ExecutionId,
2705    pub namespace: String,
2706    pub lane_id: String,
2707    pub execution_kind: String,
2708    pub public_state: String,
2709    pub priority: i32,
2710    pub created_at: String,
2711}
2712
2713/// Result of a list_executions query.
2714#[derive(Clone, Debug, Serialize, Deserialize)]
2715pub struct ListExecutionsResult {
2716    pub executions: Vec<ExecutionSummary>,
2717    pub total_returned: usize,
2718}
2719
2720// ─── list_lanes (issue #184) ───
2721
2722/// One page of lane ids returned by
2723/// [`crate::engine_backend::EngineBackend::list_lanes`].
2724///
2725/// Lanes are global (not partition-scoped) — the backend enumerates
2726/// every registered lane, sorts by [`LaneId`] name, and returns a
2727/// `limit`-sized slice starting after `cursor` (exclusive).
2728///
2729/// `next_cursor` is `Some(last_lane_in_page)` when more pages remain
2730/// and `None` when the caller has read the final page. Callers that
2731/// want the full list loop until `next_cursor` is `None`, threading
2732/// the previous page's `next_cursor` into the next call's `cursor`
2733/// argument.
2734///
2735/// `#[non_exhaustive]` — FF may add fields (e.g. a `total` hint) in
2736/// minor releases without a semver break.
2737#[derive(Clone, Debug, PartialEq, Eq)]
2738#[non_exhaustive]
2739pub struct ListLanesPage {
2740    /// The lanes in this page, sorted by [`LaneId`] name.
2741    pub lanes: Vec<LaneId>,
2742    /// Cursor for the next page (exclusive). `None` ⇒ final page.
2743    pub next_cursor: Option<LaneId>,
2744}
2745
2746impl ListLanesPage {
2747    /// Construct a [`ListLanesPage`]. Present so downstream crates
2748    /// (ff-backend-valkey's `list_lanes` impl) can assemble the
2749    /// struct despite the `#[non_exhaustive]` marker.
2750    pub fn new(lanes: Vec<LaneId>, next_cursor: Option<LaneId>) -> Self {
2751        Self { lanes, next_cursor }
2752    }
2753}
2754
2755// ─── list_suspended ───
2756
2757/// One entry in a [`ListSuspendedPage`] — a suspended execution and
2758/// the reason it is blocked, answering an operator's "what's this
2759/// waiting on?" without a follow-up round-trip.
2760///
2761/// `reason` carries the free-form `reason_code` recorded by the
2762/// suspending worker at `lua/suspension.lua` (HSET `suspension:current
2763/// reason_code`). It is a `String`, not a closed enum: the suspension
2764/// pipeline accepts arbitrary caller-supplied codes (typical values
2765/// are `"signal"`, `"timer"`, `"children"`, `"join"`, but consumers
2766/// embed bespoke codes). A future enum projection can classify
2767/// known codes once the set is frozen; until then, callers that want
2768/// structured routing MUST match on the string explicitly.
2769#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2770#[non_exhaustive]
2771pub struct SuspendedExecutionEntry {
2772    /// Execution currently in `lifecycle_phase=suspended`.
2773    pub execution_id: ExecutionId,
2774    /// Score stored on the per-lane suspended ZSET — the scheduled
2775    /// `timeout_at` in milliseconds, or the `9999999999999` sentinel
2776    /// when no timeout was set (see `lua/suspension.lua`).
2777    pub suspended_at_ms: i64,
2778    /// Free-form reason code from `suspension:current.reason_code`.
2779    /// Empty string when the suspension hash is absent or does not
2780    /// carry a `reason_code` field (older records). See the struct
2781    /// rustdoc for the deliberate-String rationale.
2782    pub reason: String,
2783}
2784
2785impl SuspendedExecutionEntry {
2786    /// Construct a new entry. Preferred over direct field init for
2787    /// `#[non_exhaustive]` forward-compat.
2788    pub fn new(execution_id: ExecutionId, suspended_at_ms: i64, reason: String) -> Self {
2789        Self {
2790            execution_id,
2791            suspended_at_ms,
2792            reason,
2793        }
2794    }
2795}
2796
2797/// One cursor-paginated page of suspended executions.
2798///
2799/// Pagination is cursor-based (not offset/limit) so a Valkey backend
2800/// can resume a partition scan from the last seen execution id and a
2801/// future Postgres backend can do keyset pagination on
2802/// `executions WHERE state='suspended'`. The cursor is opaque to
2803/// callers: pass `next_cursor` back as the `cursor` argument to the
2804/// next [`EngineBackend::list_suspended`] call to fetch the next
2805/// page. `None` means the stream is exhausted.
2806///
2807/// [`EngineBackend::list_suspended`]: crate::engine_backend::EngineBackend::list_suspended
2808#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2809#[non_exhaustive]
2810pub struct ListSuspendedPage {
2811    /// Entries on this page, ordered by ascending `suspended_at_ms`
2812    /// (timeout order) with `execution_id` as a lex tiebreak.
2813    pub entries: Vec<SuspendedExecutionEntry>,
2814    /// Resume-point for the next page. `None` when no further
2815    /// entries remain in the partition.
2816    pub next_cursor: Option<ExecutionId>,
2817}
2818
2819impl ListSuspendedPage {
2820    /// Construct a new page. Preferred over direct field init for
2821    /// `#[non_exhaustive]` forward-compat.
2822    pub fn new(entries: Vec<SuspendedExecutionEntry>, next_cursor: Option<ExecutionId>) -> Self {
2823        Self {
2824            entries,
2825            next_cursor,
2826        }
2827    }
2828}
2829
2830// ─── list_executions ───
2831
2832/// One page of partition-scoped execution ids returned by
2833/// [`EngineBackend::list_executions`](crate::engine_backend::EngineBackend::list_executions).
2834///
2835/// Pagination is forward-only and cursor-based. `next_cursor` carries
2836/// the last `ExecutionId` emitted in `executions` iff another page is
2837/// available; callers pass that id back as the next call's `cursor`
2838/// (exclusive start). `next_cursor = None` signals end-of-stream.
2839///
2840/// `#[non_exhaustive]` — FF may add fields (e.g. `approximate_total`)
2841/// in minor releases without a semver break. Use
2842/// [`ListExecutionsPage::new`] for cross-crate construction.
2843#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2844#[non_exhaustive]
2845pub struct ListExecutionsPage {
2846    /// Execution ids on this page, in ascending lexicographic order.
2847    pub executions: Vec<ExecutionId>,
2848    /// Exclusive cursor to request the next page. `None` ⇒ no more
2849    /// results.
2850    pub next_cursor: Option<ExecutionId>,
2851}
2852
2853impl ListExecutionsPage {
2854    /// Construct a [`ListExecutionsPage`]. Present so downstream
2855    /// crates can assemble the struct despite the `#[non_exhaustive]`
2856    /// marker.
2857    pub fn new(executions: Vec<ExecutionId>, next_cursor: Option<ExecutionId>) -> Self {
2858        Self { executions, next_cursor }
2859    }
2860}
2861
2862// ─── rotate_waitpoint_hmac_secret ───
2863
2864/// Args for `ff_rotate_waitpoint_hmac_secret`. Rotates the HMAC signing
2865/// kid on ONE partition. Callers fan out across every partition themselves
2866/// (ff-server does the parallel fan-out in `rotate_waitpoint_secret`;
2867/// direct-Valkey consumers mirror the pattern).
2868///
2869/// "now" is derived server-side from `redis.call("TIME")` inside the FCALL
2870/// (consistency with `validate_waitpoint_token` and flow scanners).
2871/// `grace_ms` is a duration, not a clock value, so carrying it from the
2872/// caller is safe.
2873#[derive(Clone, Debug)]
2874pub struct RotateWaitpointHmacSecretArgs {
2875    pub new_kid: String,
2876    pub new_secret_hex: String,
2877    /// Grace window in ms. Must be non-negative. Tokens signed by the
2878    /// outgoing kid remain valid for `grace_ms` after this rotation.
2879    pub grace_ms: u64,
2880}
2881
2882/// Outcome of a single-partition rotation.
2883#[derive(Clone, Debug, PartialEq, Eq)]
2884pub enum RotateWaitpointHmacSecretOutcome {
2885    /// Installed the new kid. `previous_kid` is `None` on bootstrap
2886    /// (no prior `current_kid`). `gc_count` counts expired kids reaped
2887    /// during this rotation.
2888    Rotated {
2889        previous_kid: Option<String>,
2890        new_kid: String,
2891        gc_count: u32,
2892    },
2893    /// Exact replay — same kid + same secret already installed. Safe
2894    /// operator retry; no state change.
2895    Noop { kid: String },
2896}
2897
2898// ─── list_waitpoint_hmac_kids ───
2899
2900#[derive(Clone, Debug, PartialEq, Eq)]
2901pub struct ListWaitpointHmacKidsArgs {}
2902
2903/// Snapshot of the waitpoint HMAC keystore on ONE partition.
2904#[derive(Clone, Debug, PartialEq, Eq)]
2905pub struct WaitpointHmacKids {
2906    /// The currently-signing kid. `None` if uninitialized.
2907    pub current_kid: Option<String>,
2908    /// Kids that still validate existing tokens but no longer sign
2909    /// new ones. Order is Lua HGETALL traversal order — callers that
2910    /// need a stable sort should sort by `expires_at_ms`.
2911    pub verifying: Vec<VerifyingKid>,
2912}
2913
2914#[derive(Clone, Debug, PartialEq, Eq)]
2915pub struct VerifyingKid {
2916    pub kid: String,
2917    pub expires_at_ms: i64,
2918}
2919
2920// ═══════════════════════════════════════════════════════════════════════
2921// RFC-013 Stage 1d: EngineBackend::suspend typed args + outcome
2922// ═══════════════════════════════════════════════════════════════════════
2923//
2924// `SuspendExecutionArgs` / `SuspendExecutionResult` above remain the
2925// wire-level Lua-ARGV mirror used by the backend serializer. The types
2926// below are the public trait-surface shapes RFC-013 §2.2–§2.6 specifies.
2927//
2928// Every type in this block is `#[non_exhaustive]` per the RFC §2.2.1
2929// memory-rule compliance note; each gets a constructor so external-crate
2930// consumers can build them without struct-literal access.
2931
2932use crate::backend::WaitpointHmac;
2933
2934/// Partition-scoped idempotency key for retry-safe `EngineBackend::suspend`.
2935///
2936/// See RFC-013 §2.2 — when set on [`SuspendArgs::idempotency_key`], the
2937/// backend dedups the call on `(partition, execution_id, idempotency_key)`
2938/// and a second `suspend` with the same triple returns the first call's
2939/// [`SuspendOutcome`] verbatim. Absent a key, `suspend` is NOT retry-
2940/// idempotent; callers must describe-and-reconcile per §3.1.
2941///
2942/// Follows the `UsageDimensions::dedup_key` pattern — opaque to the
2943/// engine, byte-compared at the partition scope.
2944#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
2945#[serde(transparent)]
2946pub struct IdempotencyKey(String);
2947
2948impl IdempotencyKey {
2949    /// Construct from any stringy input. Empty strings are accepted;
2950    /// the backend treats an empty key as "no dedup" at the serialize
2951    /// step so `Some(IdempotencyKey::new(""))` is functionally the same
2952    /// as `None`.
2953    pub fn new(key: impl Into<String>) -> Self {
2954        Self(key.into())
2955    }
2956
2957    /// Borrow the underlying string.
2958    pub fn as_str(&self) -> &str {
2959        &self.0
2960    }
2961}
2962
2963impl std::fmt::Display for IdempotencyKey {
2964    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2965        f.write_str(&self.0)
2966    }
2967}
2968
2969/// v1 signal-match predicate inside [`ResumeCondition::Single`].
2970///
2971/// RFC-013 §2.4 — `ByName(String)` matches a single concrete signal
2972/// name; `Wildcard` matches any delivered signal. RFC-014 may extend
2973/// (payload predicates, pattern matching) — `#[non_exhaustive]`.
2974#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2975#[non_exhaustive]
2976pub enum SignalMatcher {
2977    /// Match by exact signal name.
2978    ByName(String),
2979    /// Match any signal delivered to the waitpoint.
2980    Wildcard,
2981}
2982
2983/// Hard cap on composite-condition nesting depth (RFC-014 §5.4
2984/// invariant 4; §5.5 cap rationale). Soft-cap: bumping requires only
2985/// this constant + the cap-rationale paragraph in RFC-014 §5.5 — no
2986/// wire-format change. Keep in sync.
2987pub const MAX_COMPOSITE_DEPTH: usize = 4;
2988
2989/// RFC-013 reserves this enum slot; RFC-014 populates it with the
2990/// concrete composition vocabulary (`AllOf` + `Count`). The enum is
2991/// `#[non_exhaustive]` so RFC-016 or later RFCs may add variants
2992/// (`AnyOf` has been explicitly rejected per RFC-014 §2.3 in favour of
2993/// `Count { n: 1, .. }`; the guard exists for orthogonal future work).
2994#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2995#[serde(tag = "kind")]
2996#[non_exhaustive]
2997pub enum CompositeBody {
2998    /// All listed sub-conditions must be satisfied. Order-independent.
2999    /// Once satisfied, further signals to member waitpoints are observed
3000    /// but do not re-open satisfaction. RFC-014 §2.1.
3001    AllOf {
3002        members: Vec<ResumeCondition>,
3003    },
3004    /// At least `n` distinct satisfiers (by [`CountKind`]) must match.
3005    /// `matcher` optionally constrains participating signals; `None`
3006    /// lets any signal on any of `waitpoints` count. RFC-014 §2.1.
3007    Count {
3008        n: u32,
3009        count_kind: CountKind,
3010        #[serde(default, skip_serializing_if = "Option::is_none")]
3011        matcher: Option<SignalMatcher>,
3012        waitpoints: Vec<String>,
3013    },
3014}
3015
3016/// How `Count` nodes distinguish satisfiers. RFC-014 §2.1 + §3.2.
3017#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3018#[non_exhaustive]
3019pub enum CountKind {
3020    /// n distinct `waitpoint_id`s in `waitpoints` must fire.
3021    DistinctWaitpoints,
3022    /// n distinct `signal_id`s across the waitpoint set.
3023    DistinctSignals,
3024    /// n distinct `source_type:source_identity` tuples.
3025    DistinctSources,
3026}
3027
3028impl CompositeBody {
3029    /// `AllOf { members }` constructor (RFC-014 §10.3 SDK surface).
3030    pub fn all_of(members: impl IntoIterator<Item = ResumeCondition>) -> Self {
3031        Self::AllOf {
3032            members: members.into_iter().collect(),
3033        }
3034    }
3035
3036    /// `Count` constructor with explicit kind + waitpoint set.
3037    pub fn count(
3038        n: u32,
3039        count_kind: CountKind,
3040        matcher: Option<SignalMatcher>,
3041        waitpoints: impl IntoIterator<Item = String>,
3042    ) -> Self {
3043        Self::Count {
3044            n,
3045            count_kind,
3046            matcher,
3047            waitpoints: waitpoints.into_iter().collect(),
3048        }
3049    }
3050}
3051
3052/// Declarative resume condition for [`SuspendArgs::resume_condition`].
3053///
3054/// RFC-013 §2.4 — typed replacement for the SDK's former
3055/// `ConditionMatcher` / `resume_condition_json` pair.
3056#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3057#[non_exhaustive]
3058pub enum ResumeCondition {
3059    /// Single waitpoint-key match with a predicate. `matcher` is
3060    /// evaluated against every signal delivered to `waitpoint_key`.
3061    Single {
3062        waitpoint_key: String,
3063        matcher: SignalMatcher,
3064    },
3065    /// Operator-only resume — no signal satisfies; only an explicit
3066    /// operator resume closes the waitpoint.
3067    OperatorOnly,
3068    /// Pure-timeout suspension. No signal satisfier; the waitpoint
3069    /// resolves only via `timeout_behavior` at `timeout_at`. Requires
3070    /// `SuspendArgs::timeout_at` to be `Some(_)` — otherwise the
3071    /// Rust-side validator rejects as `timeout_only_without_deadline`.
3072    TimeoutOnly,
3073    /// Multi-condition composition; RFC-014 defines the body.
3074    Composite(CompositeBody),
3075}
3076
3077/// RFC-014 §5.1 validation error shape. Emitted by
3078/// [`ResumeCondition::validate_composite`] when a composite fails a
3079/// structural / cardinality invariant at suspend-time, before any Valkey
3080/// call. Carries a human-readable `detail` per §5.1.1.
3081#[derive(Clone, Debug, PartialEq, Eq)]
3082pub struct CompositeValidationError {
3083    pub detail: String,
3084}
3085
3086impl CompositeValidationError {
3087    fn new(detail: impl Into<String>) -> Self {
3088        Self {
3089            detail: detail.into(),
3090        }
3091    }
3092}
3093
3094impl ResumeCondition {
3095    /// RFC-014 §10.3 builder — `AllOf` across N distinct waitpoints,
3096    /// each member a `Single { matcher: Wildcard }` leaf. Canonical
3097    /// Pattern 3 shape for heterogeneous-subsystem "all fired"
3098    /// semantics (e.g. `db-migration-complete` + `cache-warmed` +
3099    /// `feature-flag-set`).
3100    ///
3101    /// Callers that need per-waitpoint matchers should construct the
3102    /// tree directly via
3103    /// [`ResumeCondition::Composite(CompositeBody::all_of(..))`].
3104    pub fn all_of_waitpoints<I, S>(waitpoint_keys: I) -> Self
3105    where
3106        I: IntoIterator<Item = S>,
3107        S: Into<String>,
3108    {
3109        let members: Vec<ResumeCondition> = waitpoint_keys
3110            .into_iter()
3111            .map(|k| ResumeCondition::Single {
3112                waitpoint_key: k.into(),
3113                matcher: SignalMatcher::Wildcard,
3114            })
3115            .collect();
3116        ResumeCondition::Composite(CompositeBody::AllOf { members })
3117    }
3118
3119    /// Collect every distinct `waitpoint_key` the condition targets.
3120    /// Used at suspend-time to validate the condition's wp set against
3121    /// `SuspendArgs.waitpoints` (RFC-014 §5.1 multi-binding cross-
3122    /// check). Order follows tree DFS, de-duplicated preserving first
3123    /// occurrence.
3124    pub fn referenced_waitpoint_keys(&self) -> Vec<String> {
3125        let mut out: Vec<String> = Vec::new();
3126        let mut push = |k: &str| {
3127            if !out.iter().any(|e| e == k) {
3128                out.push(k.to_owned());
3129            }
3130        };
3131        fn walk(cond: &ResumeCondition, push: &mut dyn FnMut(&str)) {
3132            match cond {
3133                ResumeCondition::Single { waitpoint_key, .. } => push(waitpoint_key),
3134                ResumeCondition::Composite(body) => walk_body(body, push),
3135                _ => {}
3136            }
3137        }
3138        fn walk_body(body: &CompositeBody, push: &mut dyn FnMut(&str)) {
3139            match body {
3140                CompositeBody::AllOf { members } => {
3141                    for m in members {
3142                        walk(m, push);
3143                    }
3144                }
3145                CompositeBody::Count { waitpoints, .. } => {
3146                    for w in waitpoints {
3147                        push(w.as_str());
3148                    }
3149                }
3150            }
3151        }
3152        walk(self, &mut push);
3153        out
3154    }
3155
3156    /// Validate RFC-014 structural invariants on a composite condition.
3157    /// Single / OperatorOnly / TimeoutOnly return Ok — they carry no
3158    /// composite body. Checks cover:
3159    /// * `AllOf { members: [] }` — §5.1 `allof_empty_members`
3160    /// * `Count { n: 0 }` — §5.1 `count_n_zero`
3161    /// * `Count { waitpoints: [] }` — §5.1 `count_waitpoints_empty`
3162    /// * `Count { n > waitpoints.len(), DistinctWaitpoints }` — §5.1
3163    ///   `count_exceeds_waitpoint_set`
3164    /// * depth > [`MAX_COMPOSITE_DEPTH`] — §5.1 `condition_depth_exceeded`
3165    pub fn validate_composite(&self) -> Result<(), CompositeValidationError> {
3166        match self {
3167            ResumeCondition::Composite(body) => validate_body(body, 1, ""),
3168            _ => Ok(()),
3169        }
3170    }
3171}
3172
3173fn validate_body(
3174    body: &CompositeBody,
3175    depth: usize,
3176    path: &str,
3177) -> Result<(), CompositeValidationError> {
3178    if depth > MAX_COMPOSITE_DEPTH {
3179        return Err(CompositeValidationError::new(format!(
3180            "depth {} exceeds cap {} at path {}",
3181            depth,
3182            MAX_COMPOSITE_DEPTH,
3183            if path.is_empty() { "<root>" } else { path }
3184        )));
3185    }
3186    match body {
3187        CompositeBody::AllOf { members } => {
3188            if members.is_empty() {
3189                return Err(CompositeValidationError::new(format!(
3190                    "allof_empty_members at path {}",
3191                    if path.is_empty() { "<root>" } else { path }
3192                )));
3193            }
3194            for (i, m) in members.iter().enumerate() {
3195                let child_path = if path.is_empty() {
3196                    format!("members[{i}]")
3197                } else {
3198                    format!("{path}.members[{i}]")
3199                };
3200                if let ResumeCondition::Composite(inner) = m {
3201                    validate_body(inner, depth + 1, &child_path)?;
3202                }
3203                // Leaf `Single` / operator / timeout needs no further
3204                // structural checks — RFC-013 already constrains them.
3205            }
3206            Ok(())
3207        }
3208        CompositeBody::Count {
3209            n,
3210            count_kind,
3211            waitpoints,
3212            ..
3213        } => {
3214            if *n == 0 {
3215                return Err(CompositeValidationError::new(format!(
3216                    "count_n_zero at path {}",
3217                    if path.is_empty() { "<root>" } else { path }
3218                )));
3219            }
3220            if waitpoints.is_empty() {
3221                return Err(CompositeValidationError::new(format!(
3222                    "count_waitpoints_empty at path {}",
3223                    if path.is_empty() { "<root>" } else { path }
3224                )));
3225            }
3226            if matches!(count_kind, CountKind::DistinctWaitpoints)
3227                && (*n as usize) > waitpoints.len()
3228            {
3229                return Err(CompositeValidationError::new(format!(
3230                    "count_exceeds_waitpoint_set: n={} > waitpoints.len()={} at path {}",
3231                    n,
3232                    waitpoints.len(),
3233                    if path.is_empty() { "<root>" } else { path }
3234                )));
3235            }
3236            Ok(())
3237        }
3238    }
3239}
3240
3241/// Where a satisfied suspension routes back to.
3242///
3243/// v1 ships only [`ResumeTarget::Runnable`] — execution returns to
3244/// `runnable` and goes through normal scheduling.
3245#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3246#[non_exhaustive]
3247pub enum ResumeTarget {
3248    Runnable,
3249}
3250
3251/// Resume-side policy carried alongside [`ResumeCondition`].
3252///
3253/// RFC-013 §2.5 — what happens when the condition is satisfied. Fields
3254/// mirror the `resume_policy_json` the backend serializer writes to Lua
3255/// (RFC-004 §Resume policy fields).
3256#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3257#[non_exhaustive]
3258pub struct ResumePolicy {
3259    pub resume_target: ResumeTarget,
3260    pub consume_matched_signals: bool,
3261    pub retain_signal_buffer_until_closed: bool,
3262    #[serde(default, skip_serializing_if = "Option::is_none")]
3263    pub resume_delay_ms: Option<u64>,
3264    pub close_waitpoint_on_resume: bool,
3265}
3266
3267impl ResumePolicy {
3268    /// Canonical v1 defaults (RFC-013 §2.2.1):
3269    /// * `resume_target = Runnable`
3270    /// * `consume_matched_signals = true`
3271    /// * `retain_signal_buffer_until_closed = false`
3272    /// * `resume_delay_ms = None`
3273    /// * `close_waitpoint_on_resume = true`
3274    pub fn normal() -> Self {
3275        Self {
3276            resume_target: ResumeTarget::Runnable,
3277            consume_matched_signals: true,
3278            retain_signal_buffer_until_closed: false,
3279            resume_delay_ms: None,
3280            close_waitpoint_on_resume: true,
3281        }
3282    }
3283}
3284
3285/// Timeout behavior at the suspension deadline (RFC-004 §Timeout Behavior).
3286#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3287#[non_exhaustive]
3288pub enum TimeoutBehavior {
3289    Fail,
3290    Cancel,
3291    Expire,
3292    AutoResumeWithTimeoutSignal,
3293    /// v2 per RFC-004 Implementation Notes; enum slot present for
3294    /// additive RFC-014/RFC-015 landing.
3295    Escalate,
3296}
3297
3298impl TimeoutBehavior {
3299    /// Lua-side string encoding. Matches the wire values Lua's
3300    /// `ff_expire_suspension` matches on.
3301    pub fn as_wire_str(self) -> &'static str {
3302        match self {
3303            Self::Fail => "fail",
3304            Self::Cancel => "cancel",
3305            Self::Expire => "expire",
3306            Self::AutoResumeWithTimeoutSignal => "auto_resume_with_timeout_signal",
3307            Self::Escalate => "escalate",
3308        }
3309    }
3310}
3311
3312/// Reason category for a suspension (RFC-004 §Suspension Reason Categories).
3313#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3314#[non_exhaustive]
3315pub enum SuspensionReasonCode {
3316    WaitingForSignal,
3317    WaitingForApproval,
3318    WaitingForCallback,
3319    WaitingForToolResult,
3320    WaitingForOperatorReview,
3321    PausedByPolicy,
3322    PausedByBudget,
3323    StepBoundary,
3324    ManualPause,
3325}
3326
3327impl SuspensionReasonCode {
3328    pub fn as_wire_str(self) -> &'static str {
3329        match self {
3330            Self::WaitingForSignal => "waiting_for_signal",
3331            Self::WaitingForApproval => "waiting_for_approval",
3332            Self::WaitingForCallback => "waiting_for_callback",
3333            Self::WaitingForToolResult => "waiting_for_tool_result",
3334            Self::WaitingForOperatorReview => "waiting_for_operator_review",
3335            Self::PausedByPolicy => "paused_by_policy",
3336            Self::PausedByBudget => "paused_by_budget",
3337            Self::StepBoundary => "step_boundary",
3338            Self::ManualPause => "manual_pause",
3339        }
3340    }
3341}
3342
3343/// Who requested the suspension.
3344#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3345#[non_exhaustive]
3346pub enum SuspensionRequester {
3347    Worker,
3348    Operator,
3349    Policy,
3350    SystemTimeoutPolicy,
3351}
3352
3353impl SuspensionRequester {
3354    pub fn as_wire_str(self) -> &'static str {
3355        match self {
3356            Self::Worker => "worker",
3357            Self::Operator => "operator",
3358            Self::Policy => "policy",
3359            Self::SystemTimeoutPolicy => "system_timeout_policy",
3360        }
3361    }
3362}
3363
3364/// How the waitpoint resource backing a [`SuspendArgs`] is obtained.
3365///
3366/// RFC-013 §2.2 — `Fresh` mints a new waitpoint as part of `suspend`;
3367/// `UsePending` activates a waitpoint previously issued via
3368/// `EngineBackend::create_waitpoint`.
3369#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3370#[non_exhaustive]
3371pub enum WaitpointBinding {
3372    Fresh {
3373        waitpoint_id: WaitpointId,
3374        waitpoint_key: String,
3375    },
3376    UsePending {
3377        waitpoint_id: WaitpointId,
3378    },
3379}
3380
3381impl WaitpointBinding {
3382    /// Mint a fresh binding with a random `waitpoint_id` (UUID v4) and
3383    /// `waitpoint_key = "wpk:<uuid>"`.
3384    pub fn fresh() -> Self {
3385        let wp_id = WaitpointId::new();
3386        let key = format!("wpk:{wp_id}");
3387        Self::Fresh {
3388            waitpoint_id: wp_id,
3389            waitpoint_key: key,
3390        }
3391    }
3392
3393    /// Construct a `UsePending` binding from a pending waitpoint
3394    /// previously issued by `create_waitpoint`. The HMAC token is
3395    /// resolved Lua-side from the partition's waitpoint hash at
3396    /// `suspend` time (RFC-013 §5.1).
3397    pub fn use_pending(pending: &crate::backend::PendingWaitpoint) -> Self {
3398        Self::UsePending {
3399            waitpoint_id: pending.waitpoint_id.clone(),
3400        }
3401    }
3402}
3403
3404/// Trait-surface input to [`EngineBackend::suspend`] (RFC-013 §2.2 +
3405/// RFC-014 Pattern 3 widening).
3406///
3407/// Built via [`SuspendArgs::new`] + `with_*` setters; direct struct-
3408/// literal construction across crate boundaries is not possible
3409/// (`#[non_exhaustive]`).
3410///
3411/// ## Waitpoints
3412///
3413/// `waitpoints` is a non-empty `Vec<WaitpointBinding>`. The first entry
3414/// is the "primary" binding (accessible via [`primary`](Self::primary))
3415/// and carries the `current_waitpoint_id` written onto `exec_core` for
3416/// operator visibility. Additional entries land in Valkey as their own
3417/// waitpoint hashes / signal streams / HMAC tokens, enabling RFC-014
3418/// Pattern 3 `AllOf { members: [Single{wp1}, Single{wp2}, ...] }` across
3419/// distinct heterogeneous subsystems.
3420///
3421/// [`SuspendArgs::new`] takes exactly the primary binding; call
3422/// [`with_waitpoint`](Self::with_waitpoint) to append further bindings
3423/// (the RFC-014 builder API).
3424#[derive(Clone, Debug, Serialize, Deserialize)]
3425#[non_exhaustive]
3426pub struct SuspendArgs {
3427    pub suspension_id: SuspensionId,
3428    /// RFC-014 Pattern 3: all waitpoint bindings for this suspension.
3429    /// Guaranteed non-empty; `waitpoints[0]` is the primary.
3430    pub waitpoints: Vec<WaitpointBinding>,
3431    pub resume_condition: ResumeCondition,
3432    pub resume_policy: ResumePolicy,
3433    pub reason_code: SuspensionReasonCode,
3434    pub requested_by: SuspensionRequester,
3435    #[serde(default, skip_serializing_if = "Option::is_none")]
3436    pub timeout_at: Option<TimestampMs>,
3437    pub timeout_behavior: TimeoutBehavior,
3438    #[serde(default, skip_serializing_if = "Option::is_none")]
3439    pub continuation_metadata_pointer: Option<String>,
3440    pub now: TimestampMs,
3441    #[serde(default, skip_serializing_if = "Option::is_none")]
3442    pub idempotency_key: Option<IdempotencyKey>,
3443}
3444
3445impl SuspendArgs {
3446    /// Build a minimal `SuspendArgs` for a worker-originated suspension.
3447    ///
3448    /// `waitpoint` becomes the primary binding. Append additional
3449    /// bindings with [`with_waitpoint`](Self::with_waitpoint) (RFC-014
3450    /// Pattern 3) or replace the set with
3451    /// [`with_waitpoints`](Self::with_waitpoints).
3452    ///
3453    /// Defaults: `requested_by = Worker`, `timeout_at = None`,
3454    /// `timeout_behavior = Fail`, `continuation_metadata_pointer = None`,
3455    /// `idempotency_key = None`.
3456    pub fn new(
3457        suspension_id: SuspensionId,
3458        waitpoint: WaitpointBinding,
3459        resume_condition: ResumeCondition,
3460        resume_policy: ResumePolicy,
3461        reason_code: SuspensionReasonCode,
3462        now: TimestampMs,
3463    ) -> Self {
3464        Self {
3465            suspension_id,
3466            waitpoints: vec![waitpoint],
3467            resume_condition,
3468            resume_policy,
3469            reason_code,
3470            requested_by: SuspensionRequester::Worker,
3471            timeout_at: None,
3472            timeout_behavior: TimeoutBehavior::Fail,
3473            continuation_metadata_pointer: None,
3474            now,
3475            idempotency_key: None,
3476        }
3477    }
3478
3479    /// Primary binding — `waitpoints[0]`. Guaranteed present by
3480    /// construction.
3481    pub fn primary(&self) -> &WaitpointBinding {
3482        &self.waitpoints[0]
3483    }
3484
3485    pub fn with_timeout(mut self, at: TimestampMs, behavior: TimeoutBehavior) -> Self {
3486        self.timeout_at = Some(at);
3487        self.timeout_behavior = behavior;
3488        self
3489    }
3490
3491    pub fn with_requester(mut self, requester: SuspensionRequester) -> Self {
3492        self.requested_by = requester;
3493        self
3494    }
3495
3496    pub fn with_continuation_metadata_pointer(mut self, p: String) -> Self {
3497        self.continuation_metadata_pointer = Some(p);
3498        self
3499    }
3500
3501    pub fn with_idempotency_key(mut self, key: IdempotencyKey) -> Self {
3502        self.idempotency_key = Some(key);
3503        self
3504    }
3505
3506    /// RFC-014 Pattern 3 — append a further waitpoint binding to this
3507    /// suspension. Each additional binding yields its own waitpoint
3508    /// hash, signal stream, condition hash and HMAC token in Valkey,
3509    /// but all share the suspension record and composite evaluator
3510    /// under one `suspension:current`.
3511    ///
3512    /// Ordering: the primary (from [`SuspendArgs::new`]) stays at
3513    /// `waitpoints[0]`; subsequent `with_waitpoint` calls append at the
3514    /// tail.
3515    pub fn with_waitpoint(mut self, binding: WaitpointBinding) -> Self {
3516        self.waitpoints.push(binding);
3517        self
3518    }
3519
3520    /// RFC-014 Pattern 3 — replace the full binding vector in one call.
3521    /// Must be non-empty; an empty Vec is a programmer error and will
3522    /// be rejected by the backend's `validate_suspend_args` with
3523    /// `waitpoints_empty`.
3524    pub fn with_waitpoints(mut self, bindings: Vec<WaitpointBinding>) -> Self {
3525        self.waitpoints = bindings;
3526        self
3527    }
3528}
3529
3530/// Shared "what happened on the waitpoint" payload carried in both
3531/// [`SuspendOutcome`] variants.
3532///
3533/// For Pattern 3 (RFC-014) — multi-waitpoint suspensions — the primary
3534/// binding's identity lives at the top level (`waitpoint_id` /
3535/// `waitpoint_key` / `waitpoint_token`) and remaining bindings are
3536/// exposed via `additional_waitpoints`, each carrying its own minted
3537/// HMAC token so external signallers can deliver to any of the N
3538/// waitpoints the suspension is listening on.
3539#[derive(Clone, Debug, PartialEq, Eq)]
3540#[non_exhaustive]
3541pub struct SuspendOutcomeDetails {
3542    pub suspension_id: SuspensionId,
3543    pub waitpoint_id: WaitpointId,
3544    pub waitpoint_key: String,
3545    pub waitpoint_token: WaitpointHmac,
3546    /// RFC-014 Pattern 3 extras (beyond the primary). Empty for
3547    /// single-waitpoint suspensions (patterns 1 + 2); carries one
3548    /// entry per additional binding for Pattern 3.
3549    pub additional_waitpoints: Vec<AdditionalWaitpointBinding>,
3550}
3551
3552/// RFC-014 Pattern 3 — per-binding identity + HMAC token for
3553/// waitpoints beyond the primary. Structure mirrors the top-level
3554/// fields on [`SuspendOutcomeDetails`].
3555#[derive(Clone, Debug, PartialEq, Eq)]
3556#[non_exhaustive]
3557pub struct AdditionalWaitpointBinding {
3558    pub waitpoint_id: WaitpointId,
3559    pub waitpoint_key: String,
3560    pub waitpoint_token: WaitpointHmac,
3561}
3562
3563impl AdditionalWaitpointBinding {
3564    pub fn new(
3565        waitpoint_id: WaitpointId,
3566        waitpoint_key: String,
3567        waitpoint_token: WaitpointHmac,
3568    ) -> Self {
3569        Self {
3570            waitpoint_id,
3571            waitpoint_key,
3572            waitpoint_token,
3573        }
3574    }
3575}
3576
3577impl SuspendOutcomeDetails {
3578    pub fn new(
3579        suspension_id: SuspensionId,
3580        waitpoint_id: WaitpointId,
3581        waitpoint_key: String,
3582        waitpoint_token: WaitpointHmac,
3583    ) -> Self {
3584        Self {
3585            suspension_id,
3586            waitpoint_id,
3587            waitpoint_key,
3588            waitpoint_token,
3589            additional_waitpoints: Vec::new(),
3590        }
3591    }
3592
3593    /// Attach RFC-014 Pattern 3 additional-waitpoint bindings. The
3594    /// primary binding stays at the top-level fields; `extras` lands
3595    /// in [`additional_waitpoints`](Self::additional_waitpoints).
3596    pub fn with_additional_waitpoints(
3597        mut self,
3598        extras: Vec<AdditionalWaitpointBinding>,
3599    ) -> Self {
3600        self.additional_waitpoints = extras;
3601        self
3602    }
3603}
3604
3605/// Trait-surface output from [`EngineBackend::suspend`] (RFC-013 §2.3).
3606///
3607/// Two variants encode the "lease released" vs "lease retained" split.
3608/// See §2.3 for the runtime-enforcement semantics.
3609#[derive(Clone, Debug, PartialEq, Eq)]
3610#[non_exhaustive]
3611pub enum SuspendOutcome {
3612    /// The worker's pre-suspend handle is no longer lease-bearing; a
3613    /// fresh `HandleKind::Suspended` handle supersedes it.
3614    Suspended {
3615        details: SuspendOutcomeDetails,
3616        handle: crate::backend::Handle,
3617    },
3618    /// Buffered signals on a pending waitpoint already satisfied the
3619    /// condition at suspension time; the lease is retained and the
3620    /// caller's pre-suspend handle remains valid.
3621    AlreadySatisfied { details: SuspendOutcomeDetails },
3622}
3623
3624impl SuspendOutcome {
3625    /// Borrow the shared details regardless of variant.
3626    pub fn details(&self) -> &SuspendOutcomeDetails {
3627        match self {
3628            Self::Suspended { details, .. } => details,
3629            Self::AlreadySatisfied { details } => details,
3630        }
3631    }
3632}
3633
3634// `EngineBackend::suspend` type re-exports for `ff_core::backend::*`
3635// consumers. The `backend` module re-exports these below so external
3636// crates can reach them via the idiomatic `ff_core::backend` path that
3637// already sources the other trait-surface types (RFC-013 §9.1).
3638
3639#[cfg(test)]
3640mod rfc_014_validation_tests {
3641    use super::*;
3642
3643    fn single(wp: &str) -> ResumeCondition {
3644        ResumeCondition::Single {
3645            waitpoint_key: wp.to_owned(),
3646            matcher: SignalMatcher::ByName("x".to_owned()),
3647        }
3648    }
3649
3650    #[test]
3651    fn single_passes_validate() {
3652        assert!(single("wpk:a").validate_composite().is_ok());
3653    }
3654
3655    #[test]
3656    fn allof_empty_members_rejected() {
3657        let c = ResumeCondition::Composite(CompositeBody::AllOf { members: vec![] });
3658        let e = c.validate_composite().unwrap_err();
3659        assert!(e.detail.contains("allof_empty_members"), "{}", e.detail);
3660    }
3661
3662    #[test]
3663    fn count_n_zero_rejected() {
3664        let c = ResumeCondition::Composite(CompositeBody::Count {
3665            n: 0,
3666            count_kind: CountKind::DistinctWaitpoints,
3667            matcher: None,
3668            waitpoints: vec!["wpk:a".to_owned()],
3669        });
3670        let e = c.validate_composite().unwrap_err();
3671        assert!(e.detail.contains("count_n_zero"), "{}", e.detail);
3672    }
3673
3674    #[test]
3675    fn count_waitpoints_empty_rejected() {
3676        let c = ResumeCondition::Composite(CompositeBody::Count {
3677            n: 1,
3678            count_kind: CountKind::DistinctSources,
3679            matcher: None,
3680            waitpoints: vec![],
3681        });
3682        let e = c.validate_composite().unwrap_err();
3683        assert!(e.detail.contains("count_waitpoints_empty"), "{}", e.detail);
3684    }
3685
3686    #[test]
3687    fn count_exceeds_waitpoint_set_rejected_only_for_distinct_waitpoints() {
3688        // n=3, only 2 waitpoints, DistinctWaitpoints → reject.
3689        let c = ResumeCondition::Composite(CompositeBody::Count {
3690            n: 3,
3691            count_kind: CountKind::DistinctWaitpoints,
3692            matcher: None,
3693            waitpoints: vec!["a".into(), "b".into()],
3694        });
3695        let e = c.validate_composite().unwrap_err();
3696        assert!(e.detail.contains("count_exceeds_waitpoint_set"), "{}", e.detail);
3697
3698        // Same cardinality, DistinctSignals → allowed (no upper bound).
3699        let c2 = ResumeCondition::Composite(CompositeBody::Count {
3700            n: 3,
3701            count_kind: CountKind::DistinctSignals,
3702            matcher: None,
3703            waitpoints: vec!["a".into(), "b".into()],
3704        });
3705        assert!(c2.validate_composite().is_ok());
3706    }
3707
3708    #[test]
3709    fn depth_4_accepted_depth_5_rejected() {
3710        // Build Depth-4: AllOf { AllOf { AllOf { AllOf { Single } } } }
3711        let leaf = single("wpk:leaf");
3712        let d4 = ResumeCondition::Composite(CompositeBody::AllOf {
3713            members: vec![ResumeCondition::Composite(CompositeBody::AllOf {
3714                members: vec![ResumeCondition::Composite(CompositeBody::AllOf {
3715                    members: vec![ResumeCondition::Composite(CompositeBody::AllOf {
3716                        members: vec![leaf.clone()],
3717                    })],
3718                })],
3719            })],
3720        });
3721        assert!(d4.validate_composite().is_ok());
3722
3723        // Depth-5 → reject.
3724        let d5 = ResumeCondition::Composite(CompositeBody::AllOf {
3725            members: vec![d4],
3726        });
3727        let e = d5.validate_composite().unwrap_err();
3728        assert!(e.detail.contains("exceeds cap"), "{}", e.detail);
3729    }
3730}