Skip to main content

ff_core/
contracts.rs

//! Function contracts, grouped by phase — Args + Result types for each FCALL.
//!
//! Each Args struct defines the typed inputs to a Valkey Function.
//! Each Result enum lists the outcomes the call reports when it succeeds,
//! including no-op variants such as `AlreadySatisfied` or `Duplicate`.
5
6use crate::policy::ExecutionPolicy;
7use crate::state::{AttemptType, PublicState, StateVector};
8use crate::types::{
9    AttemptId, AttemptIndex, CancelSource, EdgeId, ExecutionId, FlowId, LaneId, LeaseEpoch,
10    LeaseFence, LeaseId, Namespace, SignalId, SuspensionId, TimestampMs, WaitpointId,
11    WaitpointToken, WorkerId, WorkerInstanceId,
12};
13use serde::{Deserialize, Serialize};
14use std::collections::{BTreeMap, BTreeSet, HashMap};
15
16// ─── create_execution ───
17
18#[derive(Clone, Debug, Serialize, Deserialize)]
19pub struct CreateExecutionArgs {
20    pub execution_id: ExecutionId,
21    pub namespace: Namespace,
22    pub lane_id: LaneId,
23    pub execution_kind: String,
24    pub input_payload: Vec<u8>,
25    #[serde(default)]
26    pub payload_encoding: Option<String>,
27    pub priority: i32,
28    pub creator_identity: String,
29    #[serde(default)]
30    pub idempotency_key: Option<String>,
31    #[serde(default)]
32    pub tags: HashMap<String, String>,
33    /// Execution policy (retry, timeout, suspension, routing, etc.).
34    #[serde(default)]
35    pub policy: Option<ExecutionPolicy>,
36    /// If set and in the future, execution starts delayed.
37    #[serde(default)]
38    pub delay_until: Option<TimestampMs>,
39    /// Absolute deadline timestamp (ms). Execution expires if not complete by this time.
40    #[serde(default)]
41    pub execution_deadline_at: Option<TimestampMs>,
42    /// Partition ID (pre-computed).
43    pub partition_id: u16,
44    pub now: TimestampMs,
45}
46
47#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
48pub enum CreateExecutionResult {
49    /// Execution created successfully.
50    Created {
51        execution_id: ExecutionId,
52        public_state: PublicState,
53    },
54    /// Idempotent duplicate — existing execution returned.
55    Duplicate { execution_id: ExecutionId },
56}
57
58// ─── issue_claim_grant ───
59
60#[derive(Clone, Debug, Serialize, Deserialize)]
61pub struct IssueClaimGrantArgs {
62    pub execution_id: ExecutionId,
63    pub lane_id: LaneId,
64    pub worker_id: WorkerId,
65    pub worker_instance_id: WorkerInstanceId,
66    #[serde(default)]
67    pub capability_hash: Option<String>,
68    #[serde(default)]
69    pub route_snapshot_json: Option<String>,
70    #[serde(default)]
71    pub admission_summary: Option<String>,
72    /// Capabilities this worker advertises. Serialized as a sorted,
73    /// comma-separated string to the Lua FCALL (see scheduling.lua
74    /// ff_issue_claim_grant). An empty set matches only executions whose
75    /// `required_capabilities` is also empty.
76    #[serde(default)]
77    pub worker_capabilities: BTreeSet<String>,
78    pub grant_ttl_ms: u64,
79    /// Caller-side timestamp for bookkeeping. NOT passed to the Lua FCALL —
80    /// ff_issue_claim_grant uses `redis.call("TIME")` for grant_expires_at.
81    pub now: TimestampMs,
82}
83
84#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
85pub enum IssueClaimGrantResult {
86    /// Grant issued.
87    Granted { execution_id: ExecutionId },
88}
89
90/// A claim grant issued by the scheduler for a specific execution.
91///
92/// The worker uses this to call `ff_claim_execution` (or
93/// `ff_acquire_lease`), which atomically consumes the grant and
94/// creates the lease.
95///
96/// Shared wire-level type between `ff-scheduler` (issuer) and
97/// `ff-sdk` (consumer, via `FlowFabricWorker::claim_from_grant`).
98/// Lives in `ff-core` so neither crate needs a dep on the other.
99///
100/// **Lane asymmetry with [`ReclaimGrant`]:** `ClaimGrant` does NOT
101/// carry `lane_id`. The issuing scheduler's caller already picked
102/// a lane (that's how admission reached this grant) and passes it
103/// through to `claim_from_grant` as a separate argument. The grant
104/// handle stays narrow to what uniquely identifies the admission
105/// decision. The matching field on [`ReclaimGrant`] is an
106/// intentional divergence — see the note on that type.
107#[derive(Clone, Debug, PartialEq, Eq)]
108pub struct ClaimGrant {
109    /// The execution that was granted.
110    pub execution_id: ExecutionId,
111    /// Opaque partition handle for this execution's hash-tag slot.
112    ///
113    /// Public wire type: consumers pass it back to FlowFabric but
114    /// must not parse the interior hash tag for routing decisions.
115    /// Internal consumers that need the typed
116    /// [`crate::partition::Partition`] call [`Self::partition`].
117    pub partition_key: crate::partition::PartitionKey,
118    /// The Valkey key holding the grant hash (for the worker to
119    /// reference).
120    pub grant_key: String,
121    /// When the grant expires if not consumed.
122    pub expires_at_ms: u64,
123}
124
125impl ClaimGrant {
126    /// Parse `partition_key` into a typed
127    /// [`crate::partition::Partition`]. Intended for internal
128    /// consumers (scheduler emitter, SDK worker claim path) that
129    /// need the family/index pair. Fails only on malformed keys
130    /// (which indicates a producer bug).
131    ///
132    /// Alias collapse applies: a grant issued against `Execution`
133    /// family round-trips to `Flow` (see [`crate::partition::PartitionKey`]
134    /// for the rationale — routing is preserved, only the metadata
135    /// family label normalises).
136    pub fn partition(
137        &self,
138    ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
139        self.partition_key.parse()
140    }
141}
142
143/// A reclaim grant issued for a resumed (attempt_interrupted) execution.
144///
145/// Issued by a producer (typically `ff-scheduler` once a Batch-C
146/// reclaim scanner is in place; test fixtures in the interim — no
147/// production Rust caller exists in-tree today). Consumed by
148/// [`FlowFabricWorker::claim_from_reclaim_grant`], which calls
149/// `ff_claim_resumed_execution` atomically: that FCALL validates the
150/// grant, consumes it, and transitions `attempt_interrupted` →
151/// `started` while preserving the existing `attempt_index` +
152/// `attempt_id` (a resumed execution re-uses its attempt; it does
153/// not start a new one).
154///
155/// Mirrors [`ClaimGrant`] for the resume path. Differences:
156///
157///   * [`ClaimGrant`] is issued against a freshly-eligible
158///     execution and `ff_claim_execution` creates a new attempt.
159///   * [`ReclaimGrant`] is issued against an `attempt_interrupted`
160///     execution; `ff_claim_resumed_execution` re-uses the existing
161///     attempt and bumps the lease epoch.
162///
163/// The grant itself is written to the same `claim_grant` Valkey key
164/// that [`ClaimGrant`] uses; the distinction is which Lua FCALL
165/// consumes it (`ff_claim_execution` for new attempts,
166/// `ff_claim_resumed_execution` for resumes).
167///
168/// **Lane asymmetry with [`ClaimGrant`]:** `ReclaimGrant` CARRIES
169/// `lane_id` as a field. The issuing path already knows the lane
170/// (it's read from `exec_core` at grant time); carrying it here
171/// spares the consumer a `HGET exec_core lane_id` round trip on
172/// the hot claim path. The asymmetry is intentional — prefer
173/// one-fewer-HGET on a type that already lives with the resumer's
174/// lifecycle over strict handle symmetry with `ClaimGrant`.
175///
176/// Shared wire-level type between the eventual `ff-scheduler`
177/// producer (Batch-C reclaim scanner — not yet in-tree; test
178/// fixtures construct this type today) and `ff-sdk` (consumer, via
179/// `FlowFabricWorker::claim_from_reclaim_grant`). Lives in
180/// `ff-core` so neither crate needs a dep on the other.
181///
182/// [`FlowFabricWorker::claim_from_reclaim_grant`]: https://docs.rs/ff-sdk
183#[derive(Clone, Debug, PartialEq, Eq)]
184pub struct ReclaimGrant {
185    /// The execution granted for resumption.
186    pub execution_id: ExecutionId,
187    /// Opaque partition handle for this execution's hash-tag slot.
188    ///
189    /// Same wire-opacity contract as [`ClaimGrant::partition_key`].
190    /// Internal consumers call [`Self::partition`] for the parsed
191    /// form.
192    pub partition_key: crate::partition::PartitionKey,
193    /// Valkey key of the grant hash — same key shape as
194    /// [`ClaimGrant`].
195    pub grant_key: String,
196    /// Monotonic ms when the grant expires; unconsumed grants
197    /// vanish.
198    pub expires_at_ms: u64,
199    /// Lane the execution belongs to. Needed by
200    /// `ff_claim_resumed_execution` for `KEYS[3]` (eligible_zset)
201    /// and `KEYS[9]` (active_index).
202    pub lane_id: LaneId,
203}
204
205impl ReclaimGrant {
206    /// Parse `partition_key` into a typed
207    /// [`crate::partition::Partition`]. See [`ClaimGrant::partition`]
208    /// for the alias-collapse note.
209    pub fn partition(
210        &self,
211    ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
212        self.partition_key.parse()
213    }
214}
215
216// ─── claim_execution ───
217
218#[derive(Clone, Debug, Serialize, Deserialize)]
219pub struct ClaimExecutionArgs {
220    pub execution_id: ExecutionId,
221    pub worker_id: WorkerId,
222    pub worker_instance_id: WorkerInstanceId,
223    pub lane_id: LaneId,
224    pub lease_id: LeaseId,
225    pub lease_ttl_ms: u64,
226    pub attempt_id: AttemptId,
227    /// Expected attempt index (pre-read from exec_core.total_attempt_count).
228    /// Used for KEYS construction — must match what the Lua computes.
229    pub expected_attempt_index: AttemptIndex,
230    /// JSON-encoded attempt policy snapshot.
231    #[serde(default)]
232    pub attempt_policy_json: String,
233    /// Per-attempt timeout in ms.
234    #[serde(default)]
235    pub attempt_timeout_ms: Option<u64>,
236    /// Total execution deadline (absolute timestamp ms).
237    #[serde(default)]
238    pub execution_deadline_at: Option<i64>,
239    pub now: TimestampMs,
240}
241
242#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
243pub struct ClaimedExecution {
244    pub execution_id: ExecutionId,
245    pub lease_id: LeaseId,
246    pub lease_epoch: LeaseEpoch,
247    pub attempt_index: AttemptIndex,
248    pub attempt_id: AttemptId,
249    pub attempt_type: AttemptType,
250    pub lease_expires_at: TimestampMs,
251}
252
253#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
254pub enum ClaimExecutionResult {
255    /// Successfully claimed.
256    Claimed(ClaimedExecution),
257}
258
259// ─── complete_execution ───
260
261#[derive(Clone, Debug, Serialize, Deserialize)]
262pub struct CompleteExecutionArgs {
263    pub execution_id: ExecutionId,
264    /// RFC #58.5 — fence triple. `Some` for SDK worker paths (standard
265    /// stale-lease fence). `None` for operator overrides, in which case
266    /// `source` must be `CancelSource::OperatorOverride` or the Lua
267    /// returns `fence_required`.
268    #[serde(default)]
269    pub fence: Option<LeaseFence>,
270    pub attempt_index: AttemptIndex,
271    #[serde(default)]
272    pub result_payload: Option<Vec<u8>>,
273    #[serde(default)]
274    pub result_encoding: Option<String>,
275    /// RFC #58.5 — unfenced-call gate. Ignored when `fence` is `Some`.
276    #[serde(default)]
277    pub source: CancelSource,
278    pub now: TimestampMs,
279}
280
281#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
282pub enum CompleteExecutionResult {
283    /// Execution completed successfully.
284    Completed {
285        execution_id: ExecutionId,
286        public_state: PublicState,
287    },
288}
289
290// ─── renew_lease ───
291
292#[derive(Clone, Debug, Serialize, Deserialize)]
293pub struct RenewLeaseArgs {
294    pub execution_id: ExecutionId,
295    pub attempt_index: AttemptIndex,
296    /// RFC #58.5 — fence triple. Required (no operator override path for
297    /// renew). `None` returns `fence_required`.
298    pub fence: Option<LeaseFence>,
299    /// How long to extend the lease (milliseconds).
300    pub lease_ttl_ms: u64,
301    /// Grace period after lease_expires_at before the lease_current key is auto-deleted.
302    #[serde(default = "default_lease_history_grace_ms")]
303    pub lease_history_grace_ms: u64,
304}
305
/// Serde default for `RenewLeaseArgs::lease_history_grace_ms`: one minute,
/// in milliseconds.
const fn default_lease_history_grace_ms() -> u64 {
    60 * 1_000
}
309
310#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
311pub enum RenewLeaseResult {
312    /// Lease renewed.
313    Renewed { expires_at: TimestampMs },
314}
315
316// ─── mark_lease_expired_if_due ───
317
318#[derive(Clone, Debug, Serialize, Deserialize)]
319pub struct MarkLeaseExpiredArgs {
320    pub execution_id: ExecutionId,
321}
322
323#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
324pub enum MarkLeaseExpiredResult {
325    /// Lease was marked as expired.
326    MarkedExpired,
327    /// No action needed (already expired, not yet due, not active, etc.).
328    AlreadySatisfied { reason: String },
329}
330
331// ─── cancel_execution ───
332
333#[derive(Clone, Debug, Serialize, Deserialize)]
334pub struct CancelExecutionArgs {
335    pub execution_id: ExecutionId,
336    pub reason: String,
337    #[serde(default)]
338    pub source: CancelSource,
339    /// Required if not operator_override and execution is active.
340    #[serde(default)]
341    pub lease_id: Option<LeaseId>,
342    #[serde(default)]
343    pub lease_epoch: Option<LeaseEpoch>,
344    /// Required if not operator_override and execution is active.
345    #[serde(default)]
346    pub attempt_id: Option<AttemptId>,
347    pub now: TimestampMs,
348}
349
350#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
351pub enum CancelExecutionResult {
352    /// Execution cancelled.
353    Cancelled {
354        execution_id: ExecutionId,
355        public_state: PublicState,
356    },
357}
358
359// ─── revoke_lease ───
360
361#[derive(Clone, Debug, Serialize, Deserialize)]
362pub struct RevokeLeaseArgs {
363    pub execution_id: ExecutionId,
364    /// If set, only revoke if this matches the current lease. Empty string skips check.
365    #[serde(default)]
366    pub expected_lease_id: Option<String>,
367    /// Worker instance whose lease set to clean up. Read from exec_core before calling.
368    pub worker_instance_id: WorkerInstanceId,
369    pub reason: String,
370}
371
372#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
373pub enum RevokeLeaseResult {
374    /// Lease revoked.
375    Revoked {
376        lease_id: String,
377        lease_epoch: String,
378    },
379    /// Already revoked or expired — no action needed.
380    AlreadySatisfied { reason: String },
381}
382
383// ─── delay_execution ───
384
385#[derive(Clone, Debug, Serialize, Deserialize)]
386pub struct DelayExecutionArgs {
387    pub execution_id: ExecutionId,
388    /// RFC #58.5 — fence triple. `None` requires `source ==
389    /// CancelSource::OperatorOverride`.
390    #[serde(default)]
391    pub fence: Option<LeaseFence>,
392    pub attempt_index: AttemptIndex,
393    pub delay_until: TimestampMs,
394    #[serde(default)]
395    pub source: CancelSource,
396    pub now: TimestampMs,
397}
398
399#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
400pub enum DelayExecutionResult {
401    /// Execution delayed.
402    Delayed {
403        execution_id: ExecutionId,
404        public_state: PublicState,
405    },
406}
407
408// ─── move_to_waiting_children ───
409
410#[derive(Clone, Debug, Serialize, Deserialize)]
411pub struct MoveToWaitingChildrenArgs {
412    pub execution_id: ExecutionId,
413    /// RFC #58.5 — fence triple. `None` requires `source ==
414    /// CancelSource::OperatorOverride`.
415    #[serde(default)]
416    pub fence: Option<LeaseFence>,
417    pub attempt_index: AttemptIndex,
418    #[serde(default)]
419    pub source: CancelSource,
420    pub now: TimestampMs,
421}
422
423#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
424pub enum MoveToWaitingChildrenResult {
425    /// Moved to waiting children.
426    Moved {
427        execution_id: ExecutionId,
428        public_state: PublicState,
429    },
430}
431
432// ─── change_priority ───
433
434#[derive(Clone, Debug, Serialize, Deserialize)]
435pub struct ChangePriorityArgs {
436    pub execution_id: ExecutionId,
437    pub new_priority: i32,
438    pub lane_id: LaneId,
439    pub now: TimestampMs,
440}
441
442#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
443pub enum ChangePriorityResult {
444    /// Priority changed and re-scored.
445    Changed { execution_id: ExecutionId },
446}
447
448// ─── update_progress ───
449
450#[derive(Clone, Debug, Serialize, Deserialize)]
451pub struct UpdateProgressArgs {
452    pub execution_id: ExecutionId,
453    pub lease_id: LeaseId,
454    pub lease_epoch: LeaseEpoch,
455    pub attempt_id: AttemptId,
456    #[serde(default)]
457    pub progress_pct: Option<u8>,
458    #[serde(default)]
459    pub progress_message: Option<String>,
460    pub now: TimestampMs,
461}
462
463#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
464pub enum UpdateProgressResult {
465    /// Progress updated.
466    Updated,
467}
468
469// ═══════════════════════════════════════════════════════════════════════
470// Phase 2 contracts: fail, reclaim, expire
471// ═══════════════════════════════════════════════════════════════════════
472
473// ─── fail_execution ───
474
475#[derive(Clone, Debug, Serialize, Deserialize)]
476pub struct FailExecutionArgs {
477    pub execution_id: ExecutionId,
478    /// RFC #58.5 — fence triple. `None` requires `source ==
479    /// CancelSource::OperatorOverride`.
480    #[serde(default)]
481    pub fence: Option<LeaseFence>,
482    pub attempt_index: AttemptIndex,
483    pub failure_reason: String,
484    pub failure_category: String,
485    /// JSON-encoded retry policy (from execution policy). Empty = no retries.
486    #[serde(default)]
487    pub retry_policy_json: String,
488    /// JSON-encoded attempt policy for the next retry attempt.
489    #[serde(default)]
490    pub next_attempt_policy_json: String,
491    #[serde(default)]
492    pub source: CancelSource,
493}
494
495/// Outcome of a fail_execution call.
496#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
497pub enum FailExecutionResult {
498    /// Retry was scheduled — execution is delayed with backoff.
499    RetryScheduled {
500        delay_until: TimestampMs,
501        next_attempt_index: AttemptIndex,
502    },
503    /// No retries left — execution is terminal failed.
504    TerminalFailed,
505}
506
507// ─── issue_reclaim_grant ───
508
509#[derive(Clone, Debug, Serialize, Deserialize)]
510pub struct IssueReclaimGrantArgs {
511    pub execution_id: ExecutionId,
512    pub worker_id: WorkerId,
513    pub worker_instance_id: WorkerInstanceId,
514    pub lane_id: LaneId,
515    #[serde(default)]
516    pub capability_hash: Option<String>,
517    pub grant_ttl_ms: u64,
518    #[serde(default)]
519    pub route_snapshot_json: Option<String>,
520    #[serde(default)]
521    pub admission_summary: Option<String>,
522    /// Caller-side timestamp for bookkeeping. NOT passed to the Lua FCALL —
523    /// ff_issue_reclaim_grant uses `redis.call("TIME")` for grant_expires_at
524    /// (same as ff_issue_claim_grant). Kept for contract symmetry with
525    /// IssueClaimGrantArgs and scheduler audit logging.
526    pub now: TimestampMs,
527}
528
529#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
530pub enum IssueReclaimGrantResult {
531    /// Reclaim grant issued.
532    Granted { expires_at_ms: TimestampMs },
533}
534
535// ─── reclaim_execution ───
536
537#[derive(Clone, Debug, Serialize, Deserialize)]
538pub struct ReclaimExecutionArgs {
539    pub execution_id: ExecutionId,
540    pub worker_id: WorkerId,
541    pub worker_instance_id: WorkerInstanceId,
542    pub lane_id: LaneId,
543    #[serde(default)]
544    pub capability_hash: Option<String>,
545    pub lease_id: LeaseId,
546    pub lease_ttl_ms: u64,
547    pub attempt_id: AttemptId,
548    /// JSON-encoded attempt policy for the reclaim attempt.
549    #[serde(default)]
550    pub attempt_policy_json: String,
551    /// Maximum reclaim count before terminal failure. Default: 100.
552    #[serde(default = "default_max_reclaim_count")]
553    pub max_reclaim_count: u32,
554    /// Old worker instance (for old_worker_leases key construction).
555    pub old_worker_instance_id: WorkerInstanceId,
556    /// Current attempt index (for old_attempt/old_stream_meta key construction).
557    pub current_attempt_index: AttemptIndex,
558}
559
/// Serde default for `ReclaimExecutionArgs::max_reclaim_count`.
const fn default_max_reclaim_count() -> u32 {
    100_u32
}
563
564#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
565pub enum ReclaimExecutionResult {
566    /// Execution reclaimed — new attempt + new lease.
567    Reclaimed {
568        new_attempt_index: AttemptIndex,
569        new_attempt_id: AttemptId,
570        new_lease_id: LeaseId,
571        new_lease_epoch: LeaseEpoch,
572        lease_expires_at: TimestampMs,
573    },
574    /// Max reclaims exceeded — execution moved to terminal.
575    MaxReclaimsExceeded,
576}
577
578// ─── expire_execution ───
579
580#[derive(Clone, Debug, Serialize, Deserialize)]
581pub struct ExpireExecutionArgs {
582    pub execution_id: ExecutionId,
583    /// "attempt_timeout" or "execution_deadline"
584    pub expire_reason: String,
585}
586
587#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
588pub enum ExpireExecutionResult {
589    /// Execution expired.
590    Expired { execution_id: ExecutionId },
591    /// Already terminal — no-op.
592    AlreadyTerminal,
593}
594
595// ═══════════════════════════════════════════════════════════════════════
596// Phase 3 contracts: suspend, signal, resume, waitpoint
597// ═══════════════════════════════════════════════════════════════════════
598
599// ─── suspend_execution ───
600
601#[derive(Clone, Debug, Serialize, Deserialize)]
602pub struct SuspendExecutionArgs {
603    pub execution_id: ExecutionId,
604    /// RFC #58.5 — fence triple. Required (no operator override path for
605    /// suspend). `None` returns `fence_required`.
606    pub fence: Option<LeaseFence>,
607    pub attempt_index: AttemptIndex,
608    pub suspension_id: SuspensionId,
609    pub waitpoint_id: WaitpointId,
610    pub waitpoint_key: String,
611    pub reason_code: String,
612    pub requested_by: String,
613    pub resume_condition_json: String,
614    pub resume_policy_json: String,
615    #[serde(default)]
616    pub continuation_metadata_pointer: Option<String>,
617    #[serde(default)]
618    pub timeout_at: Option<TimestampMs>,
619    /// true to activate a pending waitpoint, false to create new.
620    #[serde(default)]
621    pub use_pending_waitpoint: bool,
622    /// Timeout behavior: "fail", "cancel", "expire", "auto_resume", "escalate".
623    #[serde(default = "default_timeout_behavior")]
624    pub timeout_behavior: String,
625}
626
/// Serde default for `SuspendExecutionArgs::timeout_behavior`.
fn default_timeout_behavior() -> String {
    String::from("fail")
}
630
631#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
632pub enum SuspendExecutionResult {
633    /// Execution suspended, waitpoint active.
634    Suspended {
635        suspension_id: SuspensionId,
636        waitpoint_id: WaitpointId,
637        waitpoint_key: String,
638        /// HMAC-SHA1 token bound to (waitpoint_id, waitpoint_key, created_at).
639        /// Required by signal-delivery callers to authenticate against this
640        /// waitpoint (RFC-004 §Waitpoint Security).
641        waitpoint_token: WaitpointToken,
642    },
643    /// Buffered signals already satisfied the condition — suspension skipped.
644    /// Lease is still held. Token comes from the pending waitpoint record.
645    AlreadySatisfied {
646        suspension_id: SuspensionId,
647        waitpoint_id: WaitpointId,
648        waitpoint_key: String,
649        waitpoint_token: WaitpointToken,
650    },
651}
652
653// ─── resume_execution ───
654
655#[derive(Clone, Debug, Serialize, Deserialize)]
656pub struct ResumeExecutionArgs {
657    pub execution_id: ExecutionId,
658    /// "signal", "operator", "auto_resume"
659    #[serde(default = "default_trigger_type")]
660    pub trigger_type: String,
661    /// Optional delay before becoming eligible (ms).
662    #[serde(default)]
663    pub resume_delay_ms: u64,
664}
665
/// Serde default for `ResumeExecutionArgs::trigger_type`.
fn default_trigger_type() -> String {
    String::from("signal")
}
669
670#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
671pub enum ResumeExecutionResult {
672    /// Execution resumed to runnable.
673    Resumed { public_state: PublicState },
674}
675
676// ─── create_pending_waitpoint ───
677
678#[derive(Clone, Debug, Serialize, Deserialize)]
679pub struct CreatePendingWaitpointArgs {
680    pub execution_id: ExecutionId,
681    pub lease_id: LeaseId,
682    pub lease_epoch: LeaseEpoch,
683    pub attempt_index: AttemptIndex,
684    pub attempt_id: AttemptId,
685    pub waitpoint_id: WaitpointId,
686    pub waitpoint_key: String,
687    /// Short expiry for the pending waitpoint (ms).
688    pub expires_in_ms: u64,
689}
690
691#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
692pub enum CreatePendingWaitpointResult {
693    /// Pending waitpoint created.
694    Created {
695        waitpoint_id: WaitpointId,
696        waitpoint_key: String,
697        /// HMAC-SHA1 token bound to the pending waitpoint. Required for
698        /// `buffer_signal_for_pending_waitpoint` and carried forward when
699        /// the waitpoint is activated by `suspend_execution`.
700        waitpoint_token: WaitpointToken,
701    },
702}
703
704// ─── close_waitpoint ───
705
706#[derive(Clone, Debug, Serialize, Deserialize)]
707pub struct CloseWaitpointArgs {
708    pub waitpoint_id: WaitpointId,
709    pub reason: String,
710}
711
712#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
713pub enum CloseWaitpointResult {
714    /// Waitpoint closed.
715    Closed,
716}
717
718// ─── deliver_signal ───
719
720#[derive(Clone, Debug, Serialize, Deserialize)]
721pub struct DeliverSignalArgs {
722    pub execution_id: ExecutionId,
723    pub waitpoint_id: WaitpointId,
724    pub signal_id: SignalId,
725    pub signal_name: String,
726    pub signal_category: String,
727    pub source_type: String,
728    pub source_identity: String,
729    #[serde(default)]
730    pub payload: Option<Vec<u8>>,
731    #[serde(default)]
732    pub payload_encoding: Option<String>,
733    #[serde(default)]
734    pub correlation_id: Option<String>,
735    #[serde(default)]
736    pub idempotency_key: Option<String>,
737    pub target_scope: String,
738    #[serde(default)]
739    pub created_at: Option<TimestampMs>,
740    /// Dedup TTL for idempotency key (ms).
741    #[serde(default)]
742    pub dedup_ttl_ms: Option<u64>,
743    /// Resume delay after signal satisfaction (ms).
744    #[serde(default)]
745    pub resume_delay_ms: Option<u64>,
746    /// Max signals per execution (default 10000).
747    #[serde(default)]
748    pub max_signals_per_execution: Option<u64>,
749    /// MAXLEN for the waitpoint signal stream.
750    #[serde(default)]
751    pub signal_maxlen: Option<u64>,
752    /// HMAC-SHA1 token issued when the waitpoint was created. Required for
753    /// signal delivery; missing/tampered/rotated-past-grace tokens are
754    /// rejected with `invalid_token` or `token_expired` (RFC-004).
755    ///
756    /// Defense-in-depth: `WaitpointToken` is a transparent string newtype,
757    /// so an empty string deserializes successfully from JSON. The
758    /// validation boundary is in Lua (`validate_waitpoint_token` returns
759    /// `missing_token` on empty input); this type intentionally does NOT
760    /// pre-reject at the Rust layer so callers get a consistent typed
761    /// error regardless of how they constructed the args.
762    pub waitpoint_token: WaitpointToken,
763    pub now: TimestampMs,
764}
765
766#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
767pub enum DeliverSignalResult {
768    /// Signal accepted with the given effect.
769    Accepted { signal_id: SignalId, effect: String },
770    /// Duplicate signal (idempotency key matched).
771    Duplicate { existing_signal_id: SignalId },
772}
773
774// ─── buffer_signal_for_pending_waitpoint ───
775
776#[derive(Clone, Debug, Serialize, Deserialize)]
777pub struct BufferSignalArgs {
778    pub execution_id: ExecutionId,
779    pub waitpoint_id: WaitpointId,
780    pub signal_id: SignalId,
781    pub signal_name: String,
782    pub signal_category: String,
783    pub source_type: String,
784    pub source_identity: String,
785    #[serde(default)]
786    pub payload: Option<Vec<u8>>,
787    #[serde(default)]
788    pub payload_encoding: Option<String>,
789    #[serde(default)]
790    pub idempotency_key: Option<String>,
791    pub target_scope: String,
792    /// HMAC-SHA1 token issued when `create_pending_waitpoint` ran. Required
793    /// to authenticate early signals targeting the pending waitpoint.
794    pub waitpoint_token: WaitpointToken,
795    pub now: TimestampMs,
796}
797
798#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
799pub enum BufferSignalResult {
800    /// Signal buffered for pending waitpoint.
801    Buffered { signal_id: SignalId },
802    /// Duplicate signal.
803    Duplicate { existing_signal_id: SignalId },
804}
805
806// ─── list_pending_waitpoints ───
807
808/// One entry in the read-only view of an execution's active waitpoints.
809///
810/// Returned by `Server::list_pending_waitpoints` (and the
811/// `GET /v1/executions/{id}/pending-waitpoints` REST endpoint). The
812/// `waitpoint_token` is the same HMAC-SHA1 credential a suspending worker
813/// receives in `SuspendOutcome::Suspended` — a reviewer that needs to
814/// deliver a signal against this waitpoint must present it in
815/// `DeliverSignalArgs::waitpoint_token`.
816///
817/// Exposing the token here is a deliberate API gap closure: a
818/// human-in-the-loop reviewer has no other path to the token, since only
819/// the suspending worker sees the `SuspendOutcome`. Access is gated by
820/// the same bearer-auth middleware as every other REST endpoint.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PendingWaitpointInfo {
    /// Identifier of this waitpoint.
    pub waitpoint_id: WaitpointId,
    /// Key string recorded for this waitpoint.
    /// NOTE(review): whether this is a storage key or a caller-chosen
    /// correlation key is not visible here — confirm against the
    /// waitpoint-creation FCALL.
    pub waitpoint_key: String,
    /// Current waitpoint state: `pending`, `active`, `closed`. Callers
    /// typically filter to `pending` or `active`.
    ///
    /// Carried as a raw `String` (not an enum), so any state string
    /// deserializes without error.
    pub state: String,
    /// HMAC-SHA1 token minted at create time; required by
    /// `ff_deliver_signal` and `ff_buffer_signal_for_pending_waitpoint`.
    pub waitpoint_token: WaitpointToken,
    /// Signal names the resume condition is waiting for. Reviewers that
    /// need to drive a specific waitpoint — particularly when multiple
    /// concurrent waitpoints exist on one execution — filter on this to
    /// pick the right target.
    ///
    /// An EMPTY vec means the condition matches any signal (wildcard, per
    /// `lua/helpers.lua` `initialize_condition`). Callers must not infer
    /// "no waitpoint" from empty; check `state` / length of the outer
    /// list for that.
    #[serde(default)]
    pub required_signal_names: Vec<String>,
    /// Timestamp when the waitpoint record was first written.
    pub created_at: TimestampMs,
    /// Timestamp when the waitpoint was activated (suspension landed).
    /// `None` while the waitpoint is still `pending`. Omitted from the
    /// serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub activated_at: Option<TimestampMs>,
    /// Scheduled expiration timestamp. `None` if no timeout configured.
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub expires_at: Option<TimestampMs>,
}
852
853// ─── expire_suspension ───
854
/// Inputs to the `expire_suspension` FCALL.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExpireSuspensionArgs {
    /// Execution whose suspension is to be expired.
    pub execution_id: ExecutionId,
}

