// ff_core/contracts/mod.rs
//! Phase 1 function contracts — Args + Result types for each FCALL.
//!
//! Each Args struct defines the typed inputs to a Valkey Function.
//! Each Result enum defines the possible outcomes (success variants + error codes).
5
6pub mod decode;
7
8use crate::policy::ExecutionPolicy;
9use crate::state::{AttemptType, PublicState, StateVector};
10use crate::types::{
11 AttemptId, AttemptIndex, CancelSource, EdgeId, ExecutionId, FlowId, LaneId, LeaseEpoch,
12 LeaseFence, LeaseId, Namespace, SignalId, SuspensionId, TimestampMs, WaitpointId,
13 WaitpointToken, WorkerId, WorkerInstanceId,
14};
15use serde::{Deserialize, Serialize};
16use std::collections::{BTreeMap, BTreeSet, HashMap};
17
// ─── create_execution ───
19
20#[derive(Clone, Debug, Serialize, Deserialize)]
21pub struct CreateExecutionArgs {
22 pub execution_id: ExecutionId,
23 pub namespace: Namespace,
24 pub lane_id: LaneId,
25 pub execution_kind: String,
26 pub input_payload: Vec<u8>,
27 #[serde(default)]
28 pub payload_encoding: Option<String>,
29 pub priority: i32,
30 pub creator_identity: String,
31 #[serde(default)]
32 pub idempotency_key: Option<String>,
33 #[serde(default)]
34 pub tags: HashMap<String, String>,
35 /// Execution policy (retry, timeout, suspension, routing, etc.).
36 #[serde(default)]
37 pub policy: Option<ExecutionPolicy>,
38 /// If set and in the future, execution starts delayed.
39 #[serde(default)]
40 pub delay_until: Option<TimestampMs>,
41 /// Absolute deadline timestamp (ms). Execution expires if not complete by this time.
42 #[serde(default)]
43 pub execution_deadline_at: Option<TimestampMs>,
44 /// Partition ID (pre-computed).
45 pub partition_id: u16,
46 pub now: TimestampMs,
47}
48
49#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
50pub enum CreateExecutionResult {
51 /// Execution created successfully.
52 Created {
53 execution_id: ExecutionId,
54 public_state: PublicState,
55 },
56 /// Idempotent duplicate — existing execution returned.
57 Duplicate { execution_id: ExecutionId },
58}
59
// ─── issue_claim_grant ───
61
62#[derive(Clone, Debug, Serialize, Deserialize)]
63pub struct IssueClaimGrantArgs {
64 pub execution_id: ExecutionId,
65 pub lane_id: LaneId,
66 pub worker_id: WorkerId,
67 pub worker_instance_id: WorkerInstanceId,
68 #[serde(default)]
69 pub capability_hash: Option<String>,
70 #[serde(default)]
71 pub route_snapshot_json: Option<String>,
72 #[serde(default)]
73 pub admission_summary: Option<String>,
74 /// Capabilities this worker advertises. Serialized as a sorted,
75 /// comma-separated string to the Lua FCALL (see scheduling.lua
76 /// ff_issue_claim_grant). An empty set matches only executions whose
77 /// `required_capabilities` is also empty.
78 #[serde(default)]
79 pub worker_capabilities: BTreeSet<String>,
80 pub grant_ttl_ms: u64,
81 /// Caller-side timestamp for bookkeeping. NOT passed to the Lua FCALL —
82 /// ff_issue_claim_grant uses `redis.call("TIME")` for grant_expires_at.
83 pub now: TimestampMs,
84}
85
86#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
87pub enum IssueClaimGrantResult {
88 /// Grant issued.
89 Granted { execution_id: ExecutionId },
90}
91
92/// A claim grant issued by the scheduler for a specific execution.
93///
94/// The worker uses this to call `ff_claim_execution` (or
95/// `ff_acquire_lease`), which atomically consumes the grant and
96/// creates the lease.
97///
98/// Shared wire-level type between `ff-scheduler` (issuer) and
99/// `ff-sdk` (consumer, via `FlowFabricWorker::claim_from_grant`).
100/// Lives in `ff-core` so neither crate needs a dep on the other.
101///
102/// **Lane asymmetry with [`ReclaimGrant`]:** `ClaimGrant` does NOT
103/// carry `lane_id`. The issuing scheduler's caller already picked
104/// a lane (that's how admission reached this grant) and passes it
105/// through to `claim_from_grant` as a separate argument. The grant
106/// handle stays narrow to what uniquely identifies the admission
107/// decision. The matching field on [`ReclaimGrant`] is an
108/// intentional divergence — see the note on that type.
109#[derive(Clone, Debug, PartialEq, Eq)]
110pub struct ClaimGrant {
111 /// The execution that was granted.
112 pub execution_id: ExecutionId,
113 /// Opaque partition handle for this execution's hash-tag slot.
114 ///
115 /// Public wire type: consumers pass it back to FlowFabric but
116 /// must not parse the interior hash tag for routing decisions.
117 /// Internal consumers that need the typed
118 /// [`crate::partition::Partition`] call [`Self::partition`].
119 pub partition_key: crate::partition::PartitionKey,
120 /// The Valkey key holding the grant hash (for the worker to
121 /// reference).
122 pub grant_key: String,
123 /// When the grant expires if not consumed.
124 pub expires_at_ms: u64,
125}
126
127impl ClaimGrant {
128 /// Parse `partition_key` into a typed
129 /// [`crate::partition::Partition`]. Intended for internal
130 /// consumers (scheduler emitter, SDK worker claim path) that
131 /// need the family/index pair. Fails only on malformed keys
132 /// (which indicates a producer bug).
133 ///
134 /// Alias collapse applies: a grant issued against `Execution`
135 /// family round-trips to `Flow` (see [`crate::partition::PartitionKey`]
136 /// for the rationale — routing is preserved, only the metadata
137 /// family label normalises).
138 pub fn partition(
139 &self,
140 ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
141 self.partition_key.parse()
142 }
143}
144
145/// A reclaim grant issued for a resumed (attempt_interrupted) execution.
146///
147/// Issued by a producer (typically `ff-scheduler` once a Batch-C
148/// reclaim scanner is in place; test fixtures in the interim — no
149/// production Rust caller exists in-tree today). Consumed by
150/// [`FlowFabricWorker::claim_from_reclaim_grant`], which calls
151/// `ff_claim_resumed_execution` atomically: that FCALL validates the
152/// grant, consumes it, and transitions `attempt_interrupted` →
153/// `started` while preserving the existing `attempt_index` +
154/// `attempt_id` (a resumed execution re-uses its attempt; it does
155/// not start a new one).
156///
157/// Mirrors [`ClaimGrant`] for the resume path. Differences:
158///
159/// * [`ClaimGrant`] is issued against a freshly-eligible
160/// execution and `ff_claim_execution` creates a new attempt.
161/// * [`ReclaimGrant`] is issued against an `attempt_interrupted`
162/// execution; `ff_claim_resumed_execution` re-uses the existing
163/// attempt and bumps the lease epoch.
164///
165/// The grant itself is written to the same `claim_grant` Valkey key
166/// that [`ClaimGrant`] uses; the distinction is which Lua FCALL
167/// consumes it (`ff_claim_execution` for new attempts,
168/// `ff_claim_resumed_execution` for resumes).
169///
170/// **Lane asymmetry with [`ClaimGrant`]:** `ReclaimGrant` CARRIES
171/// `lane_id` as a field. The issuing path already knows the lane
172/// (it's read from `exec_core` at grant time); carrying it here
173/// spares the consumer a `HGET exec_core lane_id` round trip on
174/// the hot claim path. The asymmetry is intentional — prefer
175/// one-fewer-HGET on a type that already lives with the resumer's
176/// lifecycle over strict handle symmetry with `ClaimGrant`.
177///
178/// Shared wire-level type between the eventual `ff-scheduler`
179/// producer (Batch-C reclaim scanner — not yet in-tree; test
180/// fixtures construct this type today) and `ff-sdk` (consumer, via
181/// `FlowFabricWorker::claim_from_reclaim_grant`). Lives in
182/// `ff-core` so neither crate needs a dep on the other.
183///
184/// [`FlowFabricWorker::claim_from_reclaim_grant`]: https://docs.rs/ff-sdk
185#[derive(Clone, Debug, PartialEq, Eq)]
186pub struct ReclaimGrant {
187 /// The execution granted for resumption.
188 pub execution_id: ExecutionId,
189 /// Opaque partition handle for this execution's hash-tag slot.
190 ///
191 /// Same wire-opacity contract as [`ClaimGrant::partition_key`].
192 /// Internal consumers call [`Self::partition`] for the parsed
193 /// form.
194 pub partition_key: crate::partition::PartitionKey,
195 /// Valkey key of the grant hash — same key shape as
196 /// [`ClaimGrant`].
197 pub grant_key: String,
198 /// Monotonic ms when the grant expires; unconsumed grants
199 /// vanish.
200 pub expires_at_ms: u64,
201 /// Lane the execution belongs to. Needed by
202 /// `ff_claim_resumed_execution` for `KEYS[3]` (eligible_zset)
203 /// and `KEYS[9]` (active_index).
204 pub lane_id: LaneId,
205}
206
207impl ReclaimGrant {
208 /// Parse `partition_key` into a typed
209 /// [`crate::partition::Partition`]. See [`ClaimGrant::partition`]
210 /// for the alias-collapse note.
211 pub fn partition(
212 &self,
213 ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
214 self.partition_key.parse()
215 }
216}
217
// ─── claim_execution ───
219
220#[derive(Clone, Debug, Serialize, Deserialize)]
221pub struct ClaimExecutionArgs {
222 pub execution_id: ExecutionId,
223 pub worker_id: WorkerId,
224 pub worker_instance_id: WorkerInstanceId,
225 pub lane_id: LaneId,
226 pub lease_id: LeaseId,
227 pub lease_ttl_ms: u64,
228 pub attempt_id: AttemptId,
229 /// Expected attempt index (pre-read from exec_core.total_attempt_count).
230 /// Used for KEYS construction — must match what the Lua computes.
231 pub expected_attempt_index: AttemptIndex,
232 /// JSON-encoded attempt policy snapshot.
233 #[serde(default)]
234 pub attempt_policy_json: String,
235 /// Per-attempt timeout in ms.
236 #[serde(default)]
237 pub attempt_timeout_ms: Option<u64>,
238 /// Total execution deadline (absolute timestamp ms).
239 #[serde(default)]
240 pub execution_deadline_at: Option<i64>,
241 pub now: TimestampMs,
242}
243
244#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
245pub struct ClaimedExecution {
246 pub execution_id: ExecutionId,
247 pub lease_id: LeaseId,
248 pub lease_epoch: LeaseEpoch,
249 pub attempt_index: AttemptIndex,
250 pub attempt_id: AttemptId,
251 pub attempt_type: AttemptType,
252 pub lease_expires_at: TimestampMs,
253}
254
255#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
256pub enum ClaimExecutionResult {
257 /// Successfully claimed.
258 Claimed(ClaimedExecution),
259}
260
// ─── complete_execution ───
262
263#[derive(Clone, Debug, Serialize, Deserialize)]
264pub struct CompleteExecutionArgs {
265 pub execution_id: ExecutionId,
266 /// RFC #58.5 — fence triple. `Some` for SDK worker paths (standard
267 /// stale-lease fence). `None` for operator overrides, in which case
268 /// `source` must be `CancelSource::OperatorOverride` or the Lua
269 /// returns `fence_required`.
270 #[serde(default)]
271 pub fence: Option<LeaseFence>,
272 pub attempt_index: AttemptIndex,
273 #[serde(default)]
274 pub result_payload: Option<Vec<u8>>,
275 #[serde(default)]
276 pub result_encoding: Option<String>,
277 /// RFC #58.5 — unfenced-call gate. Ignored when `fence` is `Some`.
278 #[serde(default)]
279 pub source: CancelSource,
280 pub now: TimestampMs,
281}
282
283#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
284pub enum CompleteExecutionResult {
285 /// Execution completed successfully.
286 Completed {
287 execution_id: ExecutionId,
288 public_state: PublicState,
289 },
290}
291
// ─── renew_lease ───
293
294#[derive(Clone, Debug, Serialize, Deserialize)]
295pub struct RenewLeaseArgs {
296 pub execution_id: ExecutionId,
297 pub attempt_index: AttemptIndex,
298 /// RFC #58.5 — fence triple. Required (no operator override path for
299 /// renew). `None` returns `fence_required`.
300 pub fence: Option<LeaseFence>,
301 /// How long to extend the lease (milliseconds).
302 pub lease_ttl_ms: u64,
303 /// Grace period after lease_expires_at before the lease_current key is auto-deleted.
304 #[serde(default = "default_lease_history_grace_ms")]
305 pub lease_history_grace_ms: u64,
306}
307
/// Serde fallback for `RenewLeaseArgs::lease_history_grace_ms`:
/// 60 000 ms.
fn default_lease_history_grace_ms() -> u64 {
    60_000
}
311
312#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
313pub enum RenewLeaseResult {
314 /// Lease renewed.
315 Renewed { expires_at: TimestampMs },
316}
317
// ─── mark_lease_expired_if_due ───
319
320#[derive(Clone, Debug, Serialize, Deserialize)]
321pub struct MarkLeaseExpiredArgs {
322 pub execution_id: ExecutionId,
323}
324
325#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
326pub enum MarkLeaseExpiredResult {
327 /// Lease was marked as expired.
328 MarkedExpired,
329 /// No action needed (already expired, not yet due, not active, etc.).
330 AlreadySatisfied { reason: String },
331}
332
// ─── cancel_execution ───
334
335#[derive(Clone, Debug, Serialize, Deserialize)]
336pub struct CancelExecutionArgs {
337 pub execution_id: ExecutionId,
338 pub reason: String,
339 #[serde(default)]
340 pub source: CancelSource,
341 /// Required if not operator_override and execution is active.
342 #[serde(default)]
343 pub lease_id: Option<LeaseId>,
344 #[serde(default)]
345 pub lease_epoch: Option<LeaseEpoch>,
346 /// Required if not operator_override and execution is active.
347 #[serde(default)]
348 pub attempt_id: Option<AttemptId>,
349 pub now: TimestampMs,
350}
351
352#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
353pub enum CancelExecutionResult {
354 /// Execution cancelled.
355 Cancelled {
356 execution_id: ExecutionId,
357 public_state: PublicState,
358 },
359}
360
// ─── revoke_lease ───
362
363#[derive(Clone, Debug, Serialize, Deserialize)]
364pub struct RevokeLeaseArgs {
365 pub execution_id: ExecutionId,
366 /// If set, only revoke if this matches the current lease. Empty string skips check.
367 #[serde(default)]
368 pub expected_lease_id: Option<String>,
369 /// Worker instance whose lease set to clean up. Read from exec_core before calling.
370 pub worker_instance_id: WorkerInstanceId,
371 pub reason: String,
372}
373
374#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
375pub enum RevokeLeaseResult {
376 /// Lease revoked.
377 Revoked {
378 lease_id: String,
379 lease_epoch: String,
380 },
381 /// Already revoked or expired — no action needed.
382 AlreadySatisfied { reason: String },
383}
384
// ─── delay_execution ───
386
387#[derive(Clone, Debug, Serialize, Deserialize)]
388pub struct DelayExecutionArgs {
389 pub execution_id: ExecutionId,
390 /// RFC #58.5 — fence triple. `None` requires `source ==
391 /// CancelSource::OperatorOverride`.
392 #[serde(default)]
393 pub fence: Option<LeaseFence>,
394 pub attempt_index: AttemptIndex,
395 pub delay_until: TimestampMs,
396 #[serde(default)]
397 pub source: CancelSource,
398 pub now: TimestampMs,
399}
400
401#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
402pub enum DelayExecutionResult {
403 /// Execution delayed.
404 Delayed {
405 execution_id: ExecutionId,
406 public_state: PublicState,
407 },
408}
409
// ─── move_to_waiting_children ───
411
412#[derive(Clone, Debug, Serialize, Deserialize)]
413pub struct MoveToWaitingChildrenArgs {
414 pub execution_id: ExecutionId,
415 /// RFC #58.5 — fence triple. `None` requires `source ==
416 /// CancelSource::OperatorOverride`.
417 #[serde(default)]
418 pub fence: Option<LeaseFence>,
419 pub attempt_index: AttemptIndex,
420 #[serde(default)]
421 pub source: CancelSource,
422 pub now: TimestampMs,
423}
424
425#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
426pub enum MoveToWaitingChildrenResult {
427 /// Moved to waiting children.
428 Moved {
429 execution_id: ExecutionId,
430 public_state: PublicState,
431 },
432}
433
// ─── change_priority ───
435
436#[derive(Clone, Debug, Serialize, Deserialize)]
437pub struct ChangePriorityArgs {
438 pub execution_id: ExecutionId,
439 pub new_priority: i32,
440 pub lane_id: LaneId,
441 pub now: TimestampMs,
442}
443
444#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
445pub enum ChangePriorityResult {
446 /// Priority changed and re-scored.
447 Changed { execution_id: ExecutionId },
448}
449
// ─── update_progress ───
451
452#[derive(Clone, Debug, Serialize, Deserialize)]
453pub struct UpdateProgressArgs {
454 pub execution_id: ExecutionId,
455 pub lease_id: LeaseId,
456 pub lease_epoch: LeaseEpoch,
457 pub attempt_id: AttemptId,
458 #[serde(default)]
459 pub progress_pct: Option<u8>,
460 #[serde(default)]
461 pub progress_message: Option<String>,
462 pub now: TimestampMs,
463}
464
465#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
466pub enum UpdateProgressResult {
467 /// Progress updated.
468 Updated,
469}
470
// ═══════════════════════════════════════════════════════════════════════
// Phase 2 contracts: fail, reclaim, expire
// ═══════════════════════════════════════════════════════════════════════
474
// ─── fail_execution ───
476
477#[derive(Clone, Debug, Serialize, Deserialize)]
478pub struct FailExecutionArgs {
479 pub execution_id: ExecutionId,
480 /// RFC #58.5 — fence triple. `None` requires `source ==
481 /// CancelSource::OperatorOverride`.
482 #[serde(default)]
483 pub fence: Option<LeaseFence>,
484 pub attempt_index: AttemptIndex,
485 pub failure_reason: String,
486 pub failure_category: String,
487 /// JSON-encoded retry policy (from execution policy). Empty = no retries.
488 #[serde(default)]
489 pub retry_policy_json: String,
490 /// JSON-encoded attempt policy for the next retry attempt.
491 #[serde(default)]
492 pub next_attempt_policy_json: String,
493 #[serde(default)]
494 pub source: CancelSource,
495}
496
497/// Outcome of a fail_execution call.
498#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
499pub enum FailExecutionResult {
500 /// Retry was scheduled — execution is delayed with backoff.
501 RetryScheduled {
502 delay_until: TimestampMs,
503 next_attempt_index: AttemptIndex,
504 },
505 /// No retries left — execution is terminal failed.
506 TerminalFailed,
507}
508
// ─── issue_reclaim_grant ───
510
511#[derive(Clone, Debug, Serialize, Deserialize)]
512pub struct IssueReclaimGrantArgs {
513 pub execution_id: ExecutionId,
514 pub worker_id: WorkerId,
515 pub worker_instance_id: WorkerInstanceId,
516 pub lane_id: LaneId,
517 #[serde(default)]
518 pub capability_hash: Option<String>,
519 pub grant_ttl_ms: u64,
520 #[serde(default)]
521 pub route_snapshot_json: Option<String>,
522 #[serde(default)]
523 pub admission_summary: Option<String>,
524 /// Caller-side timestamp for bookkeeping. NOT passed to the Lua FCALL —
525 /// ff_issue_reclaim_grant uses `redis.call("TIME")` for grant_expires_at
526 /// (same as ff_issue_claim_grant). Kept for contract symmetry with
527 /// IssueClaimGrantArgs and scheduler audit logging.
528 pub now: TimestampMs,
529}
530
531#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
532pub enum IssueReclaimGrantResult {
533 /// Reclaim grant issued.
534 Granted { expires_at_ms: TimestampMs },
535}
536
// ─── reclaim_execution ───
538
539#[derive(Clone, Debug, Serialize, Deserialize)]
540pub struct ReclaimExecutionArgs {
541 pub execution_id: ExecutionId,
542 pub worker_id: WorkerId,
543 pub worker_instance_id: WorkerInstanceId,
544 pub lane_id: LaneId,
545 #[serde(default)]
546 pub capability_hash: Option<String>,
547 pub lease_id: LeaseId,
548 pub lease_ttl_ms: u64,
549 pub attempt_id: AttemptId,
550 /// JSON-encoded attempt policy for the reclaim attempt.
551 #[serde(default)]
552 pub attempt_policy_json: String,
553 /// Maximum reclaim count before terminal failure. Default: 100.
554 #[serde(default = "default_max_reclaim_count")]
555 pub max_reclaim_count: u32,
556 /// Old worker instance (for old_worker_leases key construction).
557 pub old_worker_instance_id: WorkerInstanceId,
558 /// Current attempt index (for old_attempt/old_stream_meta key construction).
559 pub current_attempt_index: AttemptIndex,
560}
561
/// Serde fallback for `ReclaimExecutionArgs::max_reclaim_count`: 100.
fn default_max_reclaim_count() -> u32 {
    100
}
565
566#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
567pub enum ReclaimExecutionResult {
568 /// Execution reclaimed — new attempt + new lease.
569 Reclaimed {
570 new_attempt_index: AttemptIndex,
571 new_attempt_id: AttemptId,
572 new_lease_id: LeaseId,
573 new_lease_epoch: LeaseEpoch,
574 lease_expires_at: TimestampMs,
575 },
576 /// Max reclaims exceeded — execution moved to terminal.
577 MaxReclaimsExceeded,
578}
579
// ─── expire_execution ───
581
582#[derive(Clone, Debug, Serialize, Deserialize)]
583pub struct ExpireExecutionArgs {
584 pub execution_id: ExecutionId,
585 /// "attempt_timeout" or "execution_deadline"
586 pub expire_reason: String,
587}
588
589#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
590pub enum ExpireExecutionResult {
591 /// Execution expired.
592 Expired { execution_id: ExecutionId },
593 /// Already terminal — no-op.
594 AlreadyTerminal,
595}
596
// ═══════════════════════════════════════════════════════════════════════
// Phase 3 contracts: suspend, signal, resume, waitpoint
// ═══════════════════════════════════════════════════════════════════════
600
// ─── suspend_execution ───
602
603#[derive(Clone, Debug, Serialize, Deserialize)]
604pub struct SuspendExecutionArgs {
605 pub execution_id: ExecutionId,
606 /// RFC #58.5 — fence triple. Required (no operator override path for
607 /// suspend). `None` returns `fence_required`.
608 pub fence: Option<LeaseFence>,
609 pub attempt_index: AttemptIndex,
610 pub suspension_id: SuspensionId,
611 pub waitpoint_id: WaitpointId,
612 pub waitpoint_key: String,
613 pub reason_code: String,
614 pub requested_by: String,
615 pub resume_condition_json: String,
616 pub resume_policy_json: String,
617 #[serde(default)]
618 pub continuation_metadata_pointer: Option<String>,
619 #[serde(default)]
620 pub timeout_at: Option<TimestampMs>,
621 /// true to activate a pending waitpoint, false to create new.
622 #[serde(default)]
623 pub use_pending_waitpoint: bool,
624 /// Timeout behavior: "fail", "cancel", "expire", "auto_resume", "escalate".
625 #[serde(default = "default_timeout_behavior")]
626 pub timeout_behavior: String,
627}
628
/// Serde fallback for `SuspendExecutionArgs::timeout_behavior`: `"fail"`.
fn default_timeout_behavior() -> String {
    "fail".to_owned()
}
632
633#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
634pub enum SuspendExecutionResult {
635 /// Execution suspended, waitpoint active.
636 Suspended {
637 suspension_id: SuspensionId,
638 waitpoint_id: WaitpointId,
639 waitpoint_key: String,
640 /// HMAC-SHA1 token bound to (waitpoint_id, waitpoint_key, created_at).
641 /// Required by signal-delivery callers to authenticate against this
642 /// waitpoint (RFC-004 §Waitpoint Security).
643 waitpoint_token: WaitpointToken,
644 },
645 /// Buffered signals already satisfied the condition — suspension skipped.
646 /// Lease is still held. Token comes from the pending waitpoint record.
647 AlreadySatisfied {
648 suspension_id: SuspensionId,
649 waitpoint_id: WaitpointId,
650 waitpoint_key: String,
651 waitpoint_token: WaitpointToken,
652 },
653}
654
// ─── resume_execution ───
656
657#[derive(Clone, Debug, Serialize, Deserialize)]
658pub struct ResumeExecutionArgs {
659 pub execution_id: ExecutionId,
660 /// "signal", "operator", "auto_resume"
661 #[serde(default = "default_trigger_type")]
662 pub trigger_type: String,
663 /// Optional delay before becoming eligible (ms).
664 #[serde(default)]
665 pub resume_delay_ms: u64,
666}
667
/// Serde fallback for `ResumeExecutionArgs::trigger_type`: `"signal"`.
fn default_trigger_type() -> String {
    "signal".to_owned()
}
671
672#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
673pub enum ResumeExecutionResult {
674 /// Execution resumed to runnable.
675 Resumed { public_state: PublicState },
676}
677
// ─── create_pending_waitpoint ───
679
680#[derive(Clone, Debug, Serialize, Deserialize)]
681pub struct CreatePendingWaitpointArgs {
682 pub execution_id: ExecutionId,
683 pub lease_id: LeaseId,
684 pub lease_epoch: LeaseEpoch,
685 pub attempt_index: AttemptIndex,
686 pub attempt_id: AttemptId,
687 pub waitpoint_id: WaitpointId,
688 pub waitpoint_key: String,
689 /// Short expiry for the pending waitpoint (ms).
690 pub expires_in_ms: u64,
691}
692
693#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
694pub enum CreatePendingWaitpointResult {
695 /// Pending waitpoint created.
696 Created {
697 waitpoint_id: WaitpointId,
698 waitpoint_key: String,
699 /// HMAC-SHA1 token bound to the pending waitpoint. Required for
700 /// `buffer_signal_for_pending_waitpoint` and carried forward when
701 /// the waitpoint is activated by `suspend_execution`.
702 waitpoint_token: WaitpointToken,
703 },
704}
705
// ─── close_waitpoint ───
707
708#[derive(Clone, Debug, Serialize, Deserialize)]
709pub struct CloseWaitpointArgs {
710 pub waitpoint_id: WaitpointId,
711 pub reason: String,
712}
713
714#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
715pub enum CloseWaitpointResult {
716 /// Waitpoint closed.
717 Closed,
718}
719
// ─── deliver_signal ───
721
722#[derive(Clone, Debug, Serialize, Deserialize)]
723pub struct DeliverSignalArgs {
724 pub execution_id: ExecutionId,
725 pub waitpoint_id: WaitpointId,
726 pub signal_id: SignalId,
727 pub signal_name: String,
728 pub signal_category: String,
729 pub source_type: String,
730 pub source_identity: String,
731 #[serde(default)]
732 pub payload: Option<Vec<u8>>,
733 #[serde(default)]
734 pub payload_encoding: Option<String>,
735 #[serde(default)]
736 pub correlation_id: Option<String>,
737 #[serde(default)]
738 pub idempotency_key: Option<String>,
739 pub target_scope: String,
740 #[serde(default)]
741 pub created_at: Option<TimestampMs>,
742 /// Dedup TTL for idempotency key (ms).
743 #[serde(default)]
744 pub dedup_ttl_ms: Option<u64>,
745 /// Resume delay after signal satisfaction (ms).
746 #[serde(default)]
747 pub resume_delay_ms: Option<u64>,
748 /// Max signals per execution (default 10000).
749 #[serde(default)]
750 pub max_signals_per_execution: Option<u64>,
751 /// MAXLEN for the waitpoint signal stream.
752 #[serde(default)]
753 pub signal_maxlen: Option<u64>,
754 /// HMAC-SHA1 token issued when the waitpoint was created. Required for
755 /// signal delivery; missing/tampered/rotated-past-grace tokens are
756 /// rejected with `invalid_token` or `token_expired` (RFC-004).
757 ///
758 /// Defense-in-depth: `WaitpointToken` is a transparent string newtype,
759 /// so an empty string deserializes successfully from JSON. The
760 /// validation boundary is in Lua (`validate_waitpoint_token` returns
761 /// `missing_token` on empty input); this type intentionally does NOT
762 /// pre-reject at the Rust layer so callers get a consistent typed
763 /// error regardless of how they constructed the args.
764 pub waitpoint_token: WaitpointToken,
765 pub now: TimestampMs,
766}
767
768#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
769pub enum DeliverSignalResult {
770 /// Signal accepted with the given effect.
771 Accepted { signal_id: SignalId, effect: String },
772 /// Duplicate signal (idempotency key matched).
773 Duplicate { existing_signal_id: SignalId },
774}
775
// ─── buffer_signal_for_pending_waitpoint ───
777
778#[derive(Clone, Debug, Serialize, Deserialize)]
779pub struct BufferSignalArgs {
780 pub execution_id: ExecutionId,
781 pub waitpoint_id: WaitpointId,
782 pub signal_id: SignalId,
783 pub signal_name: String,
784 pub signal_category: String,
785 pub source_type: String,
786 pub source_identity: String,
787 #[serde(default)]
788 pub payload: Option<Vec<u8>>,
789 #[serde(default)]
790 pub payload_encoding: Option<String>,
791 #[serde(default)]
792 pub idempotency_key: Option<String>,
793 pub target_scope: String,
794 /// HMAC-SHA1 token issued when `create_pending_waitpoint` ran. Required
795 /// to authenticate early signals targeting the pending waitpoint.
796 pub waitpoint_token: WaitpointToken,
797 pub now: TimestampMs,
798}
799
800#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
801pub enum BufferSignalResult {
802 /// Signal buffered for pending waitpoint.
803 Buffered { signal_id: SignalId },
804 /// Duplicate signal.
805 Duplicate { existing_signal_id: SignalId },
806}
807
// ─── list_pending_waitpoints ───
809
810/// One entry in the read-only view of an execution's active waitpoints.
811///
812/// Returned by `Server::list_pending_waitpoints` (and the
813/// `GET /v1/executions/{id}/pending-waitpoints` REST endpoint). The
814/// `waitpoint_token` is the same HMAC-SHA1 credential a suspending worker
815/// receives in `SuspendOutcome::Suspended` — a reviewer that needs to
816/// deliver a signal against this waitpoint must present it in
817/// `DeliverSignalArgs::waitpoint_token`.
818///
819/// Exposing the token here is a deliberate API gap closure: a
820/// human-in-the-loop reviewer has no other path to the token, since only
821/// the suspending worker sees the `SuspendOutcome`. Access is gated by
822/// the same bearer-auth middleware as every other REST endpoint.
823#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
824pub struct PendingWaitpointInfo {
825 pub waitpoint_id: WaitpointId,
826 pub waitpoint_key: String,
827 /// Current waitpoint state: `pending`, `active`, `closed`. Callers
828 /// typically filter to `pending` or `active`.
829 pub state: String,
830 /// HMAC-SHA1 token minted at create time; required by
831 /// `ff_deliver_signal` and `ff_buffer_signal_for_pending_waitpoint`.
832 pub waitpoint_token: WaitpointToken,
833 /// Signal names the resume condition is waiting for. Reviewers that
834 /// need to drive a specific waitpoint — particularly when multiple
835 /// concurrent waitpoints exist on one execution — filter on this to
836 /// pick the right target.
837 ///
838 /// An EMPTY vec means the condition matches any signal (wildcard, per
839 /// `lua/helpers.lua` `initialize_condition`). Callers must not infer
840 /// "no waitpoint" from empty; check `state` / length of the outer
841 /// list for that.
842 #[serde(default)]
843 pub required_signal_names: Vec<String>,
844 /// Timestamp when the waitpoint record was first written.
845 pub created_at: TimestampMs,
846 /// Timestamp when the waitpoint was activated (suspension landed).
847 /// `None` while the waitpoint is still `pending`.
848 #[serde(default, skip_serializing_if = "Option::is_none")]
849 pub activated_at: Option<TimestampMs>,
850 /// Scheduled expiration timestamp. `None` if no timeout configured.
851 #[serde(default, skip_serializing_if = "Option::is_none")]
852 pub expires_at: Option<TimestampMs>,
853}
854
// ─── expire_suspension ───
856
857#[derive(Clone, Debug, Serialize, Deserialize)]
858pub struct ExpireSuspensionArgs {
859 pub execution_id: ExecutionId,
860}
861
862#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
863pub enum ExpireSuspensionResult {
864 /// Suspension expired with the given behavior applied.
865 Expired { behavior_applied: String },
866 /// Already resolved — no action needed.
867 AlreadySatisfied { reason: String },
868}
869
// ─── claim_resumed_execution ───
871
872#[derive(Clone, Debug, Serialize, Deserialize)]
873pub struct ClaimResumedExecutionArgs {
874 pub execution_id: ExecutionId,
875 pub worker_id: WorkerId,
876 pub worker_instance_id: WorkerInstanceId,
877 pub lane_id: LaneId,
878 pub lease_id: LeaseId,
879 pub lease_ttl_ms: u64,
880 /// Current attempt index (for KEYS construction — from exec_core).
881 pub current_attempt_index: AttemptIndex,
882 /// Remaining attempt timeout from before suspension (ms). 0 = no timeout.
883 #[serde(default)]
884 pub remaining_attempt_timeout_ms: Option<u64>,
885 pub now: TimestampMs,
886}
887
888#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
889pub struct ClaimedResumedExecution {
890 pub execution_id: ExecutionId,
891 pub lease_id: LeaseId,
892 pub lease_epoch: LeaseEpoch,
893 pub attempt_index: AttemptIndex,
894 pub attempt_id: AttemptId,
895 pub lease_expires_at: TimestampMs,
896}
897
/// Possible outcomes of the `claim_resumed_execution` contract.
///
/// Only a success variant is declared here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ClaimResumedExecutionResult {
    /// Successfully claimed resumed execution (same attempt continues).
    Claimed(ClaimedResumedExecution),
}
903
904// ═══════════════════════════════════════════════════════════════════════
905// Phase 4 contracts: stream
906// ═══════════════════════════════════════════════════════════════════════
907
908// ─── append_frame ───
909
/// Typed inputs to the `append_frame` contract (Phase 4: stream).
///
/// All `Option` fields default to `None` when absent from the wire.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AppendFrameArgs {
    /// Execution whose attempt stream receives the frame.
    pub execution_id: ExecutionId,
    /// Attempt index the frame belongs to.
    pub attempt_index: AttemptIndex,
    /// Lease id of the appending caller.
    pub lease_id: LeaseId,
    /// Lease epoch of the appending caller.
    pub lease_epoch: LeaseEpoch,
    /// Attempt id of the appending caller.
    pub attempt_id: AttemptId,
    /// Frame type label.
    pub frame_type: String,
    /// Timestamp recorded for the frame.
    pub timestamp: TimestampMs,
    /// Raw frame payload bytes.
    pub payload: Vec<u8>,
    /// Optional payload encoding label.
    #[serde(default)]
    pub encoding: Option<String>,
    /// Optional structured metadata for the frame (JSON blob).
    #[serde(default)]
    pub metadata_json: Option<String>,
    /// Optional correlation id for the frame.
    #[serde(default)]
    pub correlation_id: Option<String>,
    /// Optional source label for the frame.
    #[serde(default)]
    pub source: Option<String>,
    /// MAXLEN for the stream. 0 = no trim.
    ///
    /// NOTE(review): how `None` is handled (vs. `Some(0)`) is not
    /// visible here — confirm against the FCALL wrapper.
    #[serde(default)]
    pub retention_maxlen: Option<u32>,
    /// Max payload bytes per frame. Default: 65536.
    ///
    /// NOTE(review): the 65536 default is not applied in this struct
    /// (`None` is the serde default); presumably enforced downstream.
    #[serde(default)]
    pub max_payload_bytes: Option<u32>,
}
936
/// Possible outcomes of the `append_frame` contract.
///
/// Only a success variant is declared here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AppendFrameResult {
    /// Frame appended successfully.
    Appended {
        /// Valkey Stream entry ID (e.g. "1713100800150-0").
        entry_id: String,
        /// Total frame count after this append.
        frame_count: u64,
    },
}
947
948// ─── StreamCursor (issue #92) ───
949
/// Opaque position marker for attempt-stream reads and tails.
///
/// Supersedes the bare `&str` / `String` stream-id parameters that
/// `read_stream` / `tail_stream` / `ReadStreamParams` /
/// `TailStreamParams` used to carry. On the wire it is still a flat
/// string (serde round-trips through `try_from` / `into`), so REST
/// queries such as `?from=start&to=end` and `?after=123-0` keep
/// working unchanged.
///
/// # Public wire grammar
///
/// Exactly three token shapes are accepted:
///
/// * `"start"` — the first entry in the stream (what XRANGE spells
///   `-`). Valid in `read_stream` / `ReadStreamParams`.
/// * `"end"` — the latest entry in the stream (what XRANGE spells
///   `+`). Valid in `read_stream` / `ReadStreamParams`.
/// * `"<ms>"` or `"<ms>-<seq>"` — a concrete Valkey Stream entry id.
///   Valid everywhere.
///
/// The raw XRANGE/XREAD markers `"-"` and `"+"` are **NOT** part of
/// the wire grammar. They are an implementation detail that only
/// appears inside the Lua-adjacent [`ReadFramesArgs`] / `xread_block`
/// path, produced by [`StreamCursor::to_wire`].
///
/// # Tailing (XREAD)
///
/// "From the beginning" is expressed as
/// `StreamCursor::At("0-0".into())`; prefer the
/// [`StreamCursor::from_beginning`] constructor, which yields exactly
/// that value. The SDK's `tail_stream` boundary rejects `Start` /
/// `End` because XREAD does not take `-` / `+` as cursors.
/// [`StreamCursor::is_concrete`] is the single place that encodes the
/// Start/End-vs-At distinction for such boundary checks.
///
/// # Why an enum instead of a string
///
/// With a plain string, malformed ids reach the Lua/Valkey layer and
/// come back as a script error (HTTP 500). A fallible `FromStr` /
/// `TryFrom<String>` on an enum rejects malformed input at the wire
/// boundary with a structured error instead, and keeps bare `-` / `+`
/// from becoming a tacit extension of the public API.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum StreamCursor {
    /// The first entry in the stream (XRANGE start marker).
    Start,
    /// The latest entry in the stream (XRANGE end marker).
    End,
    /// A concrete Valkey Stream entry id (`<ms>` or `<ms>-<seq>`).
    ///
    /// XREAD-style tails express "from the beginning" as
    /// `At("0-0".to_owned())`; see [`StreamCursor::from_beginning`].
    At(String),
}

impl StreamCursor {
    /// Builds the XREAD-from-beginning cursor, `At("0-0")`.
    ///
    /// XREAD treats `last_id` as exclusive, so using this value as the
    /// `after` cursor yields every entry in the stream.
    pub fn from_beginning() -> Self {
        StreamCursor::At(String::from("0-0"))
    }

    /// Serde default helper returning `StreamCursor::Start`; meant for
    /// `#[serde(default = "StreamCursor::start")]` on REST query
    /// structs.
    pub fn start() -> Self {
        StreamCursor::Start
    }

    /// Serde default helper returning `StreamCursor::End`.
    pub fn end() -> Self {
        StreamCursor::End
    }

    /// Serde default helper returning
    /// [`StreamCursor::from_beginning`]; the default for
    /// `TailStreamParams::after`.
    pub fn beginning() -> Self {
        StreamCursor::from_beginning()
    }

    /// Internal-only: lowers the cursor to the marker string Valkey's
    /// XRANGE/XREAD expect — `Start → "-"`, `End → "+"`, `At(s) → s`.
    ///
    /// Called at the ff-script adapter edge (just before building
    /// `ReadFramesArgs` or invoking `xread_block`) to translate the
    /// opaque wire grammar into the Lua-ABI form. This is NOT the
    /// public wire representation; never hand these raw characters to
    /// consumers. Hidden from generated docs to discourage external
    /// use.
    #[doc(hidden)]
    pub fn to_wire(&self) -> &str {
        match self {
            Self::At(entry_id) => entry_id.as_str(),
            Self::Start => "-",
            Self::End => "+",
        }
    }