/// Outcomes of the `expire_suspension` FCALL.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ExpireSuspensionResult {
    /// Suspension expired with the given behavior applied.
    /// NOTE(review): the set of `behavior_applied` values is defined on the
    /// script side and is not enumerated here.
    Expired { behavior_applied: String },
    /// Already resolved — no action needed. `reason` is a free-form
    /// explanation string.
    AlreadySatisfied { reason: String },
}
867
868// ─── claim_resumed_execution ───
869
/// Inputs to the `claim_resumed_execution` FCALL.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ClaimResumedExecutionArgs {
    pub execution_id: ExecutionId,
    pub worker_id: WorkerId,
    pub worker_instance_id: WorkerInstanceId,
    pub lane_id: LaneId,
    pub lease_id: LeaseId,
    /// Lease time-to-live, in milliseconds.
    pub lease_ttl_ms: u64,
    /// Current attempt index (for KEYS construction — from exec_core).
    pub current_attempt_index: AttemptIndex,
    /// Remaining attempt timeout from before suspension (ms). 0 = no timeout.
    ///
    /// NOTE(review): because this is an `Option`, both `None` and `Some(0)`
    /// are representable; this type does not say whether the FCALL wrapper
    /// treats them identically — confirm.
    #[serde(default)]
    pub remaining_attempt_timeout_ms: Option<u64>,
    pub now: TimestampMs,
}