    /// Internal-only owned counterpart of [`Self::to_wire`]: the
    /// `String` inside `At` is moved out rather than cloned. Intended
    /// for adapter edges that need an owned wire string (e.g.
    /// `ReadFramesArgs.from_id`) from a cursor that is about to be
    /// dropped anyway.
    #[doc(hidden)]
    pub fn into_wire_string(self) -> String {
        match self {
            Self::At(entry_id) => entry_id,
            // `Start` / `End` lower to the same markers as `to_wire`.
            open_marker => open_marker.to_wire().to_owned(),
        }
    }

    /// Returns `true` only for a concrete entry id (`"<ms>"` /
    /// `"<ms>-<seq>"`), and `false` for the open markers `Start` /
    /// `End`.
    ///
    /// Boundaries that cannot accept open markers, such as XREAD
    /// (tailing), reject a cursor exactly when `!cursor.is_concrete()`.
    /// Keeping the test here keeps the SDK and REST guards in
    /// lock-step.
    pub fn is_concrete(&self) -> bool {
        match self {
            Self::At(_) => true,
            Self::Start | Self::End => false,
        }
    }
}
1077
1078impl std::fmt::Display for StreamCursor {
1079 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1080 match self {
1081 Self::Start => f.write_str("start"),
1082 Self::End => f.write_str("end"),
1083 Self::At(s) => f.write_str(s),
1084 }
1085 }
1086}
1087
/// Reasons a string can fail to parse as a [`StreamCursor`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StreamCursorParseError {
    /// The input was the empty string.
    Empty,
    /// The input was one of the raw Valkey markers (`"-"`, `"+"`).
    /// Those are internal-only; the public wire spells them
    /// `"start"` / `"end"`. Carries the offending input.
    BareMarkerRejected(String),
    /// The input was neither a recognized keyword nor a well-formed
    /// Stream entry id. Entry ids must match `^\d+(?:-\d+)?$`.
    /// Carries the offending input.
    Malformed(String),
}

/// Renders a human-readable message that names the offending input
/// (where there is one) and the accepted forms.
impl std::fmt::Display for StreamCursorParseError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Malformed(s) => f.write_fmt(format_args!(
                "invalid stream cursor '{s}' (expected 'start', 'end', '<ms>', or '<ms>-<seq>')"
            )),
            Self::BareMarkerRejected(s) => f.write_fmt(format_args!(
                "bare marker '{s}' is not a valid stream cursor; use 'start' or 'end'"
            )),
            Self::Empty => f.write_str("stream cursor must not be empty"),
        }
    }
}

impl std::error::Error for StreamCursorParseError {}
1119
/// Outcome of the shared [`StreamCursor`] grammar check performed by
/// [`classify_stream_cursor`].
///
/// Carries no data, so classification never allocates. The owned vs
/// borrowed entry points ([`StreamCursor::from_str`],
/// [`StreamCursor::try_from`]) consume this classification and move
/// the owned `String` into `At` when applicable, avoiding a
/// round-trip `String → &str → String::to_owned` for the common
/// REST-query path.
enum StreamCursorClass {
    /// Input was exactly `"start"`.
    Start,
    /// Input was exactly `"end"`.
    End,
    /// Input is a well-formed `<ms>` or `<ms>-<seq>` entry id.
    Concrete,
    /// Input was a raw Valkey marker (`"-"` or `"+"`).
    BareMarker,
    /// Input was the empty string.
    Empty,
    /// Anything else.
    Malformed,
}

/// Classifies `s` against the public cursor grammar WITHOUT allocating.
///
/// A concrete id is `<ms>` or `<ms>-<seq>`, where each component is a
/// non-empty run of ASCII digits **whose value fits in a `u64`**.
/// Valkey Stream ids are pairs of unsigned 64-bit integers, so a
/// digits-only component that overflows `u64` would pass a purely
/// lexical check here and then be rejected by Valkey itself — the
/// late failure this boundary check exists to prevent. Such inputs
/// are therefore reported as `Malformed`.
fn classify_stream_cursor(s: &str) -> StreamCursorClass {
    /// True iff `part` is a non-empty, digits-only decimal that fits
    /// in a `u64`.
    fn is_u64_component(part: &str) -> bool {
        // The digit scan is still required: `u64::from_str` alone
        // would accept a leading `+`. Working on bytes also rejects
        // every non-ASCII input, so no separate `is_ascii` check is
        // needed.
        !part.is_empty()
            && part.bytes().all(|b| b.is_ascii_digit())
            && part.parse::<u64>().is_ok()
    }

    match s {
        "" => StreamCursorClass::Empty,
        "-" | "+" => StreamCursorClass::BareMarker,
        "start" => StreamCursorClass::Start,
        "end" => StreamCursorClass::End,
        _ => {
            // Split on the FIRST `-` only; any further `-` stays in
            // the sequence part and fails the digit scan.
            let (ms_part, seq_part) = match s.split_once('-') {
                Some((ms, seq)) => (ms, Some(seq)),
                None => (s, None),
            };
            // A missing `-<seq>` suffix is allowed.
            let seq_ok = seq_part.map_or(true, is_u64_component);
            if is_u64_component(ms_part) && seq_ok {
                StreamCursorClass::Concrete
            } else {
                StreamCursorClass::Malformed
            }
        }
    }
}
1166
1167impl std::str::FromStr for StreamCursor {
1168 type Err = StreamCursorParseError;
1169
1170 fn from_str(s: &str) -> Result<Self, Self::Err> {
1171 match classify_stream_cursor(s) {
1172 StreamCursorClass::Start => Ok(Self::Start),
1173 StreamCursorClass::End => Ok(Self::End),
1174 StreamCursorClass::Concrete => Ok(Self::At(s.to_owned())),
1175 StreamCursorClass::BareMarker => {
1176 Err(StreamCursorParseError::BareMarkerRejected(s.to_owned()))
1177 }
1178 StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
1179 StreamCursorClass::Malformed => Err(StreamCursorParseError::Malformed(s.to_owned())),
1180 }
1181 }
1182}
1183
1184impl TryFrom<String> for StreamCursor {
1185 type Error = StreamCursorParseError;
1186
1187 fn try_from(s: String) -> Result<Self, Self::Error> {
1188 // Owned parsing path — the `At` variant moves `s` in directly,
1189 // avoiding the `&str → String::to_owned` re-allocation that a
1190 // blind forward to `FromStr::from_str(&s)` would force. Error
1191 // paths still pay one allocation to describe the offending
1192 // input.
1193 match classify_stream_cursor(&s) {
1194 StreamCursorClass::Start => Ok(Self::Start),
1195 StreamCursorClass::End => Ok(Self::End),
1196 StreamCursorClass::Concrete => Ok(Self::At(s)),
1197 StreamCursorClass::BareMarker => Err(StreamCursorParseError::BareMarkerRejected(s)),
1198 StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
1199 StreamCursorClass::Malformed => Err(StreamCursorParseError::Malformed(s)),
1200 }
1201 }
1202}
1203
1204impl From<StreamCursor> for String {
1205 fn from(c: StreamCursor) -> Self {
1206 c.to_string()
1207 }
1208}
1209
1210impl Serialize for StreamCursor {
1211 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1212 serializer.collect_str(self)
1213 }
1214}
1215
1216impl<'de> Deserialize<'de> for StreamCursor {
1217 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
1218 let s = String::deserialize(deserializer)?;
1219 Self::try_from(s).map_err(serde::de::Error::custom)
1220 }
1221}
1222
1223// ─── read_attempt_stream / tail_attempt_stream ───
1224
/// Hard cap on the number of frames returned by a single read/tail call
/// (currently 10,000).
///
/// Single source of truth across the Rust layer (ff-script, ff-server,
/// ff-sdk). The Lua side in `lua/stream.lua` keeps a matching literal with
/// an inline reference back here; bump both together if you ever need to
/// lift the cap.
pub const STREAM_READ_HARD_CAP: u64 = 10_000;
1232
/// A single frame read from an attempt-scoped stream.
///
/// Field set mirrors what `ff_append_frame` writes: `frame_type`, `ts`,
/// `payload`, `encoding`, `source`, and optionally `correlation_id`. Stored
/// as an ordered map so field order is deterministic across read calls.
/// All field values are carried as strings.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamFrame {
    /// Valkey Stream entry ID, e.g. "1713100800150-0".
    pub id: String,
    /// Frame fields in sorted order (a `BTreeMap` iterates by key).
    pub fields: std::collections::BTreeMap<String, String>,
}
1245
/// Inputs to `ff_read_attempt_stream` (XRANGE wrapper).
///
/// `from_id` / `to_id` are raw XRANGE ids, not the public cursor
/// grammar: the raw `-` / `+` markers are valid here.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReadFramesArgs {
    /// Execution whose attempt stream is read.
    pub execution_id: ExecutionId,
    /// Attempt index selecting the stream.
    pub attempt_index: AttemptIndex,
    /// XRANGE start ID. Use "-" for earliest.
    pub from_id: String,
    /// XRANGE end ID. Use "+" for latest.
    pub to_id: String,
    /// XRANGE COUNT limit. MUST be `>= 1`. The REST and SDK layers reject
    /// `0` at the boundary; the Lua side rejects it too. `STREAM_READ_HARD_CAP`
    /// is the upper bound.
    pub count_limit: u64,
}
1260
/// Result of reading frames from an attempt stream — frames plus terminal
/// signal so consumers can stop polling without a timeout fallback.
///
/// Both terminal markers are omitted from the serialized form when `None`
/// and default to `None` when absent on deserialization.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamFrames {
    /// Entries in the requested range (possibly empty).
    pub frames: Vec<StreamFrame>,
    /// Timestamp when the upstream writer closed the stream. `None` if the
    /// stream is still open (or has never been written).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub closed_at: Option<TimestampMs>,
    /// Reason from the closing writer. Current values:
    /// `attempt_success`, `attempt_failure`, `attempt_cancelled`,
    /// `attempt_interrupted`. `None` iff the stream is still open.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub closed_reason: Option<String>,
}
1277
1278impl StreamFrames {
1279 /// Construct an empty open-stream result (no frames, no terminal
1280 /// markers). Useful for fast-path peek helpers.
1281 pub fn empty_open() -> Self {
1282 Self {
1283 frames: Vec::new(),
1284 closed_at: None,
1285 closed_reason: None,
1286 }
1287 }
1288
1289 /// True iff the producer has closed this stream. Consumers should stop
1290 /// polling and drain once this returns true.
1291 pub fn is_closed(&self) -> bool {
1292 self.closed_at.is_some()
1293 }
1294}
1295
/// Possible outcomes of the attempt-stream read contract.
///
/// Only a success variant is declared here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReadFramesResult {
    /// Frames returned (possibly empty) plus optional closed markers.
    Frames(StreamFrames),
}
1301
1302// ═══════════════════════════════════════════════════════════════════════
1303// Phase 5 contracts: budget, quota, block/unblock
1304// ═══════════════════════════════════════════════════════════════════════
1305
1306// ─── create_budget ───
1307
/// Typed inputs to the `create_budget` contract (Phase 5).
///
/// `dimensions`, `hard_limits` and `soft_limits` are parallel vectors:
/// index `i` of each limit vector applies to `dimensions[i]`. This struct
/// does not itself enforce equal lengths.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateBudgetArgs {
    /// Id of the budget to create.
    pub budget_id: crate::types::BudgetId,
    /// Scope type label (free-form string at this layer).
    pub scope_type: String,
    /// Scope id the budget applies to.
    pub scope_id: String,
    /// Enforcement mode label (free-form string at this layer).
    pub enforcement_mode: String,
    /// Action label for a hard-limit breach.
    pub on_hard_limit: String,
    /// Action label for a soft-limit breach.
    pub on_soft_limit: String,
    /// Reset interval in milliseconds.
    pub reset_interval_ms: u64,
    /// Dimension names.
    pub dimensions: Vec<String>,
    /// Hard limits per dimension (parallel with dimensions).
    pub hard_limits: Vec<u64>,
    /// Soft limits per dimension (parallel with dimensions).
    pub soft_limits: Vec<u64>,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
1325
/// Possible outcomes of the `create_budget` contract. Both variants echo
/// the budget id.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateBudgetResult {
    /// Budget created.
    Created { budget_id: crate::types::BudgetId },
    /// Already exists (idempotent).
    AlreadySatisfied { budget_id: crate::types::BudgetId },
}
1333
1334// ─── create_quota_policy ───
1335
/// Typed inputs to the `create_quota_policy` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateQuotaPolicyArgs {
    /// Id of the quota policy to create.
    pub quota_policy_id: crate::types::QuotaPolicyId,
    /// Length of the rate window, in seconds.
    pub window_seconds: u64,
    /// Maximum number of requests allowed per window.
    pub max_requests_per_window: u64,
    /// Maximum number of concurrent executions.
    pub max_concurrent: u64,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
1344
/// Possible outcomes of the `create_quota_policy` contract. Both variants
/// echo the quota policy id.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateQuotaPolicyResult {
    /// Quota policy created.
    Created {
        quota_policy_id: crate::types::QuotaPolicyId,
    },
    /// Already exists (idempotent).
    AlreadySatisfied {
        quota_policy_id: crate::types::QuotaPolicyId,
    },
}
1356
1357// ─── budget_status (read-only) ───
1358
/// Operator-facing budget status snapshot (not an FCALL — direct HGETALL reads).
///
/// Ids and timestamps are carried as plain strings in this read model
/// rather than as the typed `BudgetId` / `TimestampMs` used by the FCALL
/// contracts.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BudgetStatus {
    /// Budget id, as a string.
    pub budget_id: String,
    /// Scope type label.
    pub scope_type: String,
    /// Scope id the budget applies to.
    pub scope_id: String,
    /// Enforcement mode label.
    pub enforcement_mode: String,
    /// Current usage per dimension: {dimension_name: current_value}.
    pub usage: HashMap<String, u64>,
    /// Hard limits per dimension: {dimension_name: limit}.
    pub hard_limits: HashMap<String, u64>,
    /// Soft limits per dimension: {dimension_name: limit}.
    pub soft_limits: HashMap<String, u64>,
    /// Number of recorded breaches.
    /// NOTE(review): presumably hard breaches only, given the separate
    /// `soft_breach_count` — confirm against the writer.
    pub breach_count: u64,
    /// Number of recorded soft-limit breaches.
    pub soft_breach_count: u64,
    /// When the last breach happened, if any (string-encoded).
    pub last_breach_at: Option<String>,
    /// Dimension of the last breach, if any.
    pub last_breach_dim: Option<String>,
    /// When the next reset is due, if any (string-encoded).
    pub next_reset_at: Option<String>,
    /// When the budget was created, if known (string-encoded).
    pub created_at: Option<String>,
}
1379
1380// ─── report_usage_and_check ───
1381
/// Typed inputs to the `report_usage_and_check` contract.
///
/// `dimensions` and `deltas` are parallel vectors: `deltas[i]` is the
/// increment for `dimensions[i]`. This struct does not itself enforce
/// equal lengths.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReportUsageArgs {
    /// Dimension names to increment.
    pub dimensions: Vec<String>,
    /// Increment values (parallel with dimensions).
    pub deltas: Vec<u64>,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
    /// Optional idempotency key to prevent double-counting on retries.
    /// Pass the raw dedup id (e.g. `"retry-42"`); the typed FCALL wrapper
    /// wraps it into `ff:usagededup:{b:M}:<id>` using the budget
    /// partition's hash tag so it co-locates with the other budget keys
    /// (#108). Defaults to `None` when absent from the wire.
    #[serde(default)]
    pub dedup_key: Option<String>,
}
1397
/// Possible outcomes of the `report_usage_and_check` contract.
///
/// `#[non_exhaustive]`: matches outside this crate need a wildcard arm.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum ReportUsageResult {
    /// All increments applied, no breach.
    Ok,
    /// Soft limit breached on a dimension (advisory, increments applied).
    SoftBreach {
        /// Dimension that breached.
        dimension: String,
        /// Usage value reported for that dimension.
        current_usage: u64,
        /// The soft limit that was breached.
        soft_limit: u64,
    },
    /// Hard limit breached (increments NOT applied).
    HardBreach {
        /// Dimension that breached.
        dimension: String,
        /// Usage value reported for that dimension.
        current_usage: u64,
        /// The hard limit that was breached.
        hard_limit: u64,
    },
    /// Dedup key matched — usage already applied in a prior call.
    AlreadyApplied,
}
1418
1419// ─── reset_budget ───
1420
/// Typed inputs to the `reset_budget` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResetBudgetArgs {
    /// Budget to reset.
    pub budget_id: crate::types::BudgetId,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
1426
/// Possible outcomes of the `reset_budget` contract.
///
/// Only a success variant is declared here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResetBudgetResult {
    /// Budget reset successfully. `next_reset_at` is the timestamp of the
    /// next scheduled reset.
    Reset { next_reset_at: TimestampMs },
}
1432
1433// ─── check_admission_and_record ───
1434
/// Typed inputs to the `check_admission_and_record` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CheckAdmissionArgs {
    /// Execution requesting admission.
    pub execution_id: ExecutionId,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
    /// Length of the rate window, in seconds.
    pub window_seconds: u64,
    /// Rate limit to check against.
    /// NOTE(review): presumably requests per window — confirm.
    pub rate_limit: u64,
    /// Concurrency cap to check against.
    pub concurrency_cap: u64,
    /// Optional jitter in milliseconds; defaults to `None` when absent
    /// from the wire. How it is applied is not visible here.
    #[serde(default)]
    pub jitter_ms: Option<u64>,
}
1445
/// Possible outcomes of the `check_admission_and_record` contract.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CheckAdmissionResult {
    /// Admitted — execution may proceed.
    Admitted,
    /// Already admitted in this window (idempotent).
    AlreadyAdmitted,
    /// Rate limit exceeded. `retry_after_ms` is a retry hint in
    /// milliseconds.
    RateExceeded { retry_after_ms: u64 },
    /// Concurrency cap hit.
    ConcurrencyExceeded,
}
1457
1458// ─── release_admission ───
1459
/// Typed inputs to the `release_admission` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReleaseAdmissionArgs {
    /// Execution whose admission is released.
    pub execution_id: ExecutionId,
}
1464
/// Possible outcomes of the `release_admission` contract.
///
/// Only a success variant is declared here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReleaseAdmissionResult {
    /// Admission released.
    Released,
}
1469
1470// ─── block_execution_for_admission ───
1471
/// Typed inputs to the `block_execution_for_admission` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BlockExecutionArgs {
    /// Execution to block.
    pub execution_id: ExecutionId,
    /// Blocking reason label.
    pub blocking_reason: String,
    /// Optional free-form detail for the blocking reason; defaults to
    /// `None` when absent from the wire.
    #[serde(default)]
    pub blocking_detail: Option<String>,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
1480
/// Possible outcomes of the `block_execution_for_admission` contract.
///
/// Only a success variant is declared here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlockExecutionResult {
    /// Execution blocked.
    Blocked,
}
1486
1487// ─── unblock_execution ───
1488
/// Typed inputs to the `unblock_execution` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UnblockExecutionArgs {
    /// Execution to unblock.
    pub execution_id: ExecutionId,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
    /// Expected blocking reason (prevents stale unblock). Defaults to
    /// `None` when absent from the wire.
    #[serde(default)]
    pub expected_blocking_reason: Option<String>,
}
1497
/// Possible outcomes of the `unblock_execution` contract.
///
/// Only a success variant is declared here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum UnblockExecutionResult {
    /// Execution unblocked and moved to eligible.
    Unblocked,
}
1503
1504// ═══════════════════════════════════════════════════════════════════════
1505// Phase 6 contracts: flow coordination and dependencies
1506// ═══════════════════════════════════════════════════════════════════════
1507
1508// ─── create_flow ───
1509
/// Typed inputs to the `create_flow` contract (Phase 6).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateFlowArgs {
    /// Id of the flow to create.
    pub flow_id: crate::types::FlowId,
    /// Flow kind label (free-form string at this layer).
    pub flow_kind: String,
    /// Namespace the flow belongs to.
    pub namespace: Namespace,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
1517
/// Possible outcomes of the `create_flow` contract. Both variants echo
/// the flow id.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateFlowResult {
    /// Flow created successfully.
    Created { flow_id: crate::types::FlowId },
    /// Flow already exists (idempotent).
    AlreadySatisfied { flow_id: crate::types::FlowId },
}
1525
1526// ─── add_execution_to_flow ───
1527
/// Typed inputs to the `add_execution_to_flow` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AddExecutionToFlowArgs {
    /// Flow receiving the new member.
    pub flow_id: crate::types::FlowId,
    /// Execution to add to the flow.
    pub execution_id: ExecutionId,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
1534
/// Possible outcomes of the `add_execution_to_flow` contract.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AddExecutionToFlowResult {
    /// Execution added to flow.
    Added {
        /// The execution that was added.
        execution_id: ExecutionId,
        /// Flow node count after the addition.
        new_node_count: u32,
    },
    /// Already a member (idempotent).
    AlreadyMember {
        /// The execution that was already a member.
        execution_id: ExecutionId,
        /// Flow node count reported with this result.
        node_count: u32,
    },
}
1548
1549// ─── cancel_flow ───
1550
/// Typed inputs to the `cancel_flow` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CancelFlowArgs {
    /// Flow to cancel.
    pub flow_id: crate::types::FlowId,
    /// Free-form cancellation reason.
    pub reason: String,
    /// Cancellation policy label. `"cancel_all"` is the one value
    /// referenced by the `CancelFlowResult` docs; other accepted values
    /// are not visible here.
    pub cancellation_policy: String,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
1558
/// Possible outcomes of the `cancel_flow` contract.
///
/// Which variant is returned depends on the dispatch mode (synchronous vs
/// asynchronous member cancellation) and on whether every member
/// cancellation succeeded; see the per-variant docs.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CancelFlowResult {
    /// Flow cancelled and all member cancellations (if any) have completed
    /// synchronously. Used when `cancellation_policy != "cancel_all"`, when
    /// the flow has no members, when the caller opted into synchronous
    /// dispatch (e.g. `?wait=true`), or when the flow was already in a
    /// terminal state (idempotent retry).
    ///
    /// On the idempotent-retry path `member_execution_ids` may be *capped*
    /// at the server (default 1000 entries) to bound response bandwidth on
    /// flows with very large membership. The first (non-idempotent) call
    /// always returns the full list, so clients that need every member id
    /// should persist the initial response.
    Cancelled {
        /// Policy the cancellation ran under.
        cancellation_policy: String,
        /// Member execution ids (possibly capped — see above).
        member_execution_ids: Vec<String>,
    },
    /// Flow state was flipped to cancelled atomically, but member
    /// cancellations are dispatched asynchronously in the background.
    /// Clients may poll `GET /v1/executions/{id}/state` for each member
    /// execution id to track terminal state.
    CancellationScheduled {
        /// Policy the cancellation ran under.
        cancellation_policy: String,
        /// Number of member executions.
        member_count: u32,
        /// Member execution ids whose cancellation was scheduled.
        member_execution_ids: Vec<String>,
    },
    /// `?wait=true` dispatch completed but one or more member cancellations
    /// failed mid-loop (e.g. ghost member, Lua error, transport fault after
    /// retries exhausted). The flow itself is still flipped to cancelled
    /// (atomic Lua already ran); callers SHOULD inspect
    /// `failed_member_execution_ids` and either retry those ids directly
    /// via `cancel_execution` or wait for the cancel-backlog reconciler
    /// to retry them in the background.
    ///
    /// Only emitted by the synchronous wait path
    /// ([`crate::CancelFlowArgs`] via `?wait=true`). The async path returns
    /// [`CancelFlowResult::CancellationScheduled`] and delegates retries
    /// to the reconciler — there is no visible "partial" state on the
    /// async path because the dispatch result is not observed inline.
    PartiallyCancelled {
        /// Policy the cancellation ran under.
        cancellation_policy: String,
        /// All member execution ids that the cancel_flow FCALL returned
        /// (i.e. the full membership at the moment of cancellation).
        member_execution_ids: Vec<String>,
        /// Strict subset of `member_execution_ids` whose per-member cancel
        /// FCALL returned an error. Order is deterministic (matches the
        /// iteration order over `member_execution_ids`).
        failed_member_execution_ids: Vec<String>,
    },
}
1609
1610// ─── stage_dependency_edge ───
1611
/// Typed inputs to the `stage_dependency_edge` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StageDependencyEdgeArgs {
    /// Flow that owns the edge.
    pub flow_id: crate::types::FlowId,
    /// Id of the edge being staged.
    pub edge_id: crate::types::EdgeId,
    /// Execution on the upstream end of the edge.
    pub upstream_execution_id: ExecutionId,
    /// Execution on the downstream end of the edge.
    pub downstream_execution_id: ExecutionId,
    /// Dependency kind label. Defaults to `"success_only"` (via
    /// `default_dependency_kind`) when absent from the wire.
    #[serde(default = "default_dependency_kind")]
    pub dependency_kind: String,
    /// Optional data-passing reference; defaults to `None` when absent
    /// from the wire.
    #[serde(default)]
    pub data_passing_ref: Option<String>,
    /// Graph revision the caller expects.
    /// NOTE(review): presumably an optimistic-concurrency check against
    /// the flow's current revision — confirm against the Lua function.
    pub expected_graph_revision: u64,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
1625
/// Serde default for `dependency_kind` fields: the owned string
/// `"success_only"`. Referenced by name from
/// `#[serde(default = "default_dependency_kind")]`.
fn default_dependency_kind() -> String {
    String::from("success_only")
}
1629
/// Possible outcomes of the `stage_dependency_edge` contract.
///
/// Only a success variant is declared here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum StageDependencyEdgeResult {
    /// Edge staged, new graph revision.
    Staged {
        /// Id of the staged edge.
        edge_id: crate::types::EdgeId,
        /// Graph revision after staging.
        new_graph_revision: u64,
    },
}
1638
1639// ─── apply_dependency_to_child ───
1640
/// Typed inputs to the `apply_dependency_to_child` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ApplyDependencyToChildArgs {
    /// Flow that owns the edge.
    pub flow_id: crate::types::FlowId,
    /// Id of the edge being applied.
    pub edge_id: crate::types::EdgeId,
    /// The child execution that receives the dependency.
    pub downstream_execution_id: ExecutionId,
    /// Execution on the upstream end of the edge.
    pub upstream_execution_id: ExecutionId,
    /// Graph revision associated with this application.
    pub graph_revision: u64,
    /// Dependency kind label. Defaults to `"success_only"` (via
    /// `default_dependency_kind`) when absent from the wire.
    #[serde(default = "default_dependency_kind")]
    pub dependency_kind: String,
    /// Optional data-passing reference; defaults to `None` when absent
    /// from the wire.
    #[serde(default)]
    pub data_passing_ref: Option<String>,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
1655
/// Possible outcomes of the `apply_dependency_to_child` contract.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ApplyDependencyToChildResult {
    /// Dependency applied, N unsatisfied deps remaining.
    Applied { unsatisfied_count: u32 },
    /// Already applied (idempotent).
    AlreadyApplied,
}
1663
1664// ─── resolve_dependency ───
1665
/// Typed inputs to the `resolve_dependency` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResolveDependencyArgs {
    /// Edge being resolved.
    pub edge_id: crate::types::EdgeId,
    /// Outcome of the upstream execution, as a string. One of:
    /// "success", "failed", "cancelled", "expired", "skipped"
    pub upstream_outcome: String,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
1673
/// Possible outcomes of the `resolve_dependency` contract.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResolveDependencyResult {
    /// Edge satisfied — downstream may become eligible.
    Satisfied,
    /// Edge made impossible — downstream becomes skipped.
    Impossible,
    /// Already resolved (idempotent).
    AlreadyResolved,
}
1683
1684// ─── promote_blocked_to_eligible ───
1685
/// Typed inputs to the `promote_blocked_to_eligible` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PromoteBlockedToEligibleArgs {
    /// Execution to promote.
    pub execution_id: ExecutionId,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
1691
/// Possible outcomes of the `promote_blocked_to_eligible` contract.
///
/// Only a success variant is declared here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromoteBlockedToEligibleResult {
    /// Execution promoted.
    Promoted,
}
1696
1697// ─── evaluate_flow_eligibility ───
1698
/// Typed inputs to the `evaluate_flow_eligibility` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EvaluateFlowEligibilityArgs {
    /// Execution whose eligibility is evaluated.
    pub execution_id: ExecutionId,
}
1703
/// Possible outcomes of the `evaluate_flow_eligibility` contract.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum EvaluateFlowEligibilityResult {
    /// Execution eligibility status. `status` is a free-form string at
    /// this layer; the set of values is not visible here.
    Status { status: String },
}
1709
1710// ─── replay_execution ───
1711
/// Typed inputs to the `replay_execution` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReplayExecutionArgs {
    /// Execution to replay.
    pub execution_id: ExecutionId,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