/// Lease and attempt identity returned by a successful
/// `claim_resumed_execution` (see [`ClaimResumedExecutionResult::Claimed`]).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ClaimedResumedExecution {
    pub execution_id: ExecutionId,
    pub lease_id: LeaseId,
    pub lease_epoch: LeaseEpoch,
    pub attempt_index: AttemptIndex,
    pub attempt_id: AttemptId,
    pub lease_expires_at: TimestampMs,
}

/// Outcomes of the `claim_resumed_execution` FCALL.
///
/// Only the success case is modelled as a variant.
/// NOTE(review): failures presumably surface as errors from the FCALL
/// wrapper rather than as variants here — confirm.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ClaimResumedExecutionResult {
    /// Successfully claimed resumed execution (same attempt continues).
    Claimed(ClaimedResumedExecution),
}
901
902// ═══════════════════════════════════════════════════════════════════════
903// Phase 4 contracts: stream
904// ═══════════════════════════════════════════════════════════════════════
905
906// ─── append_frame ───
907
/// Inputs to the `append_frame` FCALL, which appends one frame to an
/// attempt-scoped stream.
///
/// NOTE(review): `lease_id` / `lease_epoch` / `attempt_id` presumably fence
/// the append against the writer's current lease — confirm in the Lua.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AppendFrameArgs {
    pub execution_id: ExecutionId,
    pub attempt_index: AttemptIndex,
    pub lease_id: LeaseId,
    pub lease_epoch: LeaseEpoch,
    pub attempt_id: AttemptId,
    pub frame_type: String,
    pub timestamp: TimestampMs,
    /// Raw frame payload bytes.
    pub payload: Vec<u8>,
    /// Optional label describing how `payload` is encoded.
    #[serde(default)]
    pub encoding: Option<String>,
    /// Optional structured metadata for the frame (JSON blob).
    #[serde(default)]
    pub metadata_json: Option<String>,
    #[serde(default)]
    pub correlation_id: Option<String>,
    #[serde(default)]
    pub source: Option<String>,
    /// MAXLEN for the stream. 0 = no trim.
    #[serde(default)]
    pub retention_maxlen: Option<u32>,
    /// Max payload bytes per frame. Default: 65536.
    #[serde(default)]
    pub max_payload_bytes: Option<u32>,
}