1717
/// Possible outcomes of the `replay_execution` contract.
///
/// Only a success variant is declared here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplayExecutionResult {
    /// Replayed to runnable. `public_state` is the execution's public
    /// state reported with the result.
    Replayed { public_state: PublicState },
}
1723
1724// ─── get_execution (full read) ───
1725
/// Full execution info returned by `Server::get_execution`.
///
/// Most scalar fields (namespace, lane, timestamps, flow id) are carried
/// as plain strings in this read model rather than as the typed ids used
/// by the FCALL contracts.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionInfo {
    /// Execution this record describes.
    pub execution_id: ExecutionId,
    /// Namespace, as a string.
    pub namespace: String,
    /// Lane id, as a string.
    pub lane_id: String,
    /// Execution priority.
    pub priority: i32,
    /// Execution kind label.
    pub execution_kind: String,
    /// Full state vector of the execution.
    pub state_vector: StateVector,
    /// Public-facing state of the execution.
    pub public_state: PublicState,
    /// Creation timestamp, string-encoded.
    pub created_at: String,
    /// TimestampMs (ms since epoch) when the execution's first attempt
    /// was started by a worker claim. Empty string until the first
    /// claim lands. Serialised as `Option<String>` so pre-claim reads
    /// deserialise cleanly even if the field is absent from the wire.
    ///
    /// NOTE(review): both `None` and `Some("")` can therefore mean
    /// "not started yet" — confirm which one producers emit.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at: Option<String>,
    /// TimestampMs when the execution reached a terminal
    /// `completed`/`failed`/`cancelled`/`expired` state. Empty /
    /// absent while still in flight.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub completed_at: Option<String>,
    /// Index of the current attempt.
    pub current_attempt_index: u32,
    /// Owning flow id as a string, if any.
    pub flow_id: Option<String>,
    /// Free-form blocking detail.
    /// NOTE(review): presumably empty when the execution is not
    /// blocked — confirm.
    pub blocking_detail: String,
}
1752
1753// ─── set_execution_tags / set_flow_tags (issue #58.4) ───
1754
/// Args for `ff_set_execution_tags`. Tag keys MUST match
/// `^[a-z][a-z0-9_]*\.` — the caller-namespace rule — or the FCALL
/// returns `invalid_tag_key`. Values are arbitrary strings. The map is
/// ordered (`BTreeMap`) so two callers submitting the same logical set
/// of tags produce identical ARGV.
///
/// The key rule is not checked by this struct; per the above it is
/// enforced by the FCALL.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SetExecutionTagsArgs {
    /// Execution whose tags are written.
    pub execution_id: ExecutionId,
    /// Tag key → value pairs to write, iterated in key order.
    pub tags: BTreeMap<String, String>,
}
1765
/// Result of `ff_set_execution_tags`.
///
/// Only a success variant is declared here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SetExecutionTagsResult {
    /// Tags written. `count` is the number of key-value pairs applied.
    Ok { count: u32 },
}
1772
/// Args for `ff_set_flow_tags`. Same namespace rule as
/// [`SetExecutionTagsArgs`]. The Lua function also lazy-migrates any
/// pre-58.4 reserved-namespace fields stashed inline on `flow_core` into
/// the new tags key.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SetFlowTagsArgs {
    /// Flow whose tags are written.
    pub flow_id: FlowId,
    /// Tag key → value pairs to write, iterated in key order.
    pub tags: BTreeMap<String, String>,
}
1782
/// Result of `ff_set_flow_tags`.
///
/// Only a success variant is declared here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SetFlowTagsResult {
    /// Tags written. `count` is the number of key-value pairs applied.
    Ok { count: u32 },
}
1789
1790// ─── describe_execution (issue #58.1) ───
1791
/// Engine-decoupled read-model for one execution.
///
/// Returned by `ff_sdk::FlowFabricWorker::describe_execution`. Consumers
/// consult this struct instead of reaching into Valkey's exec_core hash
/// directly — the engine is free to rename fields or restructure storage
/// under this surface.
///
/// `#[non_exhaustive]` — FF may add fields in minor releases without a
/// semver break. Match with `..` or use [`ExecutionSnapshot::new`]
/// (the marker blocks struct-literal construction from other crates).
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ExecutionSnapshot {
    /// Identifier of the execution this snapshot describes.
    pub execution_id: ExecutionId,
    /// Flow associated with the execution. `None` when no flow id is
    /// recorded (presumably a standalone execution — confirm against the
    /// decoder).
    pub flow_id: Option<FlowId>,
    /// Lane recorded for the execution.
    pub lane_id: LaneId,
    /// Namespace recorded for the execution.
    pub namespace: Namespace,
    /// Typed public lifecycle state of the execution.
    pub public_state: PublicState,
    /// Blocking reason string (e.g. `"waiting_for_worker"`,
    /// `"waiting_for_delay"`, `"waiting_for_dependencies"`). `None` when
    /// the exec_core field is empty.
    pub blocking_reason: Option<String>,
    /// Free-form operator-readable detail explaining `blocking_reason`.
    /// `None` when the exec_core field is empty.
    pub blocking_detail: Option<String>,
    /// Summary of the execution's currently-active attempt. `None` when
    /// no attempt has been started (pre-claim) or when the exec_core
    /// attempt fields are all empty.
    pub current_attempt: Option<AttemptSummary>,
    /// Summary of the execution's currently-held lease. `None` when the
    /// execution is not held by a worker.
    pub current_lease: Option<LeaseSummary>,
    /// The waitpoint this execution is currently suspended on, if any.
    pub current_waitpoint: Option<WaitpointId>,
    /// Creation timestamp of the execution.
    pub created_at: TimestampMs,
    /// Timestamp of the last write that mutated exec_core. Engine-maintained.
    pub last_mutation_at: TimestampMs,
    /// Attempt counter reported by the engine (by name, the number of
    /// attempts made so far — confirm exact semantics against the
    /// engine).
    pub total_attempt_count: u32,
    /// Caller-owned labels. The prefix `^[a-z][a-z0-9_]*\.` is reserved for
    /// consumer metadata (e.g. `cairn.task_id`); FF guarantees it will not
    /// write keys matching that shape. FF's own fields stay in snake_case
    /// without dots. Empty when no tags are set.
    pub tags: BTreeMap<String, String>,
}
1835
1836impl ExecutionSnapshot {
1837 /// Construct an [`ExecutionSnapshot`]. Present so downstream crates
1838 /// (ff-sdk's `describe_execution`) can assemble the struct despite
1839 /// the `#[non_exhaustive]` marker. Prefer adding builder-style
1840 /// helpers here over loosening `non_exhaustive`.
1841 #[allow(clippy::too_many_arguments)]
1842 pub fn new(
1843 execution_id: ExecutionId,
1844 flow_id: Option<FlowId>,
1845 lane_id: LaneId,
1846 namespace: Namespace,
1847 public_state: PublicState,
1848 blocking_reason: Option<String>,
1849 blocking_detail: Option<String>,
1850 current_attempt: Option<AttemptSummary>,
1851 current_lease: Option<LeaseSummary>,
1852 current_waitpoint: Option<WaitpointId>,
1853 created_at: TimestampMs,
1854 last_mutation_at: TimestampMs,
1855 total_attempt_count: u32,
1856 tags: BTreeMap<String, String>,
1857 ) -> Self {
1858 Self {
1859 execution_id,
1860 flow_id,
1861 lane_id,
1862 namespace,
1863 public_state,
1864 blocking_reason,
1865 blocking_detail,
1866 current_attempt,
1867 current_lease,
1868 current_waitpoint,
1869 created_at,
1870 last_mutation_at,
1871 total_attempt_count,
1872 tags,
1873 }
1874 }
1875}
1876
/// Currently-active attempt summary inside an [`ExecutionSnapshot`].
///
/// `#[non_exhaustive]` — construct via [`AttemptSummary::new`] from
/// other crates and match with `..`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct AttemptSummary {
    /// Identifier of the active attempt.
    pub attempt_id: AttemptId,
    /// Index of the active attempt.
    pub attempt_index: AttemptIndex,
}
1886
1887impl AttemptSummary {
1888 /// Construct an [`AttemptSummary`]. See [`ExecutionSnapshot::new`]
1889 /// for the rationale — `#[non_exhaustive]` blocks cross-crate
1890 /// struct-literal construction.
1891 pub fn new(attempt_id: AttemptId, attempt_index: AttemptIndex) -> Self {
1892 Self {
1893 attempt_id,
1894 attempt_index,
1895 }
1896 }
1897}
1898
/// Currently-held lease summary inside an [`ExecutionSnapshot`].
///
/// `#[non_exhaustive]` — construct via [`LeaseSummary::new`] from other
/// crates and match with `..`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct LeaseSummary {
    /// Epoch of the lease currently held.
    pub lease_epoch: LeaseEpoch,
    /// Worker instance recorded as the lease holder.
    pub worker_instance_id: WorkerInstanceId,
    /// Timestamp at which the lease expires.
    pub expires_at: TimestampMs,
}
1909
1910impl LeaseSummary {
1911 /// Construct a [`LeaseSummary`]. See [`ExecutionSnapshot::new`]
1912 /// for the rationale.
1913 pub fn new(
1914 lease_epoch: LeaseEpoch,
1915 worker_instance_id: WorkerInstanceId,
1916 expires_at: TimestampMs,
1917 ) -> Self {
1918 Self {
1919 lease_epoch,
1920 worker_instance_id,
1921 expires_at,
1922 }
1923 }
1924}
1925
1926// ─── Common sub-types ───
1927
1928// ─── describe_flow (issue #58.2) ───
1929
/// Engine-decoupled read-model for one flow.
///
/// Returned by `ff_sdk::FlowFabricWorker::describe_flow`. Consumers
/// consult this struct instead of reaching into Valkey's flow_core hash
/// directly — the engine is free to rename fields or restructure storage
/// under this surface.
///
/// `#[non_exhaustive]` — FF may add fields in minor releases without a
/// semver break. Match with `..` or use [`FlowSnapshot::new`].
///
/// # `public_flow_state`
///
/// Stored as an engine-written string literal on `flow_core`. Known
/// values today: `open`, `running`, `blocked`, `cancelled`, `completed`,
/// `failed`. Surfaced as `String` (not a typed enum) because FF does
/// not yet expose a `PublicFlowState` type — callers that need to act
/// on specific values should match on the literal. The flow_projector
/// writes a parallel `public_flow_state` into the flow's summary hash;
/// this field reflects the authoritative value on `flow_core`, which
/// is what mutation guards (cancel/add-member) consult.
///
/// # `tags`
///
/// Unlike [`ExecutionSnapshot::tags`] (which has a dedicated tags
/// hash), flow tags live inline on `flow_core`. FF's own fields are
/// snake_case without a `.`; any field whose name starts with
/// `<lowercase>.` (e.g. `cairn.task_id`) is treated as consumer-owned
/// metadata and routed here. An empty map means no namespaced tags
/// were written. The prefix convention mirrors
/// [`ExecutionSnapshot::tags`] — consumers should keep tag keys
/// namespaced (`cairn.*`, `operator.*`, etc.) so future FF field
/// additions don't collide.
///
/// NOTE(review): the [`SetFlowTagsArgs`] docs say `ff_set_flow_tags`
/// lazy-migrates inline `flow_core` tag fields into a separate tags
/// key, so the "live inline on `flow_core`" statement above may
/// describe only pre-58.4 flows — confirm against the `describe_flow`
/// read path.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct FlowSnapshot {
    /// Identifier of the flow this snapshot describes.
    pub flow_id: FlowId,
    /// The `flow_kind` literal passed to `create_flow` (e.g. `dag`,
    /// `pipeline`). Preserved as-is; FF does not interpret it.
    pub flow_kind: String,
    /// Namespace recorded for the flow.
    pub namespace: Namespace,
    /// Authoritative flow state on `flow_core`. See the struct-level
    /// docs for the set of known values.
    pub public_flow_state: String,
    /// Monotonically increasing revision bumped on every structural
    /// mutation (add-member, stage-edge). Used by optimistic-concurrency
    /// writers via `expected_graph_revision`.
    pub graph_revision: u64,
    /// Number of member executions added so far. Never decremented.
    pub node_count: u32,
    /// Number of dependency edges staged so far. Never decremented.
    pub edge_count: u32,
    /// Creation timestamp of the flow.
    pub created_at: TimestampMs,
    /// Timestamp of the last write that mutated `flow_core`.
    /// Engine-maintained.
    pub last_mutation_at: TimestampMs,
    /// When the flow reached a terminal state via `cancel_flow`. `None`
    /// while the flow is live. Only written by the cancel path today;
    /// `completed`/`failed` terminal states do not populate this field
    /// (the flow_projector derives them from membership).
    pub cancelled_at: Option<TimestampMs>,
    /// Operator-supplied reason from the `cancel_flow` call. `None`
    /// when the flow has not been cancelled.
    pub cancel_reason: Option<String>,
    /// The `cancellation_policy` value persisted by `cancel_flow`
    /// (e.g. `cancel_all`, `cancel_flow_only`). `None` for flows
    /// cancelled before this field was persisted, or not yet cancelled.
    pub cancellation_policy: Option<String>,
    /// Consumer-owned namespaced metadata (e.g. `cairn.task_id`). See
    /// the struct-level docs for the routing rule.
    pub tags: BTreeMap<String, String>,
    /// RFC-016 Stage A: inbound edge groups known on this flow.
    ///
    /// One entry per downstream execution that has at least one staged
    /// inbound dependency edge. Populated from the
    /// `ff:flow:{fp:N}:<flow_id>:edgegroup:<downstream_eid>` hash —
    /// when that hash is absent (existing flows created before Stage A),
    /// the backend falls back to reading the legacy
    /// `deps_meta.unsatisfied_required_count` counter on the
    /// downstream's exec partition and reports the group as
    /// [`EdgeDependencyPolicy::AllOf`] with the derived counters
    /// (backward-compat shim — see RFC-016 §11 Stage A).
    ///
    /// Every entry in Stage A reports `policy = AllOf`; Stage B/C/D/E
    /// extend the variants and wire the quorum counters.
    pub edge_groups: Vec<EdgeGroupSnapshot>,
}
2016
2017impl FlowSnapshot {
2018 /// Construct a [`FlowSnapshot`]. Present so downstream crates
2019 /// (ff-sdk's `describe_flow`) can assemble the struct despite the
2020 /// `#[non_exhaustive]` marker.
2021 #[allow(clippy::too_many_arguments)]
2022 pub fn new(
2023 flow_id: FlowId,
2024 flow_kind: String,
2025 namespace: Namespace,
2026 public_flow_state: String,
2027 graph_revision: u64,
2028 node_count: u32,
2029 edge_count: u32,
2030 created_at: TimestampMs,
2031 last_mutation_at: TimestampMs,
2032 cancelled_at: Option<TimestampMs>,
2033 cancel_reason: Option<String>,
2034 cancellation_policy: Option<String>,
2035 tags: BTreeMap<String, String>,
2036 edge_groups: Vec<EdgeGroupSnapshot>,
2037 ) -> Self {
2038 Self {
2039 flow_id,
2040 flow_kind,
2041 namespace,
2042 public_flow_state,
2043 graph_revision,
2044 node_count,
2045 edge_count,
2046 created_at,
2047 last_mutation_at,
2048 cancelled_at,
2049 cancel_reason,
2050 cancellation_policy,
2051 tags,
2052 edge_groups,
2053 }
2054 }
2055}
2056
2057// ─── describe_edge / list_*_edges (issue #58.3) ───
2058
/// Engine-decoupled read-model for one dependency edge.
///
/// Returned by `ff_sdk::FlowFabricWorker::describe_edge`,
/// `list_incoming_edges`, and `list_outgoing_edges`. Consumers consult
/// this struct instead of reaching into Valkey's per-flow `edge:` hash
/// directly — the engine is free to rename hash fields or restructure
/// key layout under this surface.
///
/// `#[non_exhaustive]` — FF may add fields in minor releases without a
/// semver break. Match with `..` or use [`EdgeSnapshot::new`].
///
/// # Fields
///
/// The struct mirrors the immutable edge record written by
/// `ff_stage_dependency_edge` (see `lua/flow.lua`). The flow-scoped
/// edge hash is only ever written once, at staging time; per-execution
/// resolution state lives on a separate `dep:<edge_id>` hash and is not
/// surfaced here. The `edge_state` field therefore reflects the
/// staging-time literal (currently `pending`), not the downstream
/// execution's dep-edge state.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct EdgeSnapshot {
    /// Identifier of this edge.
    pub edge_id: EdgeId,
    /// Flow the edge is staged on.
    pub flow_id: FlowId,
    /// Execution at the upstream end of the edge (the dependency).
    pub upstream_execution_id: ExecutionId,
    /// Execution at the downstream end of the edge (the dependent).
    pub downstream_execution_id: ExecutionId,
    /// The `dependency_kind` literal (e.g. `success_only`) from
    /// `stage_dependency_edge`. Preserved as-is; FF does not interpret
    /// it on reads.
    pub dependency_kind: String,
    /// The satisfaction-condition literal stamped at staging time
    /// (e.g. `all_required`).
    pub satisfaction_condition: String,
    /// Optional opaque handle to a data-passing artifact. `None` when
    /// the stored field is empty (the most common case).
    pub data_passing_ref: Option<String>,
    /// Edge-state literal on the flow-scoped edge hash. Written once
    /// at staging as `pending`; this hash is immutable on the flow
    /// side. Per-execution resolution state is tracked separately on
    /// the child's `dep:<edge_id>` hash.
    pub edge_state: String,
    /// Timestamp at which the edge record was created (staging time,
    /// given the record is written once).
    pub created_at: TimestampMs,
    /// Origin of the edge (e.g. `engine`). Preserved as-is.
    pub created_by: String,
}
2105
/// Direction marker for [`crate::engine_backend::EngineBackend::list_edges`].
///
/// Carries the subject execution whose adjacency side the caller wants
/// to list — mirrors the internal `AdjacencySide + subject_eid` pair
/// the ff-sdk free-fn `list_edges_from_set` already uses. Keeping
/// direction + subject fused in one enum means the trait method has a
/// single `direction` parameter rather than a `(side, eid)` pair, and
/// the backend impl can't forget one of the two.
///
/// * `Outgoing { from_node }` — the caller wants every edge whose
///   `upstream_execution_id == from_node`. Corresponds to the
///   `out:<execution_id>` adjacency SET under the execution's flow
///   partition.
/// * `Incoming { to_node }` — the caller wants every edge whose
///   `downstream_execution_id == to_node`. Corresponds to the
///   `in:<execution_id>` adjacency SET under the execution's flow
///   partition.
///
/// Unlike the snapshot structs in this module, this enum is not marked
/// `#[non_exhaustive]`: downstream crates can construct the variants
/// directly and match on them exhaustively. Use
/// [`EdgeDirection::subject`] when only the execution id is needed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EdgeDirection {
    /// Edges leaving `from_node` — `out:` adjacency SET.
    Outgoing {
        /// The subject execution whose outgoing edges to list.
        from_node: ExecutionId,
    },
    /// Edges landing on `to_node` — `in:` adjacency SET.
    Incoming {
        /// The subject execution whose incoming edges to list.
        to_node: ExecutionId,
    },
}
2136
2137impl EdgeDirection {
2138 /// Return the subject execution id regardless of direction. Shared
2139 /// helper for backend impls that need the execution id for the
2140 /// initial `HGET exec_core.flow_id` lookup (flow routing) before
2141 /// they know which adjacency SET to read.
2142 pub fn subject(&self) -> &ExecutionId {
2143 match self {
2144 Self::Outgoing { from_node } => from_node,
2145 Self::Incoming { to_node } => to_node,
2146 }
2147 }
2148}
2149
2150impl EdgeSnapshot {
2151 /// Construct an [`EdgeSnapshot`]. Present so downstream crates
2152 /// (ff-sdk's `describe_edge` / `list_*_edges`) can assemble the
2153 /// struct despite the `#[non_exhaustive]` marker.
2154 #[allow(clippy::too_many_arguments)]
2155 pub fn new(
2156 edge_id: EdgeId,
2157 flow_id: FlowId,
2158 upstream_execution_id: ExecutionId,
2159 downstream_execution_id: ExecutionId,
2160 dependency_kind: String,
2161 satisfaction_condition: String,
2162 data_passing_ref: Option<String>,
2163 edge_state: String,
2164 created_at: TimestampMs,
2165 created_by: String,
2166 ) -> Self {
2167 Self {
2168 edge_id,
2169 flow_id,
2170 upstream_execution_id,
2171 downstream_execution_id,
2172 dependency_kind,
2173 satisfaction_condition,
2174 data_passing_ref,
2175 edge_state,
2176 created_at,
2177 created_by,
2178 }
2179 }
2180}
2181
2182// ─── RFC-016 edge-group policy (Stage A) ───
2183
/// Policy controlling how an inbound edge group's satisfaction is
/// decided.
///
/// Stage A honours only [`EdgeDependencyPolicy::AllOf`] — the two
/// quorum variants exist so the wire/snapshot surface is stable for
/// Stage B/C/D's resolver extensions, but
/// [`crate::engine_backend::EngineBackend::set_edge_group_policy`]
/// rejects them with [`crate::engine_error::EngineError::Validation`]
/// until Stage B lands.
///
/// `#[non_exhaustive]` — future stages may add variants (e.g.
/// `Threshold` — see RFC-016 §10.3) without a semver break. Construct
/// via the [`EdgeDependencyPolicy::all_of`], [`EdgeDependencyPolicy::any_of`],
/// and [`EdgeDependencyPolicy::quorum`] helpers.
///
/// # Serialized shape
///
/// Internally tagged via serde: the variant name is written to a
/// `kind` field in snake_case (`all_of`, `any_of`, `quorum`) alongside
/// the variant's own fields.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum EdgeDependencyPolicy {
    /// Today's behavior: every edge in the inbound group must be
    /// satisfied (RFC-007 `all_required` + `success_only`).
    AllOf,
    /// k-of-n where k==1 — satisfied on the first upstream success.
    /// Stage A: rejected on
    /// [`crate::engine_backend::EngineBackend::set_edge_group_policy`];
    /// resolver emits nothing for this variant yet.
    AnyOf {
        /// What to do with unfinished sibling upstreams once the group
        /// is satisfied.
        // NOTE(review): the rename equals the field name, so it is a no-op today; presumably
        // kept to pin the wire key if the Rust field is ever renamed.
        #[serde(rename = "on_satisfied")]
        on_satisfied: OnSatisfied,
    },
    /// k-of-n quorum. Stage A: rejected on
    /// [`crate::engine_backend::EngineBackend::set_edge_group_policy`].
    Quorum {
        /// The `k` in k-of-n. No range check is applied by this type.
        k: u32,
        /// What to do with unfinished sibling upstreams once the group
        /// is satisfied.
        // NOTE(review): no-op rename, as on `AnyOf::on_satisfied`.
        #[serde(rename = "on_satisfied")]
        on_satisfied: OnSatisfied,
    },
}
2221
2222impl EdgeDependencyPolicy {
2223 /// Construct the default all-of policy (RFC-007 behavior).
2224 pub fn all_of() -> Self {
2225 Self::AllOf
2226 }
2227
2228 /// Construct an any-of policy — reserved for Stage B.
2229 pub fn any_of(on_satisfied: OnSatisfied) -> Self {
2230 Self::AnyOf { on_satisfied }
2231 }
2232
2233 /// Construct a quorum policy — reserved for Stage B.
2234 pub fn quorum(k: u32, on_satisfied: OnSatisfied) -> Self {
2235 Self::Quorum { k, on_satisfied }
2236 }
2237
2238 /// Stable string label used for wire format + metric labels.
2239 /// `all_of` | `any_of` | `quorum`.
2240 pub fn variant_str(&self) -> &'static str {
2241 match self {
2242 Self::AllOf => "all_of",
2243 Self::AnyOf { .. } => "any_of",
2244 Self::Quorum { .. } => "quorum",
2245 }
2246 }
2247}
2248
/// Policy for unfinished sibling upstreams once the quorum is met.
///
/// `#[non_exhaustive]` — RFC-016 §10.5 rejects a third variant today
/// but keeps the door open. Construct via [`OnSatisfied::cancel_remaining`]
/// / [`OnSatisfied::let_run`].
///
/// Variant names are serialized in snake_case (`cancel_remaining`,
/// `let_run`), matching [`OnSatisfied::variant_str`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
#[serde(rename_all = "snake_case")]
pub enum OnSatisfied {
    /// Default. Cancel any still-running siblings once quorum met.
    // "Default" is a policy statement; this type does not derive `Default`.
    CancelRemaining,
    /// Let stragglers finish; their terminals update counters for
    /// observability only (one-shot downstream).
    LetRun,
}
2264
2265impl OnSatisfied {
2266 /// Construct the default `cancel_remaining` disposition.
2267 pub fn cancel_remaining() -> Self {
2268 Self::CancelRemaining
2269 }
2270
2271 /// Construct the `let_run` disposition.
2272 pub fn let_run() -> Self {
2273 Self::LetRun
2274 }
2275
2276 /// Stable string label for wire format.
2277 pub fn variant_str(&self) -> &'static str {
2278 match self {
2279 Self::CancelRemaining => "cancel_remaining",
2280 Self::LetRun => "let_run",
2281 }
2282 }
2283}
2284
/// Edge-group lifecycle state (Stage A exposes only `pending` +
/// `satisfied` + `impossible`; `cancelled` reserved for Stage C).
///
/// Variant names are serialized in snake_case, which matches the
/// literals returned by [`EdgeGroupState::as_str`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
#[serde(rename_all = "snake_case")]
pub enum EdgeGroupState {
    /// Literal `pending`. Also the fallback produced by
    /// [`EdgeGroupState::from_literal`] for unrecognised input.
    Pending,
    /// Literal `satisfied`.
    Satisfied,
    /// Literal `impossible`.
    Impossible,
    /// Literal `cancelled`. Reserved for Stage C.
    Cancelled,
}
2296
2297impl EdgeGroupState {
2298 pub fn from_literal(s: &str) -> Self {
2299 match s {
2300 "satisfied" => Self::Satisfied,
2301 "impossible" => Self::Impossible,
2302 "cancelled" => Self::Cancelled,
2303 _ => Self::Pending,
2304 }
2305 }
2306
2307 pub fn as_str(&self) -> &'static str {
2308 match self {
2309 Self::Pending => "pending",
2310 Self::Satisfied => "satisfied",
2311 Self::Impossible => "impossible",
2312 Self::Cancelled => "cancelled",
2313 }
2314 }
2315}
2316
/// Snapshot of one inbound edge group (per downstream execution).
///
/// Exposed via [`FlowSnapshot::edge_groups`]. Stage A only populates
/// `AllOf` groups and their counters; Stage B/C add `failed` /
/// `skipped` / `satisfied_at` wiring for the quorum variants.
///
/// `#[non_exhaustive]` — future stages will add fields (`satisfied_at`,
/// `failed_count` write-path, `cancel_siblings_pending`). Match with
/// `..` or use [`EdgeGroupSnapshot::new`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct EdgeGroupSnapshot {
    /// Downstream execution that owns this inbound group.
    pub downstream_execution_id: ExecutionId,
    /// Satisfaction policy reported for the group.
    pub policy: EdgeDependencyPolicy,
    /// Presumably the number of inbound dependency edges in the group —
    /// confirm against the backend that populates it.
    pub total_deps: u32,
    /// Counter of upstreams counted as satisfied.
    pub satisfied_count: u32,
    /// Counter of upstreams counted as failed. Per the struct docs the
    /// write-path for this counter arrives in a later stage.
    pub failed_count: u32,
    /// Counter of upstreams counted as skipped. Wired in Stage B/C per
    /// the struct docs.
    pub skipped_count: u32,
    /// Counter of upstreams counted as running (exact definition not
    /// visible here — confirm against the backend).
    pub running_count: u32,
    /// Lifecycle state of the group.
    pub group_state: EdgeGroupState,
}
2338
2339impl EdgeGroupSnapshot {
2340 #[allow(clippy::too_many_arguments)]
2341 pub fn new(
2342 downstream_execution_id: ExecutionId,
2343 policy: EdgeDependencyPolicy,
2344 total_deps: u32,
2345 satisfied_count: u32,
2346 failed_count: u32,
2347 skipped_count: u32,
2348 running_count: u32,
2349 group_state: EdgeGroupState,
2350 ) -> Self {
2351 Self {
2352 downstream_execution_id,
2353 policy,
2354 total_deps,
2355 satisfied_count,
2356 failed_count,
2357 skipped_count,
2358 running_count,
2359 group_state,
2360 }
2361 }
2362}
2363
2364// ─── set_edge_group_policy (RFC-016 §6.1) ───
2365
/// Args for setting the dependency policy of one inbound edge group
/// (RFC-016 §6.1).
///
/// An edge group is identified by the flow plus the downstream
/// execution that owns the inbound edges.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SetEdgeGroupPolicyArgs {
    /// Flow that contains the edge group.
    pub flow_id: FlowId,
    /// Downstream execution whose inbound group is being configured.
    pub downstream_execution_id: ExecutionId,
    /// Policy to store. Per the [`EdgeDependencyPolicy`] docs, Stage A
    /// accepts only `AllOf` and rejects the quorum variants.
    pub policy: EdgeDependencyPolicy,
    /// Caller-supplied timestamp for the operation (presumably "now" at
    /// the call site — confirm how the backend uses it).
    pub now: TimestampMs,
}
2373
/// Outcome of a successful set-edge-group-policy call.
///
/// Both variants are successes; they differ only in whether the call
/// changed stored state.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SetEdgeGroupPolicyResult {
    /// Policy stored (fresh write).
    Set,
    /// Policy already stored with an identical value (idempotent).
    AlreadySet,
}
2381
2382// ─── list_flows (issue #185) ───
2383
/// Coarse, typed flow-lifecycle status carried on [`FlowSummary`].
///
/// FF's flow lifecycle writes a free-form `public_flow_state` literal
/// onto `flow_core`; the values known today are `open`, `running`,
/// `blocked`, `cancelled`, `completed` and `failed` (see
/// [`FlowSnapshot`]). This enum folds the three live literals (`open`,
/// `running`, `blocked`) into [`FlowStatus::Active`] and gives each
/// terminal literal its own variant, `failed` becoming
/// [`FlowStatus::Failed`]. Callers that need the exact runtime
/// sub-state should read [`FlowSnapshot::public_flow_state`] through
/// [`crate::engine_backend::EngineBackend::describe_flow`].
///
/// Marked `#[non_exhaustive]` so that any lifecycle states FF
/// introduces later can be added without a semver break.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum FlowStatus {
    /// The flow is still live on the engine: `open`, `running` or
    /// `blocked`.
    Active,
    /// Terminal success — every member reached a successful terminal
    /// state and the flow projector set `public_flow_state` to
    /// `completed`.
    Completed,
    /// Terminal failure — at least one member failed and the flow
    /// projector set `public_flow_state` to `failed`.
    Failed,
    /// An operator cancelled the flow via `cancel_flow`.
    Cancelled,
    /// A `public_flow_state` literal is stored but is not one of the
    /// known values. The raw literal remains available on
    /// [`FlowSnapshot::public_flow_state`]; callers that must act on it
    /// should fall back to
    /// [`crate::engine_backend::EngineBackend::describe_flow`].
    Unknown,
}

impl FlowStatus {
    /// Convert the raw `public_flow_state` literal from `flow_core`
    /// into a typed [`FlowStatus`].
    ///
    /// Matching is exact and case-sensitive. Anything that is not a
    /// known literal — including the empty string — becomes
    /// [`FlowStatus::Unknown`], which keeps the list surface
    /// forwards-compatible with states the engine may add later.
    pub fn from_public_flow_state(raw: &str) -> Self {
        // The three live literals collapse into one variant, so test them as a group first.
        if matches!(raw, "open" | "running" | "blocked") {
            return Self::Active;
        }
        match raw {
            "cancelled" => Self::Cancelled,
            "failed" => Self::Failed,
            "completed" => Self::Completed,
            _ => Self::Unknown,
        }
    }
}
2432
/// Lightweight per-flow projection returned by
/// [`crate::engine_backend::EngineBackend::list_flows`].
///
/// Deliberately narrower than [`FlowSnapshot`] — listing pages serve
/// dashboard-style enumerations where the caller does not want to pay
/// for the full `flow_core` hash on every row. Consumers that need
/// revision / node-count / tags / cancel metadata should fan out to
/// [`crate::engine_backend::EngineBackend::describe_flow`] for the
/// specific ids they care about.
///
/// `#[non_exhaustive]` — FF may add fields in minor releases without
/// a semver break. Match with `..` or use [`FlowSummary::new`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct FlowSummary {
    /// Identifier of the flow this row describes.
    pub flow_id: FlowId,
    /// Timestamp (ms since unix epoch) `flow_core.created_at` was
    /// stamped. Mirrors [`FlowSnapshot::created_at`]; kept typed so
    /// callers that want raw millis can read `.0`.
    pub created_at: TimestampMs,
    /// Typed projection of `flow_core.public_flow_state`. See
    /// [`FlowStatus`] for the mapping.
    pub status: FlowStatus,
}
2457
2458impl FlowSummary {
2459 /// Construct a [`FlowSummary`]. Present so downstream crates can
2460 /// assemble the struct despite the `#[non_exhaustive]` marker.
2461 pub fn new(flow_id: FlowId, created_at: TimestampMs, status: FlowStatus) -> Self {
2462 Self {
2463 flow_id,
2464 created_at,
2465 status,
2466 }
2467 }
2468}
2469
/// One page of [`FlowSummary`] rows returned by
/// [`crate::engine_backend::EngineBackend::list_flows`].
///
/// `next_cursor` is `Some(last_flow_id)` when at least one more row
/// may exist on the partition — callers forward it verbatim as the
/// next call's `cursor` argument to continue iteration. `None` means
/// the listing is exhausted. Cursor semantics match the Postgres
/// `WHERE flow_id > $cursor ORDER BY flow_id LIMIT $limit` pattern
/// (see the trait method's rustdoc).
///
/// `#[non_exhaustive]` — FF may add summary-level fields (total count,
/// partition hint) in future minor releases without a semver break.
/// Construct with [`ListFlowsPage::new`] from other crates.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ListFlowsPage {
    /// Rows on this page.
    pub flows: Vec<FlowSummary>,
    /// Cursor for the next page; `None` once the listing is exhausted.
    pub next_cursor: Option<FlowId>,
}
2488
2489impl ListFlowsPage {
2490 /// Construct a [`ListFlowsPage`]. Present so downstream crates can
2491 /// assemble the struct despite the `#[non_exhaustive]` marker.
2492 pub fn new(flows: Vec<FlowSummary>, next_cursor: Option<FlowId>) -> Self {
2493 Self { flows, next_cursor }
2494 }
2495}
2496
/// Summary of state after a mutation, returned by many functions.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StateSummary {
    /// The execution's state vector as of the mutation.
    pub state_vector: StateVector,
    /// Attempt index that is current as of the mutation.
    pub current_attempt_index: AttemptIndex,
}
2503
// Unit tests for this module's contract types: serde round-trips for
// args/results, and parsing, display and wire mapping for `StreamCursor`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::FlowId;

    // `CreateExecutionArgs` survives a JSON round-trip; only the execution
    // id is compared afterwards.
    #[test]
    fn create_execution_args_serde() {
        let config = crate::partition::PartitionConfig::default();
        let args = CreateExecutionArgs {
            execution_id: ExecutionId::for_flow(&FlowId::new(), &config),
            namespace: Namespace::new("test"),
            lane_id: LaneId::new("default"),
            execution_kind: "llm_call".to_owned(),
            input_payload: b"hello".to_vec(),
            payload_encoding: Some("json".to_owned()),
            priority: 0,
            creator_identity: "test-user".to_owned(),
            idempotency_key: None,
            tags: HashMap::new(),
            policy: None,
            delay_until: None,
            execution_deadline_at: None,
            partition_id: 42,
            now: TimestampMs::now(),
        };
        let json = serde_json::to_string(&args).unwrap();
        let parsed: CreateExecutionArgs = serde_json::from_str(&json).unwrap();
        assert_eq!(args.execution_id, parsed.execution_id);
    }

    // `ClaimExecutionResult::Claimed` round-trips through JSON to an equal
    // value.
    #[test]
    fn claim_result_serde() {
        let config = crate::partition::PartitionConfig::default();
        let result = ClaimExecutionResult::Claimed(ClaimedExecution {
            execution_id: ExecutionId::for_flow(&FlowId::new(), &config),
            lease_id: LeaseId::new(),
            lease_epoch: LeaseEpoch::new(1),
            attempt_index: AttemptIndex::new(0),
            attempt_id: AttemptId::new(),
            attempt_type: AttemptType::Initial,
            lease_expires_at: TimestampMs::from_millis(1000),
        });
        let json = serde_json::to_string(&result).unwrap();
        let parsed: ClaimExecutionResult = serde_json::from_str(&json).unwrap();
        assert_eq!(result, parsed);
    }

    // ── StreamCursor (issue #92) ──

    // `Display` yields the public tokens `start` / `end`, and the raw id
    // for `At`.
    #[test]
    fn stream_cursor_display_matches_wire_tokens() {
        assert_eq!(StreamCursor::Start.to_string(), "start");
        assert_eq!(StreamCursor::End.to_string(), "end");
        assert_eq!(StreamCursor::At("123".into()).to_string(), "123");
        assert_eq!(StreamCursor::At("123-4".into()).to_string(), "123-4");
    }

    // `to_wire` maps `Start` / `End` to the `-` / `+` markers and passes
    // `At` ids through unchanged.
    #[test]
    fn stream_cursor_to_wire_maps_to_valkey_markers() {
        assert_eq!(StreamCursor::Start.to_wire(), "-");
        assert_eq!(StreamCursor::End.to_wire(), "+");
        assert_eq!(StreamCursor::At("0-0".into()).to_wire(), "0-0");
        assert_eq!(StreamCursor::At("17-3".into()).to_wire(), "17-3");
    }

    // `FromStr` accepts `start`, `end`, bare millisecond ids and
    // `<ms>-<seq>` ids.
    #[test]
    fn stream_cursor_from_str_accepts_wire_tokens() {
        use std::str::FromStr;
        assert_eq!(
            StreamCursor::from_str("start").unwrap(),
            StreamCursor::Start
        );
        assert_eq!(StreamCursor::from_str("end").unwrap(), StreamCursor::End);
        assert_eq!(
            StreamCursor::from_str("123").unwrap(),
            StreamCursor::At("123".into())
        );
        assert_eq!(
            StreamCursor::from_str("0-0").unwrap(),
            StreamCursor::At("0-0".into())
        );
        assert_eq!(
            StreamCursor::from_str("1713100800150-0").unwrap(),
            StreamCursor::At("1713100800150-0".into())
        );
    }

    // The raw `-` / `+` markers are rejected on input with
    // `BareMarkerRejected`, which carries the offending marker.
    #[test]
    fn stream_cursor_from_str_rejects_bare_markers() {
        use std::str::FromStr;
        assert!(matches!(
            StreamCursor::from_str("-"),
            Err(StreamCursorParseError::BareMarkerRejected(s)) if s == "-"
        ));
        assert!(matches!(
            StreamCursor::from_str("+"),
            Err(StreamCursorParseError::BareMarkerRejected(s)) if s == "+"
        ));
    }

    // The empty string has its own error variant.
    #[test]
    fn stream_cursor_from_str_rejects_empty() {
        use std::str::FromStr;
        assert_eq!(
            StreamCursor::from_str(""),
            Err(StreamCursorParseError::Empty)
        );
    }

    // Non-numeric text, misplaced or repeated dashes, other separators and
    // wrong-case keywords are all `Malformed`.
    #[test]
    fn stream_cursor_from_str_rejects_malformed() {
        use std::str::FromStr;
        for bad in [
            "abc", "-1", "1-", "-1-2", "1-2-3", "1.2", "1 2", "Start", "END",
        ] {
            assert!(
                matches!(
                    StreamCursor::from_str(bad),
                    Err(StreamCursorParseError::Malformed(_))
                ),
                "must reject {bad:?}",
            );
        }
    }

    // An en dash (U+2013) between digits is not accepted as the separator.
    #[test]
    fn stream_cursor_from_str_rejects_non_ascii() {
        use std::str::FromStr;
        assert!(matches!(
            StreamCursor::from_str("1\u{2013}2"),
            Err(StreamCursorParseError::Malformed(_))
        ));
    }

    // Every variant round-trips through JSON to an equal value.
    #[test]
    fn stream_cursor_serde_round_trip() {
        for c in [
            StreamCursor::Start,
            StreamCursor::End,
            StreamCursor::At("0-0".into()),
            StreamCursor::At("1713100800150-0".into()),
        ] {
            let json = serde_json::to_string(&c).unwrap();
            let back: StreamCursor = serde_json::from_str(&json).unwrap();
            assert_eq!(back, c);
        }
    }

    // The JSON form is a bare string, not a tagged object.
    #[test]
    fn stream_cursor_serializes_as_bare_string() {
        assert_eq!(
            serde_json::to_string(&StreamCursor::Start).unwrap(),
            r#""start""#
        );
        assert_eq!(
            serde_json::to_string(&StreamCursor::End).unwrap(),
            r#""end""#
        );
        assert_eq!(
            serde_json::to_string(&StreamCursor::At("123-0".into())).unwrap(),
            r#""123-0""#
        );
    }

    // Deserialization rejects the raw `-` / `+` markers, as `FromStr` does.
    #[test]
    fn stream_cursor_deserialize_rejects_bare_markers() {
        assert!(serde_json::from_str::<StreamCursor>(r#""-""#).is_err());
        assert!(serde_json::from_str::<StreamCursor>(r#""+""#).is_err());
    }

    // `from_beginning()` is the concrete id `0-0`, not the `Start` marker.
    #[test]
    fn stream_cursor_from_beginning_is_zero_zero() {
        assert_eq!(
            StreamCursor::from_beginning(),
            StreamCursor::At("0-0".into())
        );
    }

    // Only `At` cursors count as concrete; `Start` and `End` do not.
    #[test]
    fn stream_cursor_is_concrete_classifies_variants() {
        assert!(!StreamCursor::Start.is_concrete());
        assert!(!StreamCursor::End.is_concrete());
        assert!(StreamCursor::At("0-0".into()).is_concrete());
        assert!(StreamCursor::At("123-0".into()).is_concrete());
        assert!(StreamCursor::from_beginning().is_concrete());
    }

    // `into_wire_string` consumes the cursor and yields the same strings as
    // `to_wire`. Only the output is asserted; the "without cloning" part of
    // the name is not checked here.
    #[test]
    fn stream_cursor_into_wire_string_moves_without_cloning() {
        assert_eq!(StreamCursor::Start.into_wire_string(), "-");
        assert_eq!(StreamCursor::End.into_wire_string(), "+");
        assert_eq!(StreamCursor::At("17-3".into()).into_wire_string(), "17-3");
    }
}
2698
2699// ─── list_executions ───
2700
2701/// Summary of an execution for list views.
2702#[derive(Clone, Debug, Serialize, Deserialize)]
2703pub struct ExecutionSummary {
2704 pub execution_id: ExecutionId,
2705 pub namespace: String,
2706 pub lane_id: String,
2707 pub execution_kind: String,
2708 pub public_state: String,
2709 pub priority: i32,
2710 pub created_at: String,
2711}
2712
2713/// Result of a list_executions query.
2714#[derive(Clone, Debug, Serialize, Deserialize)]
2715pub struct ListExecutionsResult {
2716 pub executions: Vec<ExecutionSummary>,
2717 pub total_returned: usize,
2718}
2719
2720// ─── list_lanes (issue #184) ───
2721
2722/// One page of lane ids returned by
2723/// [`crate::engine_backend::EngineBackend::list_lanes`].
2724///
2725/// Lanes are global (not partition-scoped) — the backend enumerates
2726/// every registered lane, sorts by [`LaneId`] name, and returns a
2727/// `limit`-sized slice starting after `cursor` (exclusive).
2728///
2729/// `next_cursor` is `Some(last_lane_in_page)` when more pages remain
2730/// and `None` when the caller has read the final page. Callers that
2731/// want the full list loop until `next_cursor` is `None`, threading
2732/// the previous page's `next_cursor` into the next call's `cursor`
2733/// argument.
2734///
2735/// `#[non_exhaustive]` — FF may add fields (e.g. a `total` hint) in
2736/// minor releases without a semver break.
2737#[derive(Clone, Debug, PartialEq, Eq)]
2738#[non_exhaustive]
2739pub struct ListLanesPage {
2740 /// The lanes in this page, sorted by [`LaneId`] name.
2741 pub lanes: Vec<LaneId>,
2742 /// Cursor for the next page (exclusive). `None` ⇒ final page.
2743 pub next_cursor: Option<LaneId>,
2744}
2745
2746impl ListLanesPage {
2747 /// Construct a [`ListLanesPage`]. Present so downstream crates
2748 /// (ff-backend-valkey's `list_lanes` impl) can assemble the
2749 /// struct despite the `#[non_exhaustive]` marker.
2750 pub fn new(lanes: Vec<LaneId>, next_cursor: Option<LaneId>) -> Self {
2751 Self { lanes, next_cursor }
2752 }
2753}
2754
2755// ─── list_suspended ───
2756
2757/// One entry in a [`ListSuspendedPage`] — a suspended execution and
2758/// the reason it is blocked, answering an operator's "what's this
2759/// waiting on?" without a follow-up round-trip.
2760///
2761/// `reason` carries the free-form `reason_code` recorded by the
2762/// suspending worker at `lua/suspension.lua` (HSET `suspension:current
2763/// reason_code`). It is a `String`, not a closed enum: the suspension
2764/// pipeline accepts arbitrary caller-supplied codes (typical values
2765/// are `"signal"`, `"timer"`, `"children"`, `"join"`, but consumers
2766/// embed bespoke codes). A future enum projection can classify
2767/// known codes once the set is frozen; until then, callers that want
2768/// structured routing MUST match on the string explicitly.
2769#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2770#[non_exhaustive]
2771pub struct SuspendedExecutionEntry {
2772 /// Execution currently in `lifecycle_phase=suspended`.
2773 pub execution_id: ExecutionId,
2774 /// Score stored on the per-lane suspended ZSET — the scheduled
2775 /// `timeout_at` in milliseconds, or the `9999999999999` sentinel
2776 /// when no timeout was set (see `lua/suspension.lua`).
2777 pub suspended_at_ms: i64,
2778 /// Free-form reason code from `suspension:current.reason_code`.
2779 /// Empty string when the suspension hash is absent or does not
2780 /// carry a `reason_code` field (older records). See the struct
2781 /// rustdoc for the deliberate-String rationale.
2782 pub reason: String,
2783}
2784
2785impl SuspendedExecutionEntry {
2786 /// Construct a new entry. Preferred over direct field init for
2787 /// `#[non_exhaustive]` forward-compat.
2788 pub fn new(execution_id: ExecutionId, suspended_at_ms: i64, reason: String) -> Self {
2789 Self {
2790 execution_id,
2791 suspended_at_ms,
2792 reason,
2793 }
2794 }
2795}
2796
2797/// One cursor-paginated page of suspended executions.
2798///
2799/// Pagination is cursor-based (not offset/limit) so a Valkey backend
2800/// can resume a partition scan from the last seen execution id and a
2801/// future Postgres backend can do keyset pagination on
2802/// `executions WHERE state='suspended'`. The cursor is opaque to
2803/// callers: pass `next_cursor` back as the `cursor` argument to the
2804/// next [`EngineBackend::list_suspended`] call to fetch the next
2805/// page. `None` means the stream is exhausted.
2806///
2807/// [`EngineBackend::list_suspended`]: crate::engine_backend::EngineBackend::list_suspended
2808#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2809#[non_exhaustive]
2810pub struct ListSuspendedPage {
2811 /// Entries on this page, ordered by ascending `suspended_at_ms`
2812 /// (timeout order) with `execution_id` as a lex tiebreak.
2813 pub entries: Vec<SuspendedExecutionEntry>,
2814 /// Resume-point for the next page. `None` when no further
2815 /// entries remain in the partition.
2816 pub next_cursor: Option<ExecutionId>,
2817}
2818
2819impl ListSuspendedPage {
2820 /// Construct a new page. Preferred over direct field init for
2821 /// `#[non_exhaustive]` forward-compat.
2822 pub fn new(entries: Vec<SuspendedExecutionEntry>, next_cursor: Option<ExecutionId>) -> Self {
2823 Self {
2824 entries,
2825 next_cursor,
2826 }
2827 }
2828}
2829
2830// ─── list_executions ───
2831
2832/// One page of partition-scoped execution ids returned by
2833/// [`EngineBackend::list_executions`](crate::engine_backend::EngineBackend::list_executions).
2834///
2835/// Pagination is forward-only and cursor-based. `next_cursor` carries
2836/// the last `ExecutionId` emitted in `executions` iff another page is
2837/// available; callers pass that id back as the next call's `cursor`
2838/// (exclusive start). `next_cursor = None` signals end-of-stream.
2839///
2840/// `#[non_exhaustive]` — FF may add fields (e.g. `approximate_total`)
2841/// in minor releases without a semver break. Use
2842/// [`ListExecutionsPage::new`] for cross-crate construction.
2843#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2844#[non_exhaustive]
2845pub struct ListExecutionsPage {
2846 /// Execution ids on this page, in ascending lexicographic order.
2847 pub executions: Vec<ExecutionId>,
2848 /// Exclusive cursor to request the next page. `None` ⇒ no more
2849 /// results.
2850 pub next_cursor: Option<ExecutionId>,
2851}
2852
2853impl ListExecutionsPage {
2854 /// Construct a [`ListExecutionsPage`]. Present so downstream
2855 /// crates can assemble the struct despite the `#[non_exhaustive]`
2856 /// marker.
2857 pub fn new(executions: Vec<ExecutionId>, next_cursor: Option<ExecutionId>) -> Self {
2858 Self { executions, next_cursor }
2859 }
2860}
2861
2862// ─── rotate_waitpoint_hmac_secret ───
2863
/// Args for `ff_rotate_waitpoint_hmac_secret`. Rotates the HMAC signing
/// kid on ONE partition. Callers fan out across every partition themselves
/// (ff-server does the parallel fan-out in `rotate_waitpoint_secret`;
/// direct-Valkey consumers mirror the pattern).
///
/// "now" is derived server-side from `redis.call("TIME")` inside the FCALL
/// (consistency with `validate_waitpoint_token` and flow scanners).
/// `grace_ms` is a duration, not a clock value, so carrying it from the
/// caller is safe.
///
/// # Debug output
///
/// `Debug` is implemented by hand so that `new_secret_hex` is never
/// printed: the struct carries signing-key material, and a derived `Debug`
/// would leak it into any log line or panic message that formats the args
/// with `{:?}`.
#[derive(Clone)]
pub struct RotateWaitpointHmacSecretArgs {
    /// Key id to install as the new signing kid.
    pub new_kid: String,
    /// Hex-encoded secret for `new_kid`. Sensitive — redacted in `Debug`.
    pub new_secret_hex: String,
    /// Grace window in ms. Tokens signed by the outgoing kid remain valid
    /// for `grace_ms` after this rotation. The unsigned type already rules
    /// out negative windows.
    pub grace_ms: u64,
}

impl std::fmt::Debug for RotateWaitpointHmacSecretArgs {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RotateWaitpointHmacSecretArgs")
            .field("new_kid", &self.new_kid)
            // Never format the secret itself; show a fixed placeholder.
            .field("new_secret_hex", &"<redacted>")
            .field("grace_ms", &self.grace_ms)
            .finish()
    }
}
2881
/// What a rotation did on one partition.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RotateWaitpointHmacSecretOutcome {
    /// The new kid was installed.
    ///
    /// * `previous_kid` — `None` on bootstrap, i.e. when there was no
    ///   prior `current_kid`.
    /// * `gc_count` — how many expired kids were reaped during this
    ///   rotation.
    Rotated {
        previous_kid: Option<String>,
        new_kid: String,
        gc_count: u32,
    },
    /// Exact replay: the same kid with the same secret was already
    /// installed. Nothing changed, so an operator retry is safe.
    Noop { kid: String },
}
2897
2898// ─── list_waitpoint_hmac_kids ───
2899
/// Argument struct for listing waitpoint HMAC kids. It has no fields.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ListWaitpointHmacKidsArgs {}
2902
2903/// Snapshot of the waitpoint HMAC keystore on ONE partition.
2904#[derive(Clone, Debug, PartialEq, Eq)]
2905pub struct WaitpointHmacKids {
2906 /// The currently-signing kid. `None` if uninitialized.
2907 pub current_kid: Option<String>,
2908 /// Kids that still validate existing tokens but no longer sign
2909 /// new ones. Order is Lua HGETALL traversal order — callers that
2910 /// need a stable sort should sort by `expires_at_ms`.
2911 pub verifying: Vec<VerifyingKid>,
2912}
2913
/// A kid paired with its expiry, in milliseconds.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct VerifyingKid {
    /// The key id.
    pub kid: String,
    /// Expiry of this kid, in milliseconds.
    pub expires_at_ms: i64,
}
2919
2920// ═══════════════════════════════════════════════════════════════════════
2921// RFC-013 Stage 1d: EngineBackend::suspend typed args + outcome
2922// ═══════════════════════════════════════════════════════════════════════
2923//
2924// `SuspendExecutionArgs` / `SuspendExecutionResult` above remain the
2925// wire-level Lua-ARGV mirror used by the backend serializer. The types
2926// below are the public trait-surface shapes RFC-013 §2.2–§2.6 specifies.
2927//
2928// Every type in this block is `#[non_exhaustive]` per the RFC §2.2.1
2929// memory-rule compliance note; each gets a constructor so external-crate
2930// consumers can build them without struct-literal access.
2931
2932use crate::backend::WaitpointHmac;
2933
2934/// Partition-scoped idempotency key for retry-safe `EngineBackend::suspend`.
2935///
2936/// See RFC-013 §2.2 — when set on [`SuspendArgs::idempotency_key`], the
2937/// backend dedups the call on `(partition, execution_id, idempotency_key)`
2938/// and a second `suspend` with the same triple returns the first call's
2939/// [`SuspendOutcome`] verbatim. Absent a key, `suspend` is NOT retry-
2940/// idempotent; callers must describe-and-reconcile per §3.1.
2941///
2942/// Follows the `UsageDimensions::dedup_key` pattern — opaque to the
2943/// engine, byte-compared at the partition scope.
2944#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
2945#[serde(transparent)]
2946pub struct IdempotencyKey(String);
2947
2948impl IdempotencyKey {
2949 /// Construct from any stringy input. Empty strings are accepted;
2950 /// the backend treats an empty key as "no dedup" at the serialize
2951 /// step so `Some(IdempotencyKey::new(""))` is functionally the same
2952 /// as `None`.
2953 pub fn new(key: impl Into<String>) -> Self {
2954 Self(key.into())
2955 }
2956
2957 /// Borrow the underlying string.
2958 pub fn as_str(&self) -> &str {
2959 &self.0
2960 }
2961}
2962
2963impl std::fmt::Display for IdempotencyKey {
2964 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2965 f.write_str(&self.0)
2966 }
2967}
2968
2969/// v1 signal-match predicate inside [`ResumeCondition::Single`].
2970///
2971/// RFC-013 §2.4 — `ByName(String)` matches a single concrete signal
2972/// name; `Wildcard` matches any delivered signal. RFC-014 may extend
2973/// (payload predicates, pattern matching) — `#[non_exhaustive]`.
2974#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2975#[non_exhaustive]
2976pub enum SignalMatcher {
2977 /// Match by exact signal name.
2978 ByName(String),
2979 /// Match any signal delivered to the waitpoint.
2980 Wildcard,
2981}
2982
/// Maximum nesting depth allowed for a composite condition (RFC-014 §5.4
/// invariant 4; rationale in §5.5).
///
/// This is a soft cap: raising it needs only a change to this constant
/// and to the cap-rationale paragraph in RFC-014 §5.5, with no
/// wire-format change. Keep the two in sync.
pub const MAX_COMPOSITE_DEPTH: usize = 4;
2988
2989/// RFC-013 reserves this enum slot; RFC-014 populates it with the
2990/// concrete composition vocabulary (`AllOf` + `Count`). The enum is
2991/// `#[non_exhaustive]` so RFC-016 or later RFCs may add variants
2992/// (`AnyOf` has been explicitly rejected per RFC-014 §2.3 in favour of
2993/// `Count { n: 1, .. }`; the guard exists for orthogonal future work).
2994#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2995#[serde(tag = "kind")]
2996#[non_exhaustive]
2997pub enum CompositeBody {
2998 /// All listed sub-conditions must be satisfied. Order-independent.
2999 /// Once satisfied, further signals to member waitpoints are observed
3000 /// but do not re-open satisfaction. RFC-014 §2.1.
3001 AllOf {
3002 members: Vec<ResumeCondition>,
3003 },
3004 /// At least `n` distinct satisfiers (by [`CountKind`]) must match.
3005 /// `matcher` optionally constrains participating signals; `None`
3006 /// lets any signal on any of `waitpoints` count. RFC-014 §2.1.
3007 Count {
3008 n: u32,
3009 count_kind: CountKind,
3010 #[serde(default, skip_serializing_if = "Option::is_none")]
3011 matcher: Option<SignalMatcher>,
3012 waitpoints: Vec<String>,
3013 },
3014}
3015
3016/// How `Count` nodes distinguish satisfiers. RFC-014 §2.1 + §3.2.
3017#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3018#[non_exhaustive]
3019pub enum CountKind {
3020 /// n distinct `waitpoint_id`s in `waitpoints` must fire.
3021 DistinctWaitpoints,
3022 /// n distinct `signal_id`s across the waitpoint set.
3023 DistinctSignals,
3024 /// n distinct `source_type:source_identity` tuples.
3025 DistinctSources,
3026}
3027
3028impl CompositeBody {
3029 /// `AllOf { members }` constructor (RFC-014 §10.3 SDK surface).
3030 pub fn all_of(members: impl IntoIterator<Item = ResumeCondition>) -> Self {
3031 Self::AllOf {
3032 members: members.into_iter().collect(),
3033 }
3034 }
3035
3036 /// `Count` constructor with explicit kind + waitpoint set.
3037 pub fn count(
3038 n: u32,
3039 count_kind: CountKind,
3040 matcher: Option<SignalMatcher>,
3041 waitpoints: impl IntoIterator<Item = String>,
3042 ) -> Self {
3043 Self::Count {
3044 n,
3045 count_kind,
3046 matcher,
3047 waitpoints: waitpoints.into_iter().collect(),
3048 }
3049 }
3050}
3051
3052/// Declarative resume condition for [`SuspendArgs::resume_condition`].
3053///
3054/// RFC-013 §2.4 — typed replacement for the SDK's former
3055/// `ConditionMatcher` / `resume_condition_json` pair.
3056#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3057#[non_exhaustive]
3058pub enum ResumeCondition {
3059 /// Single waitpoint-key match with a predicate. `matcher` is
3060 /// evaluated against every signal delivered to `waitpoint_key`.
3061 Single {
3062 waitpoint_key: String,
3063 matcher: SignalMatcher,
3064 },
3065 /// Operator-only resume — no signal satisfies; only an explicit
3066 /// operator resume closes the waitpoint.
3067 OperatorOnly,
3068 /// Pure-timeout suspension. No signal satisfier; the waitpoint
3069 /// resolves only via `timeout_behavior` at `timeout_at`. Requires
3070 /// `SuspendArgs::timeout_at` to be `Some(_)` — otherwise the
3071 /// Rust-side validator rejects as `timeout_only_without_deadline`.
3072 TimeoutOnly,
3073 /// Multi-condition composition; RFC-014 defines the body.
3074 Composite(CompositeBody),
3075}
3076
/// Error shape for RFC-014 §5.1 validation failures.
///
/// [`ResumeCondition::validate_composite`] returns this when a composite
/// breaks a structural or cardinality invariant. The check runs at
/// suspend-time, before any Valkey call. `detail` is the human-readable
/// description required by §5.1.1.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CompositeValidationError {
    /// Human-readable description of the violated invariant.
    pub detail: String,
}