/// Outcome of the `append_frame` FCALL.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AppendFrameResult {
    /// Frame appended successfully.
    Appended {
        /// Valkey Stream entry ID (e.g. "1713100800150-0").
        entry_id: String,
        /// Total frame count after this append.
        frame_count: u64,
    },
}
945
946// ─── StreamCursor (issue #92) ───
947
/// Opaque position within an attempt stream, used by reads and tails.
///
/// Supersedes the raw `&str` / `String` stream-id parameters that
/// `read_stream` / `tail_stream` / `ReadStreamParams` /
/// `TailStreamParams` used to take. On the wire a cursor is a plain
/// string: serialization writes the [`std::fmt::Display`] form and
/// deserialization goes through `TryFrom<String>`. REST queries such
/// as `?from=start&to=end` and `?after=123-0` therefore keep working.
///
/// # Public wire grammar
///
/// Exactly these tokens are accepted:
///
/// * `"start"` — the first entry of the stream (what XRANGE spells `-`).
///   Accepted by `read_stream` / `ReadStreamParams`.
/// * `"end"` — the latest entry of the stream (what XRANGE spells `+`).
///   Accepted by `read_stream` / `ReadStreamParams`.
/// * `"<ms>"` or `"<ms>-<seq>"` — a concrete Valkey Stream entry id.
///   Accepted everywhere.
///
/// The raw XRANGE/XREAD markers `"-"` and `"+"` are **rejected** on the
/// wire. They are an implementation detail, not part of the public
/// contract. They only appear inside the Lua-facing [`ReadFramesArgs`] /
/// `xread_block` path, after lowering with [`StreamCursor::to_wire`].
///
/// For XREAD (tail), "read from the very beginning" is spelled
/// `StreamCursor::At("0-0".into())`; [`StreamCursor::from_beginning`]
/// builds exactly that value. The SDK's `tail_stream` boundary refuses
/// `Start` / `End` because XREAD has no `-` / `+` cursor form.
/// [`StreamCursor::is_concrete`] centralises that Start/End-vs-At
/// decision for boundary validation.
///
/// # Why an enum rather than a string
///
/// With a bare string, a malformed id travels all the way to Lua/Valkey
/// and comes back as a script error (HTTP 500). Parsing into this enum
/// through the fallible `FromStr` / `TryFrom<String>` impls rejects bad
/// input at the wire boundary with a structured error. It also keeps
/// bare `-` / `+` from creeping into consumer code as unofficial
/// extensions of the public API.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum StreamCursor {
    /// Open lower bound: the first entry of the stream.
    Start,
    /// Open upper bound: the latest entry of the stream.
    End,
    /// A concrete Valkey Stream entry id (`<ms>` or `<ms>-<seq>`).
    ///
    /// XREAD-style tails that want every entry use `At("0-0".to_owned())`;
    /// see [`StreamCursor::from_beginning`].
    At(String),
}

impl StreamCursor {
    /// Builds the XREAD "from the beginning" cursor, `At("0-0")`.
    ///
    /// XREAD treats `last_id` as exclusive, so using this as the `after`
    /// cursor yields every entry in the stream.
    pub fn from_beginning() -> Self {
        StreamCursor::At(String::from("0-0"))
    }

    /// Returns [`StreamCursor::Start`]; intended for
    /// `#[serde(default = "StreamCursor::start")]` on REST query structs.
    pub fn start() -> Self {
        StreamCursor::Start
    }

    /// Returns [`StreamCursor::End`]; a serde default helper.
    pub fn end() -> Self {
        StreamCursor::End
    }

    /// Returns [`StreamCursor::from_beginning`]; the serde default used for
    /// `TailStreamParams::after`.
    pub fn beginning() -> Self {
        Self::from_beginning()
    }

    /// Internal-only: borrow the Valkey XRANGE/XREAD spelling of this
    /// cursor (`Start` is `"-"`, `End` is `"+"`, `At(id)` is `id`).
    ///
    /// Meant for the ff-script adapter edge, just before building
    /// `ReadFramesArgs` or calling `xread_block`. The raw markers are not
    /// part of the public wire and must not be shown to consumers, which is
    /// why this is hidden from generated docs.
    #[doc(hidden)]
    pub fn to_wire(&self) -> &str {
        match self {
            StreamCursor::At(id) => id.as_str(),
            StreamCursor::Start => "-",
            StreamCursor::End => "+",
        }
    }

    /// Internal-only owning counterpart of [`Self::to_wire`].
    ///
    /// For `At(id)` the existing `String` is handed back as-is (no clone).
    /// Use it when building an owned wire string such as
    /// `ReadFramesArgs.from_id` from a cursor that is about to be dropped.
    #[doc(hidden)]
    pub fn into_wire_string(self) -> String {
        match self {
            StreamCursor::At(id) => id,
            open_marker => open_marker.to_wire().to_owned(),
        }
    }

    /// Reports whether this cursor names a concrete entry id (`"<ms>"` /
    /// `"<ms>-<seq>"`), as opposed to one of the open markers `Start` /
    /// `End`.
    ///
    /// Boundaries that cannot accept open markers, such as XREAD tailing,
    /// reject exactly the cursors for which this returns `false`. Keeping
    /// the rule here keeps the SDK and REST guards in lock-step.
    pub fn is_concrete(&self) -> bool {
        !matches!(self, StreamCursor::Start | StreamCursor::End)
    }
}

impl std::fmt::Display for StreamCursor {
    /// Writes the public wire form: `start`, `end`, or the raw entry id.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            StreamCursor::Start => "start",
            StreamCursor::End => "end",
            StreamCursor::At(id) => id.as_str(),
        };
        f.write_str(text)
    }
}
1085
/// Reason a string could not be parsed into a [`StreamCursor`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StreamCursorParseError {
    /// The input was the empty string.
    Empty,
    /// The input was one of the raw Valkey markers (`"-"`, `"+"`).
    ///
    /// The public wire only accepts `"start"` / `"end"`; the bare
    /// markers are internal-only.
    BareMarkerRejected(String),
    /// The input was neither a keyword nor a well-formed Stream entry id.
    /// Entry ids must match `^\d+(?:-\d+)?$`.
    Malformed(String),
}

impl std::fmt::Display for StreamCursorParseError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            StreamCursorParseError::BareMarkerRejected(marker) => write!(
                f,
                "bare marker '{marker}' is not a valid stream cursor; use 'start' or 'end'"
            ),
            StreamCursorParseError::Malformed(input) => write!(
                f,
                "invalid stream cursor '{input}' (expected 'start', 'end', '<ms>', or '<ms>-<seq>')"
            ),
            StreamCursorParseError::Empty => f.write_str("stream cursor must not be empty"),
        }
    }
}

impl std::error::Error for StreamCursorParseError {}
1117
/// Shared grammar check — classifies `s` as `Start` / `End` / a
/// concrete-id shape / malformed / empty, WITHOUT allocating. The
/// owned vs borrowed entry points ([`StreamCursor::from_str`],
/// [`StreamCursor::try_from`]) consume this classification and move
/// the owned `String` into `At` when applicable, avoiding a
/// round-trip `String → &str → String::to_owned` for the common
/// REST-query path.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum StreamCursorClass {
    /// The `"start"` keyword.
    Start,
    /// The `"end"` keyword.
    End,
    /// `<ms>` or `<ms>-<seq>`, each part a decimal that fits in `u64`.
    Concrete,
    /// A raw Valkey marker (`"-"` / `"+"`), rejected on the public wire.
    BareMarker,
    /// The empty string.
    Empty,
    /// Anything else.
    Malformed,
}

/// Classify `s` against the public `StreamCursor` grammar.
///
/// Keywords are matched case-sensitively. A concrete id is `<ms>` or
/// `<ms>-<seq>` where each part is a non-empty run of ASCII digits whose
/// value fits in a `u64`. Valkey stores both halves of a stream entry id
/// as unsigned 64-bit integers and rejects larger values. Without the
/// range check such ids would pass this gate and fail later as a script
/// error, which is exactly the failure mode this grammar exists to
/// prevent.
fn classify_stream_cursor(s: &str) -> StreamCursorClass {
    /// True iff `part` is a non-empty ASCII-digit run that fits in `u64`.
    ///
    /// The explicit digit check is required: `u64::from_str` on its own
    /// also accepts a leading `+`, which is not part of the id grammar.
    fn is_u64_component(part: &str) -> bool {
        !part.is_empty()
            && part.bytes().all(|b| b.is_ascii_digit())
            && part.parse::<u64>().is_ok()
    }

    match s {
        "" => StreamCursorClass::Empty,
        "-" | "+" => StreamCursorClass::BareMarker,
        "start" => StreamCursorClass::Start,
        "end" => StreamCursorClass::End,
        _ => {
            // `split_once` splits on the FIRST '-', so any extra '-' lands
            // in the sequence part and fails the digit check there.
            let well_formed = match s.split_once('-') {
                Some((ms, seq)) => is_u64_component(ms) && is_u64_component(seq),
                None => is_u64_component(s),
            };
            if well_formed {
                StreamCursorClass::Concrete
            } else {
                StreamCursorClass::Malformed
            }
        }
    }
}
1164
1165impl std::str::FromStr for StreamCursor {
1166    type Err = StreamCursorParseError;
1167
1168    fn from_str(s: &str) -> Result<Self, Self::Err> {
1169        match classify_stream_cursor(s) {
1170            StreamCursorClass::Start => Ok(Self::Start),
1171            StreamCursorClass::End => Ok(Self::End),
1172            StreamCursorClass::Concrete => Ok(Self::At(s.to_owned())),
1173            StreamCursorClass::BareMarker => {
1174                Err(StreamCursorParseError::BareMarkerRejected(s.to_owned()))
1175            }
1176            StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
1177            StreamCursorClass::Malformed => {
1178                Err(StreamCursorParseError::Malformed(s.to_owned()))
1179            }
1180        }
1181    }
1182}
1183
1184impl TryFrom<String> for StreamCursor {
1185    type Error = StreamCursorParseError;
1186
1187    fn try_from(s: String) -> Result<Self, Self::Error> {
1188        // Owned parsing path — the `At` variant moves `s` in directly,
1189        // avoiding the `&str → String::to_owned` re-allocation that a
1190        // blind forward to `FromStr::from_str(&s)` would force. Error
1191        // paths still pay one allocation to describe the offending
1192        // input.
1193        match classify_stream_cursor(&s) {
1194            StreamCursorClass::Start => Ok(Self::Start),
1195            StreamCursorClass::End => Ok(Self::End),
1196            StreamCursorClass::Concrete => Ok(Self::At(s)),
1197            StreamCursorClass::BareMarker => {
1198                Err(StreamCursorParseError::BareMarkerRejected(s))
1199            }
1200            StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
1201            StreamCursorClass::Malformed => Err(StreamCursorParseError::Malformed(s)),
1202        }
1203    }
1204}
1205
1206impl From<StreamCursor> for String {
1207    fn from(c: StreamCursor) -> Self {
1208        c.to_string()
1209    }
1210}
1211
/// Serializes a cursor as its public wire string (`"start"`, `"end"`, or
/// the entry id). The `Display` output is streamed into the serializer,
/// so the raw Valkey `-` / `+` markers never appear in serialized data.
impl Serialize for StreamCursor {
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        serializer.collect_str(self)
    }
}