impl CompositeValidationError {
    /// Private constructor: wraps anything convertible into a `String`.
    fn new(detail: impl Into<String>) -> Self {
        let detail: String = detail.into();
        CompositeValidationError { detail }
    }
}
3093
3094impl ResumeCondition {
3095 /// RFC-014 §10.3 builder — `AllOf` across N distinct waitpoints,
3096 /// each member a `Single { matcher: Wildcard }` leaf. Canonical
3097 /// Pattern 3 shape for heterogeneous-subsystem "all fired"
3098 /// semantics (e.g. `db-migration-complete` + `cache-warmed` +
3099 /// `feature-flag-set`).
3100 ///
3101 /// Callers that need per-waitpoint matchers should construct the
3102 /// tree directly via
3103 /// [`ResumeCondition::Composite(CompositeBody::all_of(..))`].
3104 pub fn all_of_waitpoints<I, S>(waitpoint_keys: I) -> Self
3105 where
3106 I: IntoIterator<Item = S>,
3107 S: Into<String>,
3108 {
3109 let members: Vec<ResumeCondition> = waitpoint_keys
3110 .into_iter()
3111 .map(|k| ResumeCondition::Single {
3112 waitpoint_key: k.into(),
3113 matcher: SignalMatcher::Wildcard,
3114 })
3115 .collect();
3116 ResumeCondition::Composite(CompositeBody::AllOf { members })
3117 }
3118
3119 /// Collect every distinct `waitpoint_key` the condition targets.
3120 /// Used at suspend-time to validate the condition's wp set against
3121 /// `SuspendArgs.waitpoints` (RFC-014 §5.1 multi-binding cross-
3122 /// check). Order follows tree DFS, de-duplicated preserving first
3123 /// occurrence.
3124 pub fn referenced_waitpoint_keys(&self) -> Vec<String> {
3125 let mut out: Vec<String> = Vec::new();
3126 let mut push = |k: &str| {
3127 if !out.iter().any(|e| e == k) {
3128 out.push(k.to_owned());
3129 }
3130 };
3131 fn walk(cond: &ResumeCondition, push: &mut dyn FnMut(&str)) {
3132 match cond {
3133 ResumeCondition::Single { waitpoint_key, .. } => push(waitpoint_key),
3134 ResumeCondition::Composite(body) => walk_body(body, push),
3135 _ => {}
3136 }
3137 }
3138 fn walk_body(body: &CompositeBody, push: &mut dyn FnMut(&str)) {
3139 match body {
3140 CompositeBody::AllOf { members } => {
3141 for m in members {
3142 walk(m, push);
3143 }
3144 }
3145 CompositeBody::Count { waitpoints, .. } => {
3146 for w in waitpoints {
3147 push(w.as_str());
3148 }
3149 }
3150 }
3151 }
3152 walk(self, &mut push);
3153 out
3154 }
3155
3156 /// Validate RFC-014 structural invariants on a composite condition.
3157 /// Single / OperatorOnly / TimeoutOnly return Ok — they carry no
3158 /// composite body. Checks cover:
3159 /// * `AllOf { members: [] }` — §5.1 `allof_empty_members`
3160 /// * `Count { n: 0 }` — §5.1 `count_n_zero`
3161 /// * `Count { waitpoints: [] }` — §5.1 `count_waitpoints_empty`
3162 /// * `Count { n > waitpoints.len(), DistinctWaitpoints }` — §5.1
3163 /// `count_exceeds_waitpoint_set`
3164 /// * depth > [`MAX_COMPOSITE_DEPTH`] — §5.1 `condition_depth_exceeded`
3165 pub fn validate_composite(&self) -> Result<(), CompositeValidationError> {
3166 match self {
3167 ResumeCondition::Composite(body) => validate_body(body, 1, ""),
3168 _ => Ok(()),
3169 }
3170 }
3171}
3172
3173fn validate_body(
3174 body: &CompositeBody,
3175 depth: usize,
3176 path: &str,
3177) -> Result<(), CompositeValidationError> {
3178 if depth > MAX_COMPOSITE_DEPTH {
3179 return Err(CompositeValidationError::new(format!(
3180 "depth {} exceeds cap {} at path {}",
3181 depth,
3182 MAX_COMPOSITE_DEPTH,
3183 if path.is_empty() { "<root>" } else { path }
3184 )));
3185 }
3186 match body {
3187 CompositeBody::AllOf { members } => {
3188 if members.is_empty() {
3189 return Err(CompositeValidationError::new(format!(
3190 "allof_empty_members at path {}",
3191 if path.is_empty() { "<root>" } else { path }
3192 )));
3193 }
3194 for (i, m) in members.iter().enumerate() {
3195 let child_path = if path.is_empty() {
3196 format!("members[{i}]")
3197 } else {
3198 format!("{path}.members[{i}]")
3199 };
3200 if let ResumeCondition::Composite(inner) = m {
3201 validate_body(inner, depth + 1, &child_path)?;
3202 }
3203 // Leaf `Single` / operator / timeout needs no further
3204 // structural checks — RFC-013 already constrains them.
3205 }
3206 Ok(())
3207 }
3208 CompositeBody::Count {
3209 n,
3210 count_kind,
3211 waitpoints,
3212 ..
3213 } => {
3214 if *n == 0 {
3215 return Err(CompositeValidationError::new(format!(
3216 "count_n_zero at path {}",
3217 if path.is_empty() { "<root>" } else { path }
3218 )));
3219 }
3220 if waitpoints.is_empty() {
3221 return Err(CompositeValidationError::new(format!(
3222 "count_waitpoints_empty at path {}",
3223 if path.is_empty() { "<root>" } else { path }
3224 )));
3225 }
3226 if matches!(count_kind, CountKind::DistinctWaitpoints)
3227 && (*n as usize) > waitpoints.len()
3228 {
3229 return Err(CompositeValidationError::new(format!(
3230 "count_exceeds_waitpoint_set: n={} > waitpoints.len()={} at path {}",
3231 n,
3232 waitpoints.len(),
3233 if path.is_empty() { "<root>" } else { path }
3234 )));
3235 }
3236 Ok(())
3237 }
3238 }
3239}
3240
3241/// Where a satisfied suspension routes back to.
3242///
3243/// v1 ships only [`ResumeTarget::Runnable`] — execution returns to
3244/// `runnable` and goes through normal scheduling.
3245#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3246#[non_exhaustive]
3247pub enum ResumeTarget {
3248 Runnable,
3249}
3250
3251/// Resume-side policy carried alongside [`ResumeCondition`].
3252///
3253/// RFC-013 §2.5 — what happens when the condition is satisfied. Fields
3254/// mirror the `resume_policy_json` the backend serializer writes to Lua
3255/// (RFC-004 §Resume policy fields).
3256#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3257#[non_exhaustive]
3258pub struct ResumePolicy {
3259 pub resume_target: ResumeTarget,
3260 pub consume_matched_signals: bool,
3261 pub retain_signal_buffer_until_closed: bool,
3262 #[serde(default, skip_serializing_if = "Option::is_none")]
3263 pub resume_delay_ms: Option<u64>,
3264 pub close_waitpoint_on_resume: bool,
3265}
3266
3267impl ResumePolicy {
3268 /// Canonical v1 defaults (RFC-013 §2.2.1):
3269 /// * `resume_target = Runnable`
3270 /// * `consume_matched_signals = true`
3271 /// * `retain_signal_buffer_until_closed = false`
3272 /// * `resume_delay_ms = None`
3273 /// * `close_waitpoint_on_resume = true`
3274 pub fn normal() -> Self {
3275 Self {
3276 resume_target: ResumeTarget::Runnable,
3277 consume_matched_signals: true,
3278 retain_signal_buffer_until_closed: false,
3279 resume_delay_ms: None,
3280 close_waitpoint_on_resume: true,
3281 }
3282 }
3283}
3284
3285/// Timeout behavior at the suspension deadline (RFC-004 §Timeout Behavior).
3286#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3287#[non_exhaustive]
3288pub enum TimeoutBehavior {
3289 Fail,
3290 Cancel,
3291 Expire,
3292 AutoResumeWithTimeoutSignal,
3293 /// v2 per RFC-004 Implementation Notes; enum slot present for
3294 /// additive RFC-014/RFC-015 landing.
3295 Escalate,
3296}
3297
3298impl TimeoutBehavior {
3299 /// Lua-side string encoding. Matches the wire values Lua's
3300 /// `ff_expire_suspension` matches on.
3301 pub fn as_wire_str(self) -> &'static str {
3302 match self {
3303 Self::Fail => "fail",
3304 Self::Cancel => "cancel",
3305 Self::Expire => "expire",
3306 Self::AutoResumeWithTimeoutSignal => "auto_resume_with_timeout_signal",
3307 Self::Escalate => "escalate",
3308 }
3309 }
3310}
3311
3312/// Reason category for a suspension (RFC-004 §Suspension Reason Categories).
3313#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3314#[non_exhaustive]
3315pub enum SuspensionReasonCode {
3316 WaitingForSignal,
3317 WaitingForApproval,
3318 WaitingForCallback,
3319 WaitingForToolResult,
3320 WaitingForOperatorReview,
3321 PausedByPolicy,
3322 PausedByBudget,
3323 StepBoundary,
3324 ManualPause,
3325}
3326
3327impl SuspensionReasonCode {
3328 pub fn as_wire_str(self) -> &'static str {
3329 match self {
3330 Self::WaitingForSignal => "waiting_for_signal",
3331 Self::WaitingForApproval => "waiting_for_approval",
3332 Self::WaitingForCallback => "waiting_for_callback",
3333 Self::WaitingForToolResult => "waiting_for_tool_result",
3334 Self::WaitingForOperatorReview => "waiting_for_operator_review",
3335 Self::PausedByPolicy => "paused_by_policy",
3336 Self::PausedByBudget => "paused_by_budget",
3337 Self::StepBoundary => "step_boundary",
3338 Self::ManualPause => "manual_pause",
3339 }
3340 }
3341}
3342
3343/// Who requested the suspension.
3344#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3345#[non_exhaustive]
3346pub enum SuspensionRequester {
3347 Worker,
3348 Operator,
3349 Policy,
3350 SystemTimeoutPolicy,
3351}
3352
3353impl SuspensionRequester {
3354 pub fn as_wire_str(self) -> &'static str {
3355 match self {
3356 Self::Worker => "worker",
3357 Self::Operator => "operator",
3358 Self::Policy => "policy",
3359 Self::SystemTimeoutPolicy => "system_timeout_policy",
3360 }
3361 }
3362}
3363
3364/// How the waitpoint resource backing a [`SuspendArgs`] is obtained.
3365///
3366/// RFC-013 §2.2 — `Fresh` mints a new waitpoint as part of `suspend`;
3367/// `UsePending` activates a waitpoint previously issued via
3368/// `EngineBackend::create_waitpoint`.
3369#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3370#[non_exhaustive]
3371pub enum WaitpointBinding {
3372 Fresh {
3373 waitpoint_id: WaitpointId,
3374 waitpoint_key: String,
3375 },
3376 UsePending {
3377 waitpoint_id: WaitpointId,
3378 },
3379}
3380
3381impl WaitpointBinding {
3382 /// Mint a fresh binding with a random `waitpoint_id` (UUID v4) and
3383 /// `waitpoint_key = "wpk:<uuid>"`.
3384 pub fn fresh() -> Self {
3385 let wp_id = WaitpointId::new();
3386 let key = format!("wpk:{wp_id}");
3387 Self::Fresh {
3388 waitpoint_id: wp_id,
3389 waitpoint_key: key,
3390 }
3391 }
3392
3393 /// Construct a `UsePending` binding from a pending waitpoint
3394 /// previously issued by `create_waitpoint`. The HMAC token is
3395 /// resolved Lua-side from the partition's waitpoint hash at
3396 /// `suspend` time (RFC-013 §5.1).
3397 pub fn use_pending(pending: &crate::backend::PendingWaitpoint) -> Self {
3398 Self::UsePending {
3399 waitpoint_id: pending.waitpoint_id.clone(),
3400 }
3401 }
3402}
3403
3404/// Trait-surface input to [`EngineBackend::suspend`] (RFC-013 §2.2 +
3405/// RFC-014 Pattern 3 widening).
3406///
3407/// Built via [`SuspendArgs::new`] + `with_*` setters; direct struct-
3408/// literal construction across crate boundaries is not possible
3409/// (`#[non_exhaustive]`).
3410///
3411/// ## Waitpoints
3412///
3413/// `waitpoints` is a non-empty `Vec<WaitpointBinding>`. The first entry
3414/// is the "primary" binding (accessible via [`primary`](Self::primary))
3415/// and carries the `current_waitpoint_id` written onto `exec_core` for
3416/// operator visibility. Additional entries land in Valkey as their own
3417/// waitpoint hashes / signal streams / HMAC tokens, enabling RFC-014
3418/// Pattern 3 `AllOf { members: [Single{wp1}, Single{wp2}, ...] }` across
3419/// distinct heterogeneous subsystems.
3420///
3421/// [`SuspendArgs::new`] takes exactly the primary binding; call
3422/// [`with_waitpoint`](Self::with_waitpoint) to append further bindings
3423/// (the RFC-014 builder API).
3424#[derive(Clone, Debug, Serialize, Deserialize)]
3425#[non_exhaustive]
3426pub struct SuspendArgs {
3427 pub suspension_id: SuspensionId,
3428 /// RFC-014 Pattern 3: all waitpoint bindings for this suspension.
3429 /// Guaranteed non-empty; `waitpoints[0]` is the primary.
3430 pub waitpoints: Vec<WaitpointBinding>,
3431 pub resume_condition: ResumeCondition,
3432 pub resume_policy: ResumePolicy,
3433 pub reason_code: SuspensionReasonCode,
3434 pub requested_by: SuspensionRequester,
3435 #[serde(default, skip_serializing_if = "Option::is_none")]
3436 pub timeout_at: Option<TimestampMs>,
3437 pub timeout_behavior: TimeoutBehavior,
3438 #[serde(default, skip_serializing_if = "Option::is_none")]
3439 pub continuation_metadata_pointer: Option<String>,
3440 pub now: TimestampMs,
3441 #[serde(default, skip_serializing_if = "Option::is_none")]
3442 pub idempotency_key: Option<IdempotencyKey>,
3443}
3444
3445impl SuspendArgs {
3446 /// Build a minimal `SuspendArgs` for a worker-originated suspension.
3447 ///
3448 /// `waitpoint` becomes the primary binding. Append additional
3449 /// bindings with [`with_waitpoint`](Self::with_waitpoint) (RFC-014
3450 /// Pattern 3) or replace the set with
3451 /// [`with_waitpoints`](Self::with_waitpoints).
3452 ///
3453 /// Defaults: `requested_by = Worker`, `timeout_at = None`,
3454 /// `timeout_behavior = Fail`, `continuation_metadata_pointer = None`,
3455 /// `idempotency_key = None`.
3456 pub fn new(
3457 suspension_id: SuspensionId,
3458 waitpoint: WaitpointBinding,
3459 resume_condition: ResumeCondition,
3460 resume_policy: ResumePolicy,
3461 reason_code: SuspensionReasonCode,
3462 now: TimestampMs,
3463 ) -> Self {
3464 Self {
3465 suspension_id,
3466 waitpoints: vec![waitpoint],
3467 resume_condition,
3468 resume_policy,
3469 reason_code,
3470 requested_by: SuspensionRequester::Worker,
3471 timeout_at: None,
3472 timeout_behavior: TimeoutBehavior::Fail,
3473 continuation_metadata_pointer: None,
3474 now,
3475 idempotency_key: None,
3476 }
3477 }
3478
3479 /// Primary binding — `waitpoints[0]`. Guaranteed present by
3480 /// construction.
3481 pub fn primary(&self) -> &WaitpointBinding {
3482 &self.waitpoints[0]
3483 }
3484
3485 pub fn with_timeout(mut self, at: TimestampMs, behavior: TimeoutBehavior) -> Self {
3486 self.timeout_at = Some(at);
3487 self.timeout_behavior = behavior;
3488 self
3489 }
3490
3491 pub fn with_requester(mut self, requester: SuspensionRequester) -> Self {
3492 self.requested_by = requester;
3493 self
3494 }
3495
3496 pub fn with_continuation_metadata_pointer(mut self, p: String) -> Self {
3497 self.continuation_metadata_pointer = Some(p);
3498 self
3499 }
3500
3501 pub fn with_idempotency_key(mut self, key: IdempotencyKey) -> Self {
3502 self.idempotency_key = Some(key);
3503 self
3504 }
3505
3506 /// RFC-014 Pattern 3 — append a further waitpoint binding to this
3507 /// suspension. Each additional binding yields its own waitpoint
3508 /// hash, signal stream, condition hash and HMAC token in Valkey,
3509 /// but all share the suspension record and composite evaluator
3510 /// under one `suspension:current`.
3511 ///
3512 /// Ordering: the primary (from [`SuspendArgs::new`]) stays at
3513 /// `waitpoints[0]`; subsequent `with_waitpoint` calls append at the
3514 /// tail.
3515 pub fn with_waitpoint(mut self, binding: WaitpointBinding) -> Self {
3516 self.waitpoints.push(binding);
3517 self
3518 }
3519
3520 /// RFC-014 Pattern 3 — replace the full binding vector in one call.
3521 /// Must be non-empty; an empty Vec is a programmer error and will
3522 /// be rejected by the backend's `validate_suspend_args` with
3523 /// `waitpoints_empty`.
3524 pub fn with_waitpoints(mut self, bindings: Vec<WaitpointBinding>) -> Self {
3525 self.waitpoints = bindings;
3526 self
3527 }
3528}
3529
3530/// Shared "what happened on the waitpoint" payload carried in both
3531/// [`SuspendOutcome`] variants.
3532///
3533/// For Pattern 3 (RFC-014) — multi-waitpoint suspensions — the primary
3534/// binding's identity lives at the top level (`waitpoint_id` /
3535/// `waitpoint_key` / `waitpoint_token`) and remaining bindings are
3536/// exposed via `additional_waitpoints`, each carrying its own minted
3537/// HMAC token so external signallers can deliver to any of the N
3538/// waitpoints the suspension is listening on.
3539#[derive(Clone, Debug, PartialEq, Eq)]
3540#[non_exhaustive]
3541pub struct SuspendOutcomeDetails {
3542 pub suspension_id: SuspensionId,
3543 pub waitpoint_id: WaitpointId,
3544 pub waitpoint_key: String,
3545 pub waitpoint_token: WaitpointHmac,
3546 /// RFC-014 Pattern 3 extras (beyond the primary). Empty for
3547 /// single-waitpoint suspensions (patterns 1 + 2); carries one
3548 /// entry per additional binding for Pattern 3.
3549 pub additional_waitpoints: Vec<AdditionalWaitpointBinding>,
3550}
3551
3552/// RFC-014 Pattern 3 — per-binding identity + HMAC token for
3553/// waitpoints beyond the primary. Structure mirrors the top-level
3554/// fields on [`SuspendOutcomeDetails`].
3555#[derive(Clone, Debug, PartialEq, Eq)]
3556#[non_exhaustive]
3557pub struct AdditionalWaitpointBinding {
3558 pub waitpoint_id: WaitpointId,
3559 pub waitpoint_key: String,
3560 pub waitpoint_token: WaitpointHmac,
3561}
3562
3563impl AdditionalWaitpointBinding {
3564 pub fn new(
3565 waitpoint_id: WaitpointId,
3566 waitpoint_key: String,
3567 waitpoint_token: WaitpointHmac,
3568 ) -> Self {
3569 Self {
3570 waitpoint_id,
3571 waitpoint_key,
3572 waitpoint_token,
3573 }
3574 }
3575}
3576
3577impl SuspendOutcomeDetails {
3578 pub fn new(
3579 suspension_id: SuspensionId,
3580 waitpoint_id: WaitpointId,
3581 waitpoint_key: String,
3582 waitpoint_token: WaitpointHmac,
3583 ) -> Self {
3584 Self {
3585 suspension_id,
3586 waitpoint_id,
3587 waitpoint_key,
3588 waitpoint_token,
3589 additional_waitpoints: Vec::new(),
3590 }
3591 }
3592
3593 /// Attach RFC-014 Pattern 3 additional-waitpoint bindings. The
3594 /// primary binding stays at the top-level fields; `extras` lands
3595 /// in [`additional_waitpoints`](Self::additional_waitpoints).
3596 pub fn with_additional_waitpoints(
3597 mut self,
3598 extras: Vec<AdditionalWaitpointBinding>,
3599 ) -> Self {
3600 self.additional_waitpoints = extras;
3601 self
3602 }
3603}
3604
3605/// Trait-surface output from [`EngineBackend::suspend`] (RFC-013 §2.3).
3606///
3607/// Two variants encode the "lease released" vs "lease retained" split.
3608/// See §2.3 for the runtime-enforcement semantics.
3609#[derive(Clone, Debug, PartialEq, Eq)]
3610#[non_exhaustive]
3611pub enum SuspendOutcome {
3612 /// The worker's pre-suspend handle is no longer lease-bearing; a
3613 /// fresh `HandleKind::Suspended` handle supersedes it.
3614 Suspended {
3615 details: SuspendOutcomeDetails,
3616 handle: crate::backend::Handle,
3617 },
3618 /// Buffered signals on a pending waitpoint already satisfied the
3619 /// condition at suspension time; the lease is retained and the
3620 /// caller's pre-suspend handle remains valid.
3621 AlreadySatisfied { details: SuspendOutcomeDetails },
3622}
3623
3624impl SuspendOutcome {
3625 /// Borrow the shared details regardless of variant.
3626 pub fn details(&self) -> &SuspendOutcomeDetails {
3627 match self {
3628 Self::Suspended { details, .. } => details,
3629 Self::AlreadySatisfied { details } => details,
3630 }
3631 }
3632}
3633
// The `EngineBackend::suspend` types are re-exported for
// `ff_core::backend::*` consumers: the `backend` module re-exports them
// so external crates can reach them via the idiomatic `ff_core::backend`
// path that already sources the other trait-surface types (RFC-013 §9.1).
3638
/// Unit tests for `ResumeCondition::validate_composite` (RFC-014):
/// empty member/waitpoint sets, zero counts, count-vs-set cardinality
/// and the nesting-depth cap.
#[cfg(test)]
mod rfc_014_validation_tests {
    use super::*;