/// Deserializes a cursor from a string using the `TryFrom<String>` grammar.
/// Grammar violations surface as a serde custom error carrying the
/// [`StreamCursorParseError`] message.
impl<'de> Deserialize<'de> for StreamCursor {
    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        // Decode into an owned `String` so `try_from` can move it straight
        // into `At` on success.
        let s = String::deserialize(deserializer)?;
        Self::try_from(s).map_err(serde::de::Error::custom)
    }
}
1224
1225// ─── read_attempt_stream / tail_attempt_stream ───
1226
/// Hard cap on the number of frames returned by a single read/tail call.
///
/// Single source of truth across the Rust layer (ff-script, ff-server,
/// ff-sdk). The Lua side in `lua/stream.lua` keeps a matching literal with
/// an inline reference back here; bump both together if you ever need to
/// lift the cap.
///
/// This is the upper bound for `ReadFramesArgs::count_limit`.
pub const STREAM_READ_HARD_CAP: u64 = 10_000;
1234
/// A single frame read from an attempt-scoped stream.
///
/// Field set mirrors what `ff_append_frame` writes: `frame_type`, `ts`,
/// `payload`, `encoding`, `source`, and optionally `correlation_id`. Stored
/// as an ordered map so field order is deterministic across read calls.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamFrame {
    /// Valkey Stream entry ID, e.g. "1713100800150-0".
    pub id: String,
    /// Frame fields in sorted order, keyed by field name. All values are
    /// carried as strings, including the payload.
    /// NOTE(review): how a binary payload is represented as a `String` is
    /// decided by the read path — confirm against `encoding`.
    pub fields: std::collections::BTreeMap<String, String>,
}
1247
/// Inputs to `ff_read_attempt_stream` (XRANGE wrapper).
///
/// Unlike the public [`StreamCursor`], the id fields here carry the raw
/// Valkey form, including the `-` / `+` markers. Build them from a cursor
/// with `StreamCursor::to_wire` / `StreamCursor::into_wire_string`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReadFramesArgs {
    pub execution_id: ExecutionId,
    pub attempt_index: AttemptIndex,
    /// XRANGE start ID. Use "-" for earliest.
    pub from_id: String,
    /// XRANGE end ID. Use "+" for latest.
    pub to_id: String,
    /// XRANGE COUNT limit. MUST be `>= 1`. The REST and SDK layers reject
    /// `0` at the boundary; the Lua side rejects it too. `STREAM_READ_HARD_CAP`
    /// is the upper bound.
    pub count_limit: u64,
}
1262
/// Result of reading frames from an attempt stream — frames plus terminal
/// signal so consumers can stop polling without a timeout fallback.
///
/// Both `closed_*` fields are omitted from the serialized form while the
/// stream is open.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamFrames {
    /// Entries in the requested range (possibly empty).
    pub frames: Vec<StreamFrame>,
    /// Timestamp when the upstream writer closed the stream. `None` if the
    /// stream is still open (or has never been written).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub closed_at: Option<TimestampMs>,
    /// Reason from the closing writer. Current values:
    /// `attempt_success`, `attempt_failure`, `attempt_cancelled`,
    /// `attempt_interrupted`. `None` iff the stream is still open.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub closed_reason: Option<String>,
}
1279
1280impl StreamFrames {
1281    /// Construct an empty open-stream result (no frames, no terminal
1282    /// markers). Useful for fast-path peek helpers.
1283    pub fn empty_open() -> Self {
1284        Self {
1285            frames: Vec::new(),
1286            closed_at: None,
1287            closed_reason: None,
1288        }
1289    }
1290
1291    /// True iff the producer has closed this stream. Consumers should stop
1292    /// polling and drain once this returns true.
1293    pub fn is_closed(&self) -> bool {
1294        self.closed_at.is_some()
1295    }
1296}
1297
/// Outcome of `ff_read_attempt_stream` (see [`ReadFramesArgs`]).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReadFramesResult {
    /// Frames returned (possibly empty) plus optional closed markers.
    Frames(StreamFrames),
}
1303
1304// ═══════════════════════════════════════════════════════════════════════
1305// Phase 5 contracts: budget, quota, block/unblock
1306// ═══════════════════════════════════════════════════════════════════════
1307
1308// ─── create_budget ───
1309
/// Inputs to the `create_budget` FCALL.
///
/// `dimensions`, `hard_limits` and `soft_limits` are parallel vectors:
/// entry `i` of each limit vector applies to `dimensions[i]`.
/// NOTE(review): whether mismatched lengths are rejected is not visible
/// here — confirm in the FCALL wrapper / Lua.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateBudgetArgs {
    pub budget_id: crate::types::BudgetId,
    pub scope_type: String,
    pub scope_id: String,
    pub enforcement_mode: String,
    pub on_hard_limit: String,
    pub on_soft_limit: String,
    /// Reset interval, in milliseconds.
    pub reset_interval_ms: u64,
    /// Dimension names.
    pub dimensions: Vec<String>,
    /// Hard limits per dimension (parallel with dimensions).
    pub hard_limits: Vec<u64>,
    /// Soft limits per dimension (parallel with dimensions).
    pub soft_limits: Vec<u64>,
    pub now: TimestampMs,
}

/// Outcomes of the `create_budget` FCALL.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateBudgetResult {
    /// Budget created.
    Created { budget_id: crate::types::BudgetId },
    /// Already exists (idempotent).
    AlreadySatisfied { budget_id: crate::types::BudgetId },
}
1335
1336// ─── create_quota_policy ───
1337
/// Inputs to the `create_quota_policy` FCALL.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateQuotaPolicyArgs {
    pub quota_policy_id: crate::types::QuotaPolicyId,
    /// Window length, in seconds.
    pub window_seconds: u64,
    /// Maximum number of requests allowed per window.
    pub max_requests_per_window: u64,
    /// Maximum number of concurrent admissions.
    pub max_concurrent: u64,
    pub now: TimestampMs,
}

/// Outcomes of the `create_quota_policy` FCALL.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateQuotaPolicyResult {
    /// Quota policy created.
    Created {
        quota_policy_id: crate::types::QuotaPolicyId,
    },
    /// Already exists (idempotent).
    AlreadySatisfied {
        quota_policy_id: crate::types::QuotaPolicyId,
    },
}
1358
1359// ─── budget_status (read-only) ───
1360
/// Operator-facing budget status snapshot (not an FCALL — direct HGETALL reads).
///
/// Timestamp-like fields (`last_breach_at`, `next_reset_at`, `created_at`)
/// are carried as raw hash-field strings rather than parsed types, and are
/// `None` when the field is absent.
/// NOTE(review): presumably ms-since-epoch strings — confirm against the
/// writer.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BudgetStatus {
    pub budget_id: String,
    pub scope_type: String,
    pub scope_id: String,
    pub enforcement_mode: String,
    /// Current usage per dimension: {dimension_name: current_value}.
    pub usage: HashMap<String, u64>,
    /// Hard limits per dimension: {dimension_name: limit}.
    pub hard_limits: HashMap<String, u64>,
    /// Soft limits per dimension: {dimension_name: limit}.
    pub soft_limits: HashMap<String, u64>,
    /// Number of recorded hard-limit breaches.
    pub breach_count: u64,
    /// Number of recorded soft-limit breaches.
    pub soft_breach_count: u64,
    pub last_breach_at: Option<String>,
    /// Dimension name of the most recent breach.
    pub last_breach_dim: Option<String>,
    pub next_reset_at: Option<String>,
    pub created_at: Option<String>,
}
1381
1382// ─── report_usage_and_check ───
1383
/// Inputs to the `report_usage_and_check` FCALL.
///
/// `dimensions` and `deltas` are parallel vectors: `deltas[i]` is added to
/// `dimensions[i]`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReportUsageArgs {
    /// Dimension names to increment.
    pub dimensions: Vec<String>,
    /// Increment values (parallel with dimensions).
    pub deltas: Vec<u64>,
    pub now: TimestampMs,
    /// Optional idempotency key to prevent double-counting on retries.
    /// Pass the raw dedup id (e.g. `"retry-42"`); the typed FCALL wrapper
    /// wraps it into `ff:usagededup:{b:M}:<id>` using the budget
    /// partition's hash tag so it co-locates with the other budget keys
    /// (#108).
    #[serde(default)]
    pub dedup_key: Option<String>,
}

/// Outcomes of the `report_usage_and_check` FCALL.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReportUsageResult {
    /// All increments applied, no breach.
    Ok,
    /// Soft limit breached on a dimension (advisory, increments applied).
    SoftBreach {
        dimension: String,
        current_usage: u64,
        soft_limit: u64,
    },
    /// Hard limit breached (increments NOT applied).
    HardBreach {
        dimension: String,
        current_usage: u64,
        hard_limit: u64,
    },
    /// Dedup key matched — usage already applied in a prior call.
    AlreadyApplied,
}
1419
1420// ─── reset_budget ───
1421
/// Inputs to the `reset_budget` FCALL.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResetBudgetArgs {
    pub budget_id: crate::types::BudgetId,
    pub now: TimestampMs,
}

/// Outcome of the `reset_budget` FCALL.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResetBudgetResult {
    /// Budget reset successfully; `next_reset_at` is the next scheduled
    /// reset time.
    Reset { next_reset_at: TimestampMs },
}
1433
1434// ─── check_admission_and_record ───
1435
/// Inputs to the `check_admission_and_record` FCALL.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CheckAdmissionArgs {
    pub execution_id: ExecutionId,
    pub now: TimestampMs,
    /// Rate window length, in seconds.
    pub window_seconds: u64,
    /// Maximum admissions allowed per window.
    pub rate_limit: u64,
    /// Maximum concurrent admissions.
    pub concurrency_cap: u64,
    /// Optional jitter in milliseconds.
    /// NOTE(review): presumably applied to `RateExceeded::retry_after_ms`
    /// — confirm in the Lua.
    #[serde(default)]
    pub jitter_ms: Option<u64>,
}

/// Outcomes of the `check_admission_and_record` FCALL.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CheckAdmissionResult {
    /// Admitted — execution may proceed.
    Admitted,
    /// Already admitted in this window (idempotent).
    AlreadyAdmitted,
    /// Rate limit exceeded; retry after `retry_after_ms` milliseconds.
    RateExceeded { retry_after_ms: u64 },
    /// Concurrency cap hit.
    ConcurrencyExceeded,
}
1458
1459// ─── release_admission ───
1460
/// Inputs to the `release_admission` FCALL.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReleaseAdmissionArgs {
    pub execution_id: ExecutionId,
}

/// Outcome of the `release_admission` FCALL.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReleaseAdmissionResult {
    /// The execution's admission was released.
    Released,
}
1470
1471// ─── block_execution_for_admission ───
1472
/// Inputs to the `block_execution_for_admission` FCALL.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BlockExecutionArgs {
    pub execution_id: ExecutionId,
    /// Blocking reason recorded on the execution. It can later be matched
    /// by `UnblockExecutionArgs::expected_blocking_reason`.
    pub blocking_reason: String,
    /// Optional free-form detail accompanying `blocking_reason`.
    #[serde(default)]
    pub blocking_detail: Option<String>,
    pub now: TimestampMs,
}

/// Outcome of the `block_execution_for_admission` FCALL.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlockExecutionResult {
    /// Execution blocked.
    Blocked,
}
1487
1488// ─── unblock_execution ───
1489
/// Inputs to the `unblock_execution` FCALL.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UnblockExecutionArgs {
    pub execution_id: ExecutionId,
    pub now: TimestampMs,
    /// Expected blocking reason (prevents stale unblock).
    /// NOTE(review): `None` presumably skips the reason check — confirm.
    #[serde(default)]
    pub expected_blocking_reason: Option<String>,
}

/// Outcome of the `unblock_execution` FCALL.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum UnblockExecutionResult {
    /// Execution unblocked and moved to eligible.
    Unblocked,
}
1504
1505// ═══════════════════════════════════════════════════════════════════════
1506// Phase 6 contracts: flow coordination and dependencies
1507// ═══════════════════════════════════════════════════════════════════════
1508
1509// ─── create_flow ───
1510
/// Inputs to the `create_flow` FCALL.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateFlowArgs {
    pub flow_id: crate::types::FlowId,
    pub flow_kind: String,
    pub namespace: Namespace,
    pub now: TimestampMs,
}

/// Outcomes of the `create_flow` FCALL.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateFlowResult {
    /// Flow created successfully.
    Created { flow_id: crate::types::FlowId },
    /// Flow already exists (idempotent).
    AlreadySatisfied { flow_id: crate::types::FlowId },
}
1526
1527// ─── add_execution_to_flow ───
1528
/// Inputs to the `add_execution_to_flow` FCALL.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AddExecutionToFlowArgs {
    pub flow_id: crate::types::FlowId,
    pub execution_id: ExecutionId,
    pub now: TimestampMs,
}

/// Outcomes of the `add_execution_to_flow` FCALL.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AddExecutionToFlowResult {
    /// Execution added to flow; `new_node_count` is the flow's node count
    /// after the add.
    Added {
        execution_id: ExecutionId,
        new_node_count: u32,
    },
    /// Already a member (idempotent); `node_count` is the current node count.
    AlreadyMember {
        execution_id: ExecutionId,
        node_count: u32,
    },
}
1549
1550// ─── cancel_flow ───
1551
/// Inputs to the `cancel_flow` FCALL.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CancelFlowArgs {
    pub flow_id: crate::types::FlowId,
    pub reason: String,
    /// Cancellation policy name. `"cancel_all"` is the value that selects
    /// member cancellation; see [`CancelFlowResult`] for how each policy is
    /// reported.
    pub cancellation_policy: String,
    pub now: TimestampMs,
}