    /// Leaf condition on waitpoint `wp`, matching a signal named `"x"`.
    fn single(wp: &str) -> ResumeCondition {
        ResumeCondition::Single {
            waitpoint_key: wp.to_owned(),
            matcher: SignalMatcher::ByName("x".to_owned()),
        }
    }

    /// Wraps `members` in a composite `AllOf` node.
    fn all_of(members: Vec<ResumeCondition>) -> ResumeCondition {
        ResumeCondition::Composite(CompositeBody::AllOf { members })
    }

    /// Asserts that validation rejects `cond` and that the error detail
    /// contains `needle`.
    ///
    /// `#[track_caller]` keeps the reported failure location on the
    /// calling test's line instead of inside this helper.
    #[track_caller]
    fn assert_rejected(cond: &ResumeCondition, needle: &str) {
        let err = cond.validate_composite().unwrap_err();
        assert!(err.detail.contains(needle), "{}", err.detail);
    }

    #[test]
    fn single_passes_validate() {
        assert!(single("wpk:a").validate_composite().is_ok());
    }

    #[test]
    fn allof_empty_members_rejected() {
        assert_rejected(&all_of(vec![]), "allof_empty_members");
    }

    #[test]
    fn count_n_zero_rejected() {
        let c = ResumeCondition::Composite(CompositeBody::Count {
            n: 0,
            count_kind: CountKind::DistinctWaitpoints,
            matcher: None,
            waitpoints: vec!["wpk:a".to_owned()],
        });
        assert_rejected(&c, "count_n_zero");
    }

    #[test]
    fn count_waitpoints_empty_rejected() {
        let c = ResumeCondition::Composite(CompositeBody::Count {
            n: 1,
            count_kind: CountKind::DistinctSources,
            matcher: None,
            waitpoints: vec![],
        });
        assert_rejected(&c, "count_waitpoints_empty");
    }

    #[test]
    fn count_exceeds_waitpoint_set_rejected_only_for_distinct_waitpoints() {
        // n=3, only 2 waitpoints, DistinctWaitpoints → reject.
        let c = ResumeCondition::Composite(CompositeBody::Count {
            n: 3,
            count_kind: CountKind::DistinctWaitpoints,
            matcher: None,
            waitpoints: vec!["a".into(), "b".into()],
        });
        assert_rejected(&c, "count_exceeds_waitpoint_set");

        // Same cardinality, DistinctSignals → allowed (no upper bound).
        let c2 = ResumeCondition::Composite(CompositeBody::Count {
            n: 3,
            count_kind: CountKind::DistinctSignals,
            matcher: None,
            waitpoints: vec!["a".into(), "b".into()],
        });
        assert!(c2.validate_composite().is_ok());
    }

    #[test]
    fn depth_4_accepted_depth_5_rejected() {
        // Depth-4: AllOf { AllOf { AllOf { AllOf { Single } } } } — each
        // fold step adds one AllOf wrapper around the previous tree.
        let d4 = (0..4).fold(single("wpk:leaf"), |inner, _| all_of(vec![inner]));
        assert!(d4.validate_composite().is_ok());

        // One more wrapper → Depth-5 → reject.
        let d5 = all_of(vec![d4]);
        assert_rejected(&d5, "exceeds cap");
    }
}