/// Outcomes of the `cancel_flow` operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CancelFlowResult {
    /// Flow cancelled and all member cancellations (if any) have completed
    /// synchronously. Used when `cancellation_policy != "cancel_all"`, when
    /// the flow has no members, when the caller opted into synchronous
    /// dispatch (e.g. `?wait=true`), or when the flow was already in a
    /// terminal state (idempotent retry).
    ///
    /// On the idempotent-retry path `member_execution_ids` may be *capped*
    /// at the server (default 1000 entries) to bound response bandwidth on
    /// flows with very large membership. The first (non-idempotent) call
    /// always returns the full list, so clients that need every member id
    /// should persist the initial response.
    Cancelled {
        cancellation_policy: String,
        member_execution_ids: Vec<String>,
    },
    /// Flow state was flipped to cancelled atomically, but member
    /// cancellations are dispatched asynchronously in the background.
    /// Clients may poll `GET /v1/executions/{id}/state` for each member
    /// execution id to track terminal state.
    CancellationScheduled {
        cancellation_policy: String,
        member_count: u32,
        member_execution_ids: Vec<String>,
    },
    /// `?wait=true` dispatch completed but one or more member cancellations
    /// failed mid-loop (e.g. ghost member, Lua error, transport fault after
    /// retries exhausted). The flow itself is still flipped to cancelled
    /// (atomic Lua already ran); callers SHOULD inspect
    /// `failed_member_execution_ids` and either retry those ids directly
    /// via `cancel_execution` or wait for the cancel-backlog reconciler
    /// to retry them in the background.
    ///
    /// Only emitted by the synchronous wait path
    /// ([`CancelFlowArgs`] via `?wait=true`). The async path returns
    /// [`CancelFlowResult::CancellationScheduled`] and delegates retries
    /// to the reconciler — there is no visible "partial" state on the
    /// async path because the dispatch result is not observed inline.
    PartiallyCancelled {
        cancellation_policy: String,
        /// All member execution ids that the cancel_flow FCALL returned
        /// (i.e. the full membership at the moment of cancellation).
        member_execution_ids: Vec<String>,
        /// Strict subset of `member_execution_ids` whose per-member cancel
        /// FCALL returned an error. Order is deterministic (matches the
        /// iteration order over `member_execution_ids`).
        failed_member_execution_ids: Vec<String>,
    },
}
1610
1611// ─── stage_dependency_edge ───
1612
/// Inputs to the `stage_dependency_edge` FCALL.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StageDependencyEdgeArgs {
    pub flow_id: crate::types::FlowId,
    pub edge_id: crate::types::EdgeId,
    pub upstream_execution_id: ExecutionId,
    pub downstream_execution_id: ExecutionId,
    /// Dependency kind; defaults to `"success_only"` when absent on the wire.
    #[serde(default = "default_dependency_kind")]
    pub dependency_kind: String,
    #[serde(default)]
    pub data_passing_ref: Option<String>,
    /// Graph revision the caller expects the flow to be at.
    /// NOTE(review): presumably an optimistic-concurrency check (the result
    /// reports `new_graph_revision`) — confirm the mismatch behaviour.
    pub expected_graph_revision: u64,
    pub now: TimestampMs,
}
1626
/// Serde default for the `dependency_kind` fields of the dependency-edge
/// argument structs: a missing value means `"success_only"`.
fn default_dependency_kind() -> String {
    String::from("success_only")
}
1630
/// Outcome of the `stage_dependency_edge` FCALL.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum StageDependencyEdgeResult {
    /// Edge staged, new graph revision.
    Staged {
        edge_id: crate::types::EdgeId,
        new_graph_revision: u64,
    },
}
1639
1640// ─── apply_dependency_to_child ───
1641
/// Inputs to the `apply_dependency_to_child` FCALL.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ApplyDependencyToChildArgs {
    pub flow_id: crate::types::FlowId,
    pub edge_id: crate::types::EdgeId,
    /// The child execution that receives the dependency.
    pub downstream_execution_id: ExecutionId,
    pub upstream_execution_id: ExecutionId,
    pub graph_revision: u64,
    /// Dependency kind; defaults to `"success_only"` when absent on the wire.
    #[serde(default = "default_dependency_kind")]
    pub dependency_kind: String,
    #[serde(default)]
    pub data_passing_ref: Option<String>,
    pub now: TimestampMs,
}

/// Outcomes of the `apply_dependency_to_child` FCALL.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ApplyDependencyToChildResult {
    /// Dependency applied, N unsatisfied deps remaining.
    Applied { unsatisfied_count: u32 },
    /// Already applied (idempotent).
    AlreadyApplied,
}
1664
1665// ─── resolve_dependency ───
1666
/// Inputs to the `resolve_dependency` FCALL.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResolveDependencyArgs {
    pub edge_id: crate::types::EdgeId,
    /// "success", "failed", "cancelled", "expired", "skipped"
    pub upstream_outcome: String,
    pub now: TimestampMs,
}

/// Outcomes of the `resolve_dependency` FCALL.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResolveDependencyResult {
    /// Edge satisfied — downstream may become eligible.
    Satisfied,
    /// Edge made impossible — downstream becomes skipped.
    Impossible,
    /// Already resolved (idempotent).
    AlreadyResolved,
}
1684
1685// ─── promote_blocked_to_eligible ───
1686
/// Inputs to the `promote_blocked_to_eligible` FCALL.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PromoteBlockedToEligibleArgs {
    pub execution_id: ExecutionId,
    pub now: TimestampMs,
}

/// Outcome of the `promote_blocked_to_eligible` FCALL.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromoteBlockedToEligibleResult {
    /// The execution was promoted.
    Promoted,
}
1697
1698// ─── evaluate_flow_eligibility ───
1699
1700#[derive(Clone, Debug, Serialize, Deserialize)]
1701pub struct EvaluateFlowEligibilityArgs {
1702    pub execution_id: ExecutionId,
1703}
1704
1705#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1706pub enum EvaluateFlowEligibilityResult {
1707    /// Execution eligibility status.
1708    Status { status: String },
1709}
1710
1711// ─── replay_execution ───
1712
1713#[derive(Clone, Debug, Serialize, Deserialize)]
1714pub struct ReplayExecutionArgs {
1715    pub execution_id: ExecutionId,
1716    pub now: TimestampMs,
1717}
1718
1719#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1720pub enum ReplayExecutionResult {
1721    /// Replayed to runnable.
1722    Replayed { public_state: PublicState },
1723}
1724
1725// ─── get_execution (full read) ───
1726
1727/// Full execution info returned by `Server::get_execution`.
1728#[derive(Clone, Debug, Serialize, Deserialize)]
1729pub struct ExecutionInfo {
1730    pub execution_id: ExecutionId,
1731    pub namespace: String,
1732    pub lane_id: String,
1733    pub priority: i32,
1734    pub execution_kind: String,
1735    pub state_vector: StateVector,
1736    pub public_state: PublicState,
1737    pub created_at: String,
1738    /// TimestampMs (ms since epoch) when the execution's first attempt
1739    /// was started by a worker claim. Empty string until the first
1740    /// claim lands. Serialised as `Option<String>` so pre-claim reads
1741    /// deserialise cleanly even if the field is absent from the wire.
1742    #[serde(default, skip_serializing_if = "Option::is_none")]
1743    pub started_at: Option<String>,
1744    /// TimestampMs when the execution reached a terminal
1745    /// `completed`/`failed`/`cancelled`/`expired` state. Empty /
1746    /// absent while still in flight.
1747    #[serde(default, skip_serializing_if = "Option::is_none")]
1748    pub completed_at: Option<String>,
1749    pub current_attempt_index: u32,
1750    pub flow_id: Option<String>,
1751    pub blocking_detail: String,
1752}
1753
1754// ─── set_execution_tags / set_flow_tags (issue #58.4) ───
1755
1756/// Args for `ff_set_execution_tags`. Tag keys MUST match
1757/// `^[a-z][a-z0-9_]*\.` — the caller-namespace rule — or the FCALL
1758/// returns `invalid_tag_key`. Values are arbitrary strings. The map is
1759/// ordered (`BTreeMap`) so two callers submitting the same logical set
1760/// of tags produce identical ARGV.
1761#[derive(Clone, Debug, Serialize, Deserialize)]
1762pub struct SetExecutionTagsArgs {
1763    pub execution_id: ExecutionId,
1764    pub tags: BTreeMap<String, String>,
1765}
1766
1767/// Result of `ff_set_execution_tags`.
1768#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1769pub enum SetExecutionTagsResult {
1770    /// Tags written. `count` is the number of key-value pairs applied.
1771    Ok { count: u32 },
1772}
1773
1774/// Args for `ff_set_flow_tags`. Same namespace rule as
1775/// [`SetExecutionTagsArgs`]. The Lua function also lazy-migrates any
1776/// pre-58.4 reserved-namespace fields stashed inline on `flow_core` into
1777/// the new tags key.
1778#[derive(Clone, Debug, Serialize, Deserialize)]
1779pub struct SetFlowTagsArgs {
1780    pub flow_id: FlowId,
1781    pub tags: BTreeMap<String, String>,
1782}
1783
1784/// Result of `ff_set_flow_tags`.
1785#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1786pub enum SetFlowTagsResult {
1787    /// Tags written. `count` is the number of key-value pairs applied.
1788    Ok { count: u32 },
1789}
1790
1791// ─── describe_execution (issue #58.1) ───
1792
1793/// Engine-decoupled read-model for one execution.
1794///
1795/// Returned by `ff_sdk::FlowFabricWorker::describe_execution`. Consumers
1796/// consult this struct instead of reaching into Valkey's exec_core hash
1797/// directly — the engine is free to rename fields or restructure storage
1798/// under this surface.
1799///
1800/// `#[non_exhaustive]` — FF may add fields in minor releases without a
1801/// semver break. Match with `..` or use field-by-field construction.
1802#[derive(Clone, Debug, PartialEq, Eq)]
1803#[non_exhaustive]
1804pub struct ExecutionSnapshot {
1805    pub execution_id: ExecutionId,
1806    pub flow_id: Option<FlowId>,
1807    pub lane_id: LaneId,
1808    pub namespace: Namespace,
1809    pub public_state: PublicState,
1810    /// Blocking reason string (e.g. `"waiting_for_worker"`,
1811    /// `"waiting_for_delay"`, `"waiting_for_dependencies"`). `None` when
1812    /// the exec_core field is empty.
1813    pub blocking_reason: Option<String>,
1814    /// Free-form operator-readable detail explaining `blocking_reason`.
1815    /// `None` when the exec_core field is empty.
1816    pub blocking_detail: Option<String>,
1817    /// Summary of the execution's currently-active attempt. `None` when
1818    /// no attempt has been started (pre-claim) or when the exec_core
1819    /// attempt fields are all empty.
1820    pub current_attempt: Option<AttemptSummary>,
1821    /// Summary of the execution's currently-held lease. `None` when the
1822    /// execution is not held by a worker.
1823    pub current_lease: Option<LeaseSummary>,
1824    /// The waitpoint this execution is currently suspended on, if any.
1825    pub current_waitpoint: Option<WaitpointId>,
1826    pub created_at: TimestampMs,
1827    /// Timestamp of the last write that mutated exec_core. Engine-maintained.
1828    pub last_mutation_at: TimestampMs,
1829    pub total_attempt_count: u32,
1830    /// Caller-owned labels. The prefix `^[a-z][a-z0-9_]*\.` is reserved for
1831    /// consumer metadata (e.g. `cairn.task_id`); FF guarantees it will not
1832    /// write keys matching that shape. FF's own fields stay in snake_case
1833    /// without dots. Empty when no tags are set.
1834    pub tags: BTreeMap<String, String>,
1835}
1836
1837impl ExecutionSnapshot {
1838    /// Construct an [`ExecutionSnapshot`]. Present so downstream crates
1839    /// (ff-sdk's `describe_execution`) can assemble the struct despite
1840    /// the `#[non_exhaustive]` marker. Prefer adding builder-style
1841    /// helpers here over loosening `non_exhaustive`.
1842    #[allow(clippy::too_many_arguments)]
1843    pub fn new(
1844        execution_id: ExecutionId,
1845        flow_id: Option<FlowId>,
1846        lane_id: LaneId,
1847        namespace: Namespace,
1848        public_state: PublicState,
1849        blocking_reason: Option<String>,
1850        blocking_detail: Option<String>,
1851        current_attempt: Option<AttemptSummary>,
1852        current_lease: Option<LeaseSummary>,
1853        current_waitpoint: Option<WaitpointId>,
1854        created_at: TimestampMs,
1855        last_mutation_at: TimestampMs,
1856        total_attempt_count: u32,
1857        tags: BTreeMap<String, String>,
1858    ) -> Self {
1859        Self {
1860            execution_id,
1861            flow_id,
1862            lane_id,
1863            namespace,
1864            public_state,
1865            blocking_reason,
1866            blocking_detail,
1867            current_attempt,
1868            current_lease,
1869            current_waitpoint,
1870            created_at,
1871            last_mutation_at,
1872            total_attempt_count,
1873            tags,
1874        }
1875    }
1876}
1877
1878/// Currently-active attempt summary inside an [`ExecutionSnapshot`].
1879///
1880/// `#[non_exhaustive]`.
1881#[derive(Clone, Debug, PartialEq, Eq)]
1882#[non_exhaustive]
1883pub struct AttemptSummary {
1884    pub attempt_id: AttemptId,
1885    pub attempt_index: AttemptIndex,
1886}
1887
1888impl AttemptSummary {
1889    /// Construct an [`AttemptSummary`]. See [`ExecutionSnapshot::new`]
1890    /// for the rationale — `#[non_exhaustive]` blocks cross-crate
1891    /// struct-literal construction.
1892    pub fn new(attempt_id: AttemptId, attempt_index: AttemptIndex) -> Self {
1893        Self {
1894            attempt_id,
1895            attempt_index,
1896        }
1897    }
1898}
1899
1900/// Currently-held lease summary inside an [`ExecutionSnapshot`].
1901///
1902/// `#[non_exhaustive]`.
1903#[derive(Clone, Debug, PartialEq, Eq)]
1904#[non_exhaustive]
1905pub struct LeaseSummary {
1906    pub lease_epoch: LeaseEpoch,
1907    pub worker_instance_id: WorkerInstanceId,
1908    pub expires_at: TimestampMs,
1909}
1910
1911impl LeaseSummary {
1912    /// Construct a [`LeaseSummary`]. See [`ExecutionSnapshot::new`]
1913    /// for the rationale.
1914    pub fn new(
1915        lease_epoch: LeaseEpoch,
1916        worker_instance_id: WorkerInstanceId,
1917        expires_at: TimestampMs,
1918    ) -> Self {
1919        Self {
1920            lease_epoch,
1921            worker_instance_id,
1922            expires_at,
1923        }
1924    }
1925}
1926
// ─── Common sub-types (`StateSummary` is defined after the describe_* read models below) ───
1928
1929// ─── describe_flow (issue #58.2) ───
1930
1931/// Engine-decoupled read-model for one flow.
1932///
1933/// Returned by `ff_sdk::FlowFabricWorker::describe_flow`. Consumers
1934/// consult this struct instead of reaching into Valkey's flow_core hash
1935/// directly — the engine is free to rename fields or restructure storage
1936/// under this surface.
1937///
1938/// `#[non_exhaustive]` — FF may add fields in minor releases without a
1939/// semver break. Match with `..` or use [`FlowSnapshot::new`].
1940///
1941/// # `public_flow_state`
1942///
1943/// Stored as an engine-written string literal on `flow_core`. Known
1944/// values today: `open`, `running`, `blocked`, `cancelled`, `completed`,
1945/// `failed`. Surfaced as `String` (not a typed enum) because FF does
1946/// not yet expose a `PublicFlowState` type — callers that need to act
1947/// on specific values should match on the literal. The flow_projector
1948/// writes a parallel `public_flow_state` into the flow's summary hash;
1949/// this field reflects the authoritative value on `flow_core`, which
1950/// is what mutation guards (cancel/add-member) consult.
1951///
1952/// # `tags`
1953///
1954/// Unlike [`ExecutionSnapshot::tags`] (which has a dedicated tags
1955/// hash), flow tags live inline on `flow_core`. FF's own fields are
1956/// snake_case without a `.`; any field whose name starts with
1957/// `<lowercase>.` (e.g. `cairn.task_id`) is treated as consumer-owned
1958/// metadata and routed here. An empty map means no namespaced tags
1959/// were written. The prefix convention mirrors
1960/// [`ExecutionSnapshot::tags`] — consumers should keep tag keys
1961/// namespaced (`cairn.*`, `operator.*`, etc.) so future FF field
1962/// additions don't collide.
1963#[derive(Clone, Debug, PartialEq, Eq)]
1964#[non_exhaustive]
1965pub struct FlowSnapshot {
1966    pub flow_id: FlowId,
1967    /// The `flow_kind` literal passed to `create_flow` (e.g. `dag`,
1968    /// `pipeline`). Preserved as-is; FF does not interpret it.
1969    pub flow_kind: String,
1970    pub namespace: Namespace,
1971    /// Authoritative flow state on `flow_core`. See the struct-level
1972    /// docs for the set of known values.
1973    pub public_flow_state: String,
1974    /// Monotonically increasing revision bumped on every structural
1975    /// mutation (add-member, stage-edge). Used by optimistic-concurrency
1976    /// writers via `expected_graph_revision`.
1977    pub graph_revision: u64,
1978    /// Number of member executions added so far. Never decremented.
1979    pub node_count: u32,
1980    /// Number of dependency edges staged so far. Never decremented.
1981    pub edge_count: u32,
1982    pub created_at: TimestampMs,
1983    /// Timestamp of the last write that mutated `flow_core`.
1984    /// Engine-maintained.
1985    pub last_mutation_at: TimestampMs,
1986    /// When the flow reached a terminal state via `cancel_flow`. `None`
1987    /// while the flow is live. Only written by the cancel path today;
1988    /// `completed`/`failed` terminal states do not populate this field
1989    /// (the flow_projector derives them from membership).
1990    pub cancelled_at: Option<TimestampMs>,
1991    /// Operator-supplied reason from the `cancel_flow` call. `None`
1992    /// when the flow has not been cancelled.
1993    pub cancel_reason: Option<String>,
1994    /// The `cancellation_policy` value persisted by `cancel_flow`
1995    /// (e.g. `cancel_all`, `cancel_flow_only`). `None` for flows
1996    /// cancelled before this field was persisted, or not yet cancelled.
1997    pub cancellation_policy: Option<String>,
1998    /// Consumer-owned namespaced metadata (e.g. `cairn.task_id`). See
1999    /// the struct-level docs for the routing rule.
2000    pub tags: BTreeMap<String, String>,
2001}
2002
2003impl FlowSnapshot {
2004    /// Construct a [`FlowSnapshot`]. Present so downstream crates
2005    /// (ff-sdk's `describe_flow`) can assemble the struct despite the
2006    /// `#[non_exhaustive]` marker.
2007    #[allow(clippy::too_many_arguments)]
2008    pub fn new(
2009        flow_id: FlowId,
2010        flow_kind: String,
2011        namespace: Namespace,
2012        public_flow_state: String,
2013        graph_revision: u64,
2014        node_count: u32,
2015        edge_count: u32,
2016        created_at: TimestampMs,
2017        last_mutation_at: TimestampMs,
2018        cancelled_at: Option<TimestampMs>,
2019        cancel_reason: Option<String>,
2020        cancellation_policy: Option<String>,
2021        tags: BTreeMap<String, String>,
2022    ) -> Self {
2023        Self {
2024            flow_id,
2025            flow_kind,
2026            namespace,
2027            public_flow_state,
2028            graph_revision,
2029            node_count,
2030            edge_count,
2031            created_at,
2032            last_mutation_at,
2033            cancelled_at,
2034            cancel_reason,
2035            cancellation_policy,
2036            tags,
2037        }
2038    }
2039}
2040
2041// ─── describe_edge / list_*_edges (issue #58.3) ───
2042
2043/// Engine-decoupled read-model for one dependency edge.
2044///
2045/// Returned by `ff_sdk::FlowFabricWorker::describe_edge`,
2046/// `list_incoming_edges`, and `list_outgoing_edges`. Consumers consult
2047/// this struct instead of reaching into Valkey's per-flow `edge:` hash
2048/// directly — the engine is free to rename hash fields or restructure
2049/// key layout under this surface.
2050///
2051/// `#[non_exhaustive]` — FF may add fields in minor releases without a
2052/// semver break. Match with `..` or use [`EdgeSnapshot::new`].
2053///
2054/// # Fields
2055///
2056/// The struct mirrors the immutable edge record written by
2057/// `ff_stage_dependency_edge` (see `lua/flow.lua`). The flow-scoped
2058/// edge hash is only ever written once, at staging time; per-execution
2059/// resolution state lives on a separate `dep:<edge_id>` hash and is not
2060/// surfaced here. The `edge_state` field therefore reflects the
2061/// staging-time literal (currently `pending`), not the downstream
2062/// execution's dep-edge state.
2063#[derive(Clone, Debug, PartialEq, Eq)]
2064#[non_exhaustive]
2065pub struct EdgeSnapshot {
2066    pub edge_id: EdgeId,
2067    pub flow_id: FlowId,
2068    pub upstream_execution_id: ExecutionId,
2069    pub downstream_execution_id: ExecutionId,
2070    /// The `dependency_kind` literal (e.g. `success_only`) from
2071    /// `stage_dependency_edge`. Preserved as-is; FF does not interpret
2072    /// it on reads.
2073    pub dependency_kind: String,
2074    /// The satisfaction-condition literal stamped at staging time
2075    /// (e.g. `all_required`).
2076    pub satisfaction_condition: String,
2077    /// Optional opaque handle to a data-passing artifact. `None` when
2078    /// the stored field is empty (the most common case).
2079    pub data_passing_ref: Option<String>,
2080    /// Edge-state literal on the flow-scoped edge hash. Written once
2081    /// at staging as `pending`; this hash is immutable on the flow
2082    /// side. Per-execution resolution state is tracked separately on
2083    /// the child's `dep:<edge_id>` hash.
2084    pub edge_state: String,
2085    pub created_at: TimestampMs,
2086    /// Origin of the edge (e.g. `engine`). Preserved as-is.
2087    pub created_by: String,
2088}
2089
2090impl EdgeSnapshot {
2091    /// Construct an [`EdgeSnapshot`]. Present so downstream crates
2092    /// (ff-sdk's `describe_edge` / `list_*_edges`) can assemble the
2093    /// struct despite the `#[non_exhaustive]` marker.
2094    #[allow(clippy::too_many_arguments)]
2095    pub fn new(
2096        edge_id: EdgeId,
2097        flow_id: FlowId,
2098        upstream_execution_id: ExecutionId,
2099        downstream_execution_id: ExecutionId,
2100        dependency_kind: String,
2101        satisfaction_condition: String,
2102        data_passing_ref: Option<String>,
2103        edge_state: String,
2104        created_at: TimestampMs,
2105        created_by: String,
2106    ) -> Self {
2107        Self {
2108            edge_id,
2109            flow_id,
2110            upstream_execution_id,
2111            downstream_execution_id,
2112            dependency_kind,
2113            satisfaction_condition,
2114            data_passing_ref,
2115            edge_state,
2116            created_at,
2117            created_by,
2118        }
2119    }
2120}
2121
2122/// Summary of state after a mutation, returned by many functions.
2123#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2124pub struct StateSummary {
2125    pub state_vector: StateVector,
2126    pub current_attempt_index: AttemptIndex,
2127}
2128
#[cfg(test)]
mod tests {
    //! Serde and parsing tests for the contract types in this module.
    //!
    //! NOTE(review): more items are declared after this module in the file,
    //! which trips clippy's `items_after_test_module` lint; consider moving
    //! this module to the end of the file.

    // `use super::*` also brings in the parent's imports (`FlowId`,
    // `HashMap`, ...), so no separate `crate::types` import is needed.
    use super::*;

    /// Builds a `CreateExecutionArgs` with every optional field populated
    /// except `policy`, `delay_until`, and `execution_deadline_at`.
    fn sample_create_execution_args() -> CreateExecutionArgs {
        let config = crate::partition::PartitionConfig::default();
        CreateExecutionArgs {
            execution_id: ExecutionId::for_flow(&FlowId::new(), &config),
            namespace: Namespace::new("test"),
            lane_id: LaneId::new("default"),
            execution_kind: "llm_call".to_owned(),
            input_payload: b"hello".to_vec(),
            payload_encoding: Some("json".to_owned()),
            priority: 0,
            creator_identity: "test-user".to_owned(),
            idempotency_key: None,
            tags: HashMap::new(),
            policy: None,
            delay_until: None,
            execution_deadline_at: None,
            partition_id: 42,
            now: TimestampMs::now(),
        }
    }

    #[test]
    fn create_execution_args_serde() {
        let args = sample_create_execution_args();
        let json = serde_json::to_string(&args).unwrap();
        let parsed: CreateExecutionArgs = serde_json::from_str(&json).unwrap();
        assert_eq!(args.execution_id, parsed.execution_id);
        assert_eq!(args.input_payload, parsed.input_payload);
        assert_eq!(args.payload_encoding, parsed.payload_encoding);
        assert_eq!(args.execution_kind, parsed.execution_kind);
        assert_eq!(args.creator_identity, parsed.creator_identity);
        assert_eq!(args.priority, parsed.priority);
        assert_eq!(args.partition_id, parsed.partition_id);
        // Full round-trip: re-serializing must reproduce the same JSON.
        // `tags` is empty, so HashMap iteration order cannot make this flaky.
        assert_eq!(json, serde_json::to_string(&parsed).unwrap());
    }

    #[test]
    fn create_execution_args_optional_fields_default_when_absent() {
        let args = sample_create_execution_args();
        let mut value = serde_json::to_value(&args).unwrap();
        let obj = value
            .as_object_mut()
            .expect("CreateExecutionArgs serializes as a JSON object");
        for key in [
            "payload_encoding",
            "idempotency_key",
            "tags",
            "policy",
            "delay_until",
            "execution_deadline_at",
        ] {
            obj.remove(key);
        }
        let parsed: CreateExecutionArgs = serde_json::from_value(value).unwrap();
        assert_eq!(parsed.payload_encoding, None);
        assert_eq!(parsed.idempotency_key, None);
        assert!(parsed.tags.is_empty());
        assert!(parsed.policy.is_none());
        assert!(parsed.delay_until.is_none());
        assert!(parsed.execution_deadline_at.is_none());
        assert_eq!(args.execution_id, parsed.execution_id);
    }

    #[test]
    fn claim_result_serde() {
        let config = crate::partition::PartitionConfig::default();
        let result = ClaimExecutionResult::Claimed(ClaimedExecution {
            execution_id: ExecutionId::for_flow(&FlowId::new(), &config),
            lease_id: LeaseId::new(),
            lease_epoch: LeaseEpoch::new(1),
            attempt_index: AttemptIndex::new(0),
            attempt_id: AttemptId::new(),
            attempt_type: AttemptType::Initial,
            lease_expires_at: TimestampMs::from_millis(1000),
        });
        let json = serde_json::to_string(&result).unwrap();
        let parsed: ClaimExecutionResult = serde_json::from_str(&json).unwrap();
        assert_eq!(result, parsed);
    }

    // ── StreamCursor (issue #92) ──

    #[test]
    fn stream_cursor_display_matches_wire_tokens() {
        assert_eq!(StreamCursor::Start.to_string(), "start");
        assert_eq!(StreamCursor::End.to_string(), "end");
        assert_eq!(StreamCursor::At("123".into()).to_string(), "123");
        assert_eq!(StreamCursor::At("123-4".into()).to_string(), "123-4");
    }

    #[test]
    fn stream_cursor_to_wire_maps_to_valkey_markers() {
        assert_eq!(StreamCursor::Start.to_wire(), "-");
        assert_eq!(StreamCursor::End.to_wire(), "+");
        assert_eq!(StreamCursor::At("0-0".into()).to_wire(), "0-0");
        assert_eq!(StreamCursor::At("17-3".into()).to_wire(), "17-3");
    }

    #[test]
    fn stream_cursor_from_str_accepts_wire_tokens() {
        use std::str::FromStr;
        assert_eq!(StreamCursor::from_str("start").unwrap(), StreamCursor::Start);
        assert_eq!(StreamCursor::from_str("end").unwrap(), StreamCursor::End);
        assert_eq!(
            StreamCursor::from_str("123").unwrap(),
            StreamCursor::At("123".into())
        );
        assert_eq!(
            StreamCursor::from_str("0-0").unwrap(),
            StreamCursor::At("0-0".into())
        );
        assert_eq!(
            StreamCursor::from_str("1713100800150-0").unwrap(),
            StreamCursor::At("1713100800150-0".into())
        );
    }

    #[test]
    fn stream_cursor_from_str_rejects_bare_markers() {
        use std::str::FromStr;
        assert!(matches!(
            StreamCursor::from_str("-"),
            Err(StreamCursorParseError::BareMarkerRejected(s)) if s == "-"
        ));
        assert!(matches!(
            StreamCursor::from_str("+"),
            Err(StreamCursorParseError::BareMarkerRejected(s)) if s == "+"
        ));
    }

    #[test]
    fn stream_cursor_from_str_rejects_empty() {
        use std::str::FromStr;
        assert_eq!(
            StreamCursor::from_str(""),
            Err(StreamCursorParseError::Empty)
        );
    }

    #[test]
    fn stream_cursor_from_str_rejects_malformed() {
        use std::str::FromStr;
        for bad in ["abc", "-1", "1-", "-1-2", "1-2-3", "1.2", "1 2", "Start", "END"] {
            assert!(
                matches!(
                    StreamCursor::from_str(bad),
                    Err(StreamCursorParseError::Malformed(_))
                ),
                "must reject {bad:?}",
            );
        }
    }

    #[test]
    fn stream_cursor_from_str_rejects_non_ascii() {
        use std::str::FromStr;
        assert!(matches!(
            StreamCursor::from_str("1\u{2013}2"),
            Err(StreamCursorParseError::Malformed(_))
        ));
    }

    #[test]
    fn stream_cursor_serde_round_trip() {
        for c in [
            StreamCursor::Start,
            StreamCursor::End,
            StreamCursor::At("0-0".into()),
            StreamCursor::At("1713100800150-0".into()),
        ] {
            let json = serde_json::to_string(&c).unwrap();
            let back: StreamCursor = serde_json::from_str(&json).unwrap();
            assert_eq!(back, c);
        }
    }

    #[test]
    fn stream_cursor_serializes_as_bare_string() {
        assert_eq!(serde_json::to_string(&StreamCursor::Start).unwrap(), r#""start""#);
        assert_eq!(serde_json::to_string(&StreamCursor::End).unwrap(), r#""end""#);
        assert_eq!(
            serde_json::to_string(&StreamCursor::At("123-0".into())).unwrap(),
            r#""123-0""#
        );
    }

    #[test]
    fn stream_cursor_deserialize_rejects_bare_markers() {
        assert!(serde_json::from_str::<StreamCursor>(r#""-""#).is_err());
        assert!(serde_json::from_str::<StreamCursor>(r#""+""#).is_err());
    }

    #[test]
    fn stream_cursor_from_beginning_is_zero_zero() {
        assert_eq!(
            StreamCursor::from_beginning(),
            StreamCursor::At("0-0".into())
        );
    }

    #[test]
    fn stream_cursor_is_concrete_classifies_variants() {
        assert!(!StreamCursor::Start.is_concrete());
        assert!(!StreamCursor::End.is_concrete());
        assert!(StreamCursor::At("0-0".into()).is_concrete());
        assert!(StreamCursor::At("123-0".into()).is_concrete());
        assert!(StreamCursor::from_beginning().is_concrete());
    }

    #[test]
    fn stream_cursor_into_wire_string_moves_without_cloning() {
        assert_eq!(StreamCursor::Start.into_wire_string(), "-");
        assert_eq!(StreamCursor::End.into_wire_string(), "+");
        assert_eq!(
            StreamCursor::At("17-3".into()).into_wire_string(),
            "17-3"
        );
    }
}
2315
2316// ─── list_executions ───
2317
2318/// Summary of an execution for list views.
2319#[derive(Clone, Debug, Serialize, Deserialize)]
2320pub struct ExecutionSummary {
2321    pub execution_id: ExecutionId,
2322    pub namespace: String,
2323    pub lane_id: String,
2324    pub execution_kind: String,
2325    pub public_state: String,
2326    pub priority: i32,
2327    pub created_at: String,
2328}
2329
2330/// Result of a list_executions query.
2331#[derive(Clone, Debug, Serialize, Deserialize)]
2332pub struct ListExecutionsResult {
2333    pub executions: Vec<ExecutionSummary>,
2334    pub total_returned: usize,
2335}
2336
2337// ─── rotate_waitpoint_hmac_secret ───
2338
/// Args for `ff_rotate_waitpoint_hmac_secret`. Rotates the HMAC signing
/// kid on ONE partition. Callers fan out across every partition themselves
/// (ff-server does the parallel fan-out in `rotate_waitpoint_secret`;
/// direct-Valkey consumers mirror the pattern).
///
/// "now" is derived server-side from `redis.call("TIME")` inside the FCALL
/// (consistency with `validate_waitpoint_token` and flow scanners).
/// `grace_ms` is a duration, not a clock value, so carrying it from the
/// caller is safe.
///
/// `Debug` is implemented by hand so that `new_secret_hex` is redacted. A
/// derived impl would print the raw signing secret into any log line that
/// formats these args with `{:?}`.
#[derive(Clone)]
pub struct RotateWaitpointHmacSecretArgs {
    /// Identifier of the kid being installed.
    pub new_kid: String,
    /// Hex-encoded secret for `new_kid`. Never printed by `Debug`.
    pub new_secret_hex: String,
    /// Grace window in ms (unsigned, so never negative). Tokens signed by
    /// the outgoing kid remain valid for `grace_ms` after this rotation.
    pub grace_ms: u64,
}

impl std::fmt::Debug for RotateWaitpointHmacSecretArgs {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RotateWaitpointHmacSecretArgs")
            .field("new_kid", &self.new_kid)
            // Secret material must not leak into logs / panic messages.
            .field("new_secret_hex", &"<redacted>")
            .field("grace_ms", &self.grace_ms)
            .finish()
    }
}
2356
/// Result of rotating the waitpoint HMAC secret on a single partition.
#[derive(Clone, Debug)]
#[derive(PartialEq, Eq)]
pub enum RotateWaitpointHmacSecretOutcome {
    /// The new kid was installed.
    ///
    /// `previous_kid` is `None` on bootstrap, when there was no prior
    /// `current_kid`. `gc_count` is the number of expired kids reaped
    /// during this rotation.
    Rotated {
        previous_kid: Option<String>,
        new_kid: String,
        gc_count: u32,
    },
    /// Exact replay: the same kid and secret are already installed. The
    /// operator can safely retry; nothing changes.
    Noop { kid: String },
}
2372
2373// ─── list_waitpoint_hmac_kids ───
2374
/// Args for listing the waitpoint HMAC kids on one partition. Carries no
/// fields.
#[derive(Clone, Debug)]
#[derive(PartialEq, Eq)]
pub struct ListWaitpointHmacKidsArgs {}
2377
2378/// Snapshot of the waitpoint HMAC keystore on ONE partition.
2379#[derive(Clone, Debug, PartialEq, Eq)]
2380pub struct WaitpointHmacKids {
2381    /// The currently-signing kid. `None` if uninitialized.
2382    pub current_kid: Option<String>,
2383    /// Kids that still validate existing tokens but no longer sign
2384    /// new ones. Order is Lua HGETALL traversal order — callers that
2385    /// need a stable sort should sort by `expires_at_ms`.
2386    pub verifying: Vec<VerifyingKid>,
2387}
2388
/// A kid that still verifies tokens but no longer signs new ones.
#[derive(Clone, Debug)]
#[derive(PartialEq, Eq)]
pub struct VerifyingKid {
    /// Identifier of the key.
    pub kid: String,
    /// Expiry of the key, in ms.
    pub expires_at_ms: i64,
}