ff_core/contracts/mod.rs
1//! Phase 1 function contracts — Args + Result types for each FCALL.
2//!
3//! Each Args struct defines the typed inputs to a Valkey Function.
4//! Each Result enum defines the possible outcomes (success variants + error codes).
5
6pub mod decode;
7
8use crate::policy::ExecutionPolicy;
9use crate::state::{AttemptType, PublicState, StateVector};
10use crate::types::{
11 AttemptId, AttemptIndex, CancelSource, EdgeId, ExecutionId, FlowId, LaneId, LeaseEpoch,
12 LeaseFence, LeaseId, Namespace, SignalId, SuspensionId, TimestampMs, WaitpointId,
13 WaitpointToken, WorkerId, WorkerInstanceId,
14};
15use serde::{Deserialize, Serialize};
16use std::collections::{BTreeMap, BTreeSet, HashMap};
17
18// ─── create_execution ───
19
/// Typed inputs for the `create_execution` function contract.
///
/// Fields tagged `#[serde(default)]` may be absent from the serialized form;
/// they then deserialize to `None` (or to an empty map for `tags`).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateExecutionArgs {
    pub execution_id: ExecutionId,
    pub namespace: Namespace,
    pub lane_id: LaneId,
    pub execution_kind: String,
    pub input_payload: Vec<u8>,
    #[serde(default)]
    pub payload_encoding: Option<String>,
    pub priority: i32,
    pub creator_identity: String,
    #[serde(default)]
    pub idempotency_key: Option<String>,
    #[serde(default)]
    pub tags: HashMap<String, String>,
    /// Execution policy (retry, timeout, suspension, routing, etc.).
    #[serde(default)]
    pub policy: Option<ExecutionPolicy>,
    /// If set and in the future, execution starts delayed.
    #[serde(default)]
    pub delay_until: Option<TimestampMs>,
    /// Absolute deadline timestamp (ms). Execution expires if not complete by this time.
    #[serde(default)]
    pub execution_deadline_at: Option<TimestampMs>,
    /// Partition ID (pre-computed).
    pub partition_id: u16,
    pub now: TimestampMs,
}
48
/// Possible outcomes of the `create_execution` function contract.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateExecutionResult {
    /// Execution created successfully.
    Created {
        execution_id: ExecutionId,
        public_state: PublicState,
    },
    /// Idempotent duplicate — existing execution returned.
    Duplicate { execution_id: ExecutionId },
}
59
60// ─── issue_claim_grant ───
61
/// Typed inputs for the `issue_claim_grant` function contract
/// (`ff_issue_claim_grant` on the Lua side).
///
/// The three `Option<String>` fields and `worker_capabilities` are
/// `#[serde(default)]`, so they may be omitted from the serialized form.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IssueClaimGrantArgs {
    pub execution_id: ExecutionId,
    pub lane_id: LaneId,
    pub worker_id: WorkerId,
    pub worker_instance_id: WorkerInstanceId,
    #[serde(default)]
    pub capability_hash: Option<String>,
    #[serde(default)]
    pub route_snapshot_json: Option<String>,
    #[serde(default)]
    pub admission_summary: Option<String>,
    /// Capabilities this worker advertises. Serialized as a sorted,
    /// comma-separated string to the Lua FCALL (see scheduling.lua
    /// ff_issue_claim_grant). An empty set matches only executions whose
    /// `required_capabilities` is also empty.
    #[serde(default)]
    pub worker_capabilities: BTreeSet<String>,
    /// Requested grant lifetime in milliseconds.
    pub grant_ttl_ms: u64,
    /// Caller-side timestamp for bookkeeping. NOT passed to the Lua FCALL —
    /// ff_issue_claim_grant uses `redis.call("TIME")` for grant_expires_at.
    pub now: TimestampMs,
}
85
/// Possible outcomes of the `issue_claim_grant` function contract.
///
/// Only the success variant is modelled here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum IssueClaimGrantResult {
    /// Grant issued.
    Granted { execution_id: ExecutionId },
}
91
/// A claim grant issued by the scheduler for a specific execution.
///
/// The worker uses this to call `ff_claim_execution` (or
/// `ff_acquire_lease`), which atomically consumes the grant and
/// creates the lease.
///
/// Shared wire-level type between `ff-scheduler` (issuer) and
/// `ff-sdk` (consumer, via `FlowFabricWorker::claim_from_grant`).
/// Lives in `ff-core` so neither crate needs a dep on the other.
///
/// Unlike the `*Args` / `*Result` types in this module, this struct
/// derives only `Clone`, `Debug`, `PartialEq` and `Eq` — there are no
/// serde derives on it.
///
/// **Lane asymmetry with [`ReclaimGrant`]:** `ClaimGrant` does NOT
/// carry `lane_id`. The issuing scheduler's caller already picked
/// a lane (that's how admission reached this grant) and passes it
/// through to `claim_from_grant` as a separate argument. The grant
/// handle stays narrow to what uniquely identifies the admission
/// decision. The matching field on [`ReclaimGrant`] is an
/// intentional divergence — see the note on that type.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ClaimGrant {
    /// The execution that was granted.
    pub execution_id: ExecutionId,
    /// Opaque partition handle for this execution's hash-tag slot.
    ///
    /// Public wire type: consumers pass it back to FlowFabric but
    /// must not parse the interior hash tag for routing decisions.
    /// Internal consumers that need the typed
    /// [`crate::partition::Partition`] call [`Self::partition`].
    pub partition_key: crate::partition::PartitionKey,
    /// The Valkey key holding the grant hash (for the worker to
    /// reference).
    pub grant_key: String,
    /// When the grant expires if not consumed (milliseconds).
    pub expires_at_ms: u64,
}
126
127impl ClaimGrant {
128 /// Parse `partition_key` into a typed
129 /// [`crate::partition::Partition`]. Intended for internal
130 /// consumers (scheduler emitter, SDK worker claim path) that
131 /// need the family/index pair. Fails only on malformed keys
132 /// (which indicates a producer bug).
133 ///
134 /// Alias collapse applies: a grant issued against `Execution`
135 /// family round-trips to `Flow` (see [`crate::partition::PartitionKey`]
136 /// for the rationale — routing is preserved, only the metadata
137 /// family label normalises).
138 pub fn partition(
139 &self,
140 ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
141 self.partition_key.parse()
142 }
143}
144
/// A reclaim grant issued for a resumed (attempt_interrupted) execution.
///
/// Issued by a producer (typically `ff-scheduler` once a Batch-C
/// reclaim scanner is in place; test fixtures in the interim — no
/// production Rust caller exists in-tree today). Consumed by
/// [`FlowFabricWorker::claim_from_reclaim_grant`], which calls
/// `ff_claim_resumed_execution` atomically: that FCALL validates the
/// grant, consumes it, and transitions `attempt_interrupted` →
/// `started` while preserving the existing `attempt_index` +
/// `attempt_id` (a resumed execution re-uses its attempt; it does
/// not start a new one).
///
/// Mirrors [`ClaimGrant`] for the resume path. Differences:
///
/// * [`ClaimGrant`] is issued against a freshly-eligible
///   execution and `ff_claim_execution` creates a new attempt.
/// * [`ReclaimGrant`] is issued against an `attempt_interrupted`
///   execution; `ff_claim_resumed_execution` re-uses the existing
///   attempt and bumps the lease epoch.
///
/// The grant itself is written to the same `claim_grant` Valkey key
/// that [`ClaimGrant`] uses; the distinction is which Lua FCALL
/// consumes it (`ff_claim_execution` for new attempts,
/// `ff_claim_resumed_execution` for resumes).
///
/// **Lane asymmetry with [`ClaimGrant`]:** `ReclaimGrant` CARRIES
/// `lane_id` as a field. The issuing path already knows the lane
/// (it's read from `exec_core` at grant time); carrying it here
/// spares the consumer a `HGET exec_core lane_id` round trip on
/// the hot claim path. The asymmetry is intentional — prefer
/// one-fewer-HGET on a type that already lives with the resumer's
/// lifecycle over strict handle symmetry with `ClaimGrant`.
///
/// Shared wire-level type between the eventual `ff-scheduler`
/// producer (Batch-C reclaim scanner — not yet in-tree; test
/// fixtures construct this type today) and `ff-sdk` (consumer, via
/// `FlowFabricWorker::claim_from_reclaim_grant`). Lives in
/// `ff-core` so neither crate needs a dep on the other.
///
/// Like [`ClaimGrant`], this struct carries no serde derives.
///
/// [`FlowFabricWorker::claim_from_reclaim_grant`]: https://docs.rs/ff-sdk
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReclaimGrant {
    /// The execution granted for resumption.
    pub execution_id: ExecutionId,
    /// Opaque partition handle for this execution's hash-tag slot.
    ///
    /// Same wire-opacity contract as [`ClaimGrant::partition_key`].
    /// Internal consumers call [`Self::partition`] for the parsed
    /// form.
    pub partition_key: crate::partition::PartitionKey,
    /// Valkey key of the grant hash — same key shape as
    /// [`ClaimGrant`].
    pub grant_key: String,
    /// Monotonic ms when the grant expires; unconsumed grants
    /// vanish.
    ///
    /// NOTE(review): the grant-issuing args in this module describe
    /// `grant_expires_at` as derived from `redis.call("TIME")`, which
    /// is a wall-clock source — confirm "monotonic" is the intended
    /// wording here.
    pub expires_at_ms: u64,
    /// Lane the execution belongs to. Needed by
    /// `ff_claim_resumed_execution` for `KEYS[3]` (eligible_zset)
    /// and `KEYS[9]` (active_index).
    pub lane_id: LaneId,
}
206
207impl ReclaimGrant {
208 /// Parse `partition_key` into a typed
209 /// [`crate::partition::Partition`]. See [`ClaimGrant::partition`]
210 /// for the alias-collapse note.
211 pub fn partition(
212 &self,
213 ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
214 self.partition_key.parse()
215 }
216}
217
218// ─── claim_execution ───
219
/// Typed inputs for the `claim_execution` function contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ClaimExecutionArgs {
    pub execution_id: ExecutionId,
    pub worker_id: WorkerId,
    pub worker_instance_id: WorkerInstanceId,
    pub lane_id: LaneId,
    pub lease_id: LeaseId,
    /// Requested lease lifetime in milliseconds.
    pub lease_ttl_ms: u64,
    pub attempt_id: AttemptId,
    /// Expected attempt index (pre-read from exec_core.total_attempt_count).
    /// Used for KEYS construction — must match what the Lua computes.
    pub expected_attempt_index: AttemptIndex,
    /// JSON-encoded attempt policy snapshot. Deserializes to an empty
    /// string when omitted.
    #[serde(default)]
    pub attempt_policy_json: String,
    /// Per-attempt timeout in ms.
    #[serde(default)]
    pub attempt_timeout_ms: Option<u64>,
    /// Total execution deadline (absolute timestamp ms).
    ///
    /// NOTE(review): typed as `Option<i64>` here, whereas
    /// `CreateExecutionArgs::execution_deadline_at` is
    /// `Option<TimestampMs>` — confirm the divergence is intentional.
    #[serde(default)]
    pub execution_deadline_at: Option<i64>,
    pub now: TimestampMs,
}
243
/// Payload of [`ClaimExecutionResult::Claimed`]: the lease and attempt
/// identifiers describing a successful claim.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ClaimedExecution {
    pub execution_id: ExecutionId,
    pub lease_id: LeaseId,
    pub lease_epoch: LeaseEpoch,
    pub attempt_index: AttemptIndex,
    pub attempt_id: AttemptId,
    pub attempt_type: AttemptType,
    pub lease_expires_at: TimestampMs,
}
254
/// Possible outcomes of the `claim_execution` function contract.
///
/// Only the success variant is modelled here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ClaimExecutionResult {
    /// Successfully claimed.
    Claimed(ClaimedExecution),
}
260
261// ─── complete_execution ───
262
/// Typed inputs for the `complete_execution` function contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CompleteExecutionArgs {
    pub execution_id: ExecutionId,
    /// RFC #58.5 — fence triple. `Some` for SDK worker paths (standard
    /// stale-lease fence). `None` for operator overrides, in which case
    /// `source` must be `CancelSource::OperatorOverride` or the Lua
    /// returns `fence_required`.
    #[serde(default)]
    pub fence: Option<LeaseFence>,
    pub attempt_index: AttemptIndex,
    /// Optional result bytes; omitted fields deserialize to `None`.
    #[serde(default)]
    pub result_payload: Option<Vec<u8>>,
    #[serde(default)]
    pub result_encoding: Option<String>,
    /// RFC #58.5 — unfenced-call gate. Ignored when `fence` is `Some`.
    /// Falls back to `CancelSource::default()` when omitted.
    #[serde(default)]
    pub source: CancelSource,
    pub now: TimestampMs,
}
282
/// Possible outcomes of the `complete_execution` function contract.
///
/// Only the success variant is modelled here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CompleteExecutionResult {
    /// Execution completed successfully.
    Completed {
        execution_id: ExecutionId,
        public_state: PublicState,
    },
}
291
292// ─── renew_lease ───
293
/// Typed inputs for the `renew_lease` function contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RenewLeaseArgs {
    pub execution_id: ExecutionId,
    pub attempt_index: AttemptIndex,
    /// RFC #58.5 — fence triple. Required (no operator override path for
    /// renew). `None` returns `fence_required`.
    pub fence: Option<LeaseFence>,
    /// How long to extend the lease (milliseconds).
    pub lease_ttl_ms: u64,
    /// Grace period after lease_expires_at before the lease_current key is auto-deleted.
    /// Defaults to the value of `default_lease_history_grace_ms()` when
    /// omitted from the serialized form.
    #[serde(default = "default_lease_history_grace_ms")]
    pub lease_history_grace_ms: u64,
}
307
/// Serde fallback for `RenewLeaseArgs::lease_history_grace_ms`:
/// 60 seconds, expressed in milliseconds.
fn default_lease_history_grace_ms() -> u64 {
    const GRACE_MS: u64 = 60 * 1_000;
    GRACE_MS
}
311
/// Possible outcomes of the `renew_lease` function contract.
///
/// Only the success variant is modelled here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum RenewLeaseResult {
    /// Lease renewed.
    Renewed { expires_at: TimestampMs },
}
317
318// ─── mark_lease_expired_if_due ───
319
/// Typed inputs for the `mark_lease_expired_if_due` function contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MarkLeaseExpiredArgs {
    pub execution_id: ExecutionId,
}
324
/// Possible outcomes of the `mark_lease_expired_if_due` function contract.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum MarkLeaseExpiredResult {
    /// Lease was marked as expired.
    MarkedExpired,
    /// No action needed (already expired, not yet due, not active, etc.).
    /// `reason` carries the free-form explanation.
    AlreadySatisfied { reason: String },
}
332
333// ─── cancel_execution ───
334
/// Typed inputs for the `cancel_execution` function contract.
///
/// Unlike the RFC #58.5 `fence: Option<LeaseFence>` fields used by other
/// args in this module, the lease identifiers are carried here as three
/// separate optional fields.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CancelExecutionArgs {
    pub execution_id: ExecutionId,
    pub reason: String,
    /// Falls back to `CancelSource::default()` when omitted.
    #[serde(default)]
    pub source: CancelSource,
    /// Required if not operator_override and execution is active.
    #[serde(default)]
    pub lease_id: Option<LeaseId>,
    // NOTE(review): the sibling `lease_id` / `attempt_id` fields document a
    // "required if not operator_override" rule; presumably the same applies
    // to `lease_epoch` — confirm against the Lua function.
    #[serde(default)]
    pub lease_epoch: Option<LeaseEpoch>,
    /// Required if not operator_override and execution is active.
    #[serde(default)]
    pub attempt_id: Option<AttemptId>,
    pub now: TimestampMs,
}
351
/// Possible outcomes of the `cancel_execution` function contract.
///
/// Only the success variant is modelled here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CancelExecutionResult {
    /// Execution cancelled.
    Cancelled {
        execution_id: ExecutionId,
        public_state: PublicState,
    },
}
360
361// ─── revoke_lease ───
362
/// Typed inputs for the `revoke_lease` function contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RevokeLeaseArgs {
    pub execution_id: ExecutionId,
    /// If set, only revoke if this matches the current lease. Empty string skips check.
    ///
    /// NOTE(review): the field is `Option<String>` (not `Option<LeaseId>`)
    /// and defaults to `None`; how `None` is mapped relative to the
    /// "empty string" rule is not visible in this file — confirm at the
    /// call site.
    #[serde(default)]
    pub expected_lease_id: Option<String>,
    /// Worker instance whose lease set to clean up. Read from exec_core before calling.
    pub worker_instance_id: WorkerInstanceId,
    pub reason: String,
}
373
/// Possible outcomes of the `revoke_lease` function contract.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum RevokeLeaseResult {
    /// Lease revoked.
    ///
    /// Both fields are plain `String`s here rather than the
    /// `LeaseId` / `LeaseEpoch` newtypes used elsewhere in this module.
    Revoked {
        lease_id: String,
        lease_epoch: String,
    },
    /// Already revoked or expired — no action needed.
    AlreadySatisfied { reason: String },
}
384
385// ─── delay_execution ───
386
/// Typed inputs for the `delay_execution` function contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DelayExecutionArgs {
    pub execution_id: ExecutionId,
    /// RFC #58.5 — fence triple. `None` requires `source ==
    /// CancelSource::OperatorOverride`.
    #[serde(default)]
    pub fence: Option<LeaseFence>,
    pub attempt_index: AttemptIndex,
    /// Timestamp until which the execution is delayed.
    pub delay_until: TimestampMs,
    /// Falls back to `CancelSource::default()` when omitted.
    #[serde(default)]
    pub source: CancelSource,
    pub now: TimestampMs,
}
400
/// Possible outcomes of the `delay_execution` function contract.
///
/// Only the success variant is modelled here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum DelayExecutionResult {
    /// Execution delayed.
    Delayed {
        execution_id: ExecutionId,
        public_state: PublicState,
    },
}
409
410// ─── move_to_waiting_children ───
411
/// Typed inputs for the `move_to_waiting_children` function contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MoveToWaitingChildrenArgs {
    pub execution_id: ExecutionId,
    /// RFC #58.5 — fence triple. `None` requires `source ==
    /// CancelSource::OperatorOverride`.
    #[serde(default)]
    pub fence: Option<LeaseFence>,
    pub attempt_index: AttemptIndex,
    /// Falls back to `CancelSource::default()` when omitted.
    #[serde(default)]
    pub source: CancelSource,
    pub now: TimestampMs,
}
424
/// Possible outcomes of the `move_to_waiting_children` function contract.
///
/// Only the success variant is modelled here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum MoveToWaitingChildrenResult {
    /// Moved to waiting children.
    Moved {
        execution_id: ExecutionId,
        public_state: PublicState,
    },
}
433
434// ─── change_priority ───
435
/// Typed inputs for the `change_priority` function contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChangePriorityArgs {
    pub execution_id: ExecutionId,
    /// Priority to assign; same `i32` type as `CreateExecutionArgs::priority`.
    pub new_priority: i32,
    pub lane_id: LaneId,
    pub now: TimestampMs,
}
443
/// Possible outcomes of the `change_priority` function contract.
///
/// Only the success variant is modelled here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangePriorityResult {
    /// Priority changed and re-scored.
    Changed { execution_id: ExecutionId },
}
449
450// ─── update_progress ───
451
/// Typed inputs for the `update_progress` function contract.
///
/// The lease identifiers (`lease_id`, `lease_epoch`, `attempt_id`) are
/// mandatory fields on this type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UpdateProgressArgs {
    pub execution_id: ExecutionId,
    pub lease_id: LeaseId,
    pub lease_epoch: LeaseEpoch,
    pub attempt_id: AttemptId,
    // NOTE(review): stored as `u8`, so values above 100 are representable;
    // this type does no range validation.
    #[serde(default)]
    pub progress_pct: Option<u8>,
    #[serde(default)]
    pub progress_message: Option<String>,
    pub now: TimestampMs,
}
464
/// Possible outcomes of the `update_progress` function contract.
///
/// Only the success variant is modelled here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum UpdateProgressResult {
    /// Progress updated.
    Updated,
}
470
471// ═══════════════════════════════════════════════════════════════════════
472// Phase 2 contracts: fail, reclaim, expire
473// ═══════════════════════════════════════════════════════════════════════
474
475// ─── fail_execution ───
476
/// Typed inputs for the `fail_execution` function contract.
///
/// This struct has no `now` field, unlike most other args in this module.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FailExecutionArgs {
    pub execution_id: ExecutionId,
    /// RFC #58.5 — fence triple. `None` requires `source ==
    /// CancelSource::OperatorOverride`.
    #[serde(default)]
    pub fence: Option<LeaseFence>,
    pub attempt_index: AttemptIndex,
    pub failure_reason: String,
    pub failure_category: String,
    /// JSON-encoded retry policy (from execution policy). Empty = no retries.
    /// Deserializes to the empty string when omitted.
    #[serde(default)]
    pub retry_policy_json: String,
    /// JSON-encoded attempt policy for the next retry attempt.
    #[serde(default)]
    pub next_attempt_policy_json: String,
    /// Falls back to `CancelSource::default()` when omitted.
    #[serde(default)]
    pub source: CancelSource,
}
496
/// Outcome of a fail_execution call.
///
/// Either a retry was scheduled or the execution reached terminal failure.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum FailExecutionResult {
    /// Retry was scheduled — execution is delayed with backoff.
    RetryScheduled {
        /// Timestamp until which the retry is delayed.
        delay_until: TimestampMs,
        /// Attempt index of the scheduled retry.
        next_attempt_index: AttemptIndex,
    },
    /// No retries left — execution is terminal failed.
    TerminalFailed,
}
508
509// ─── issue_reclaim_grant ───
510
/// Typed inputs for the `issue_reclaim_grant` function contract
/// (`ff_issue_reclaim_grant` on the Lua side).
///
/// Field set parallels `IssueClaimGrantArgs`, minus
/// `worker_capabilities`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IssueReclaimGrantArgs {
    pub execution_id: ExecutionId,
    pub worker_id: WorkerId,
    pub worker_instance_id: WorkerInstanceId,
    pub lane_id: LaneId,
    #[serde(default)]
    pub capability_hash: Option<String>,
    /// Requested grant lifetime in milliseconds.
    pub grant_ttl_ms: u64,
    #[serde(default)]
    pub route_snapshot_json: Option<String>,
    #[serde(default)]
    pub admission_summary: Option<String>,
    /// Caller-side timestamp for bookkeeping. NOT passed to the Lua FCALL —
    /// ff_issue_reclaim_grant uses `redis.call("TIME")` for grant_expires_at
    /// (same as ff_issue_claim_grant). Kept for contract symmetry with
    /// IssueClaimGrantArgs and scheduler audit logging.
    pub now: TimestampMs,
}
530
/// Possible outcomes of the `issue_reclaim_grant` function contract.
///
/// Note the payload differs from `IssueClaimGrantResult::Granted`: this
/// variant carries the expiry timestamp rather than the execution id.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum IssueReclaimGrantResult {
    /// Reclaim grant issued.
    Granted { expires_at_ms: TimestampMs },
}
536
537// ─── reclaim_execution ───
538
/// Typed inputs for the `reclaim_execution` function contract.
///
/// This struct has no `now` field, unlike most other args in this module.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReclaimExecutionArgs {
    pub execution_id: ExecutionId,
    pub worker_id: WorkerId,
    pub worker_instance_id: WorkerInstanceId,
    pub lane_id: LaneId,
    #[serde(default)]
    pub capability_hash: Option<String>,
    pub lease_id: LeaseId,
    /// Requested lease lifetime in milliseconds.
    pub lease_ttl_ms: u64,
    pub attempt_id: AttemptId,
    /// JSON-encoded attempt policy for the reclaim attempt.
    #[serde(default)]
    pub attempt_policy_json: String,
    /// Maximum reclaim count before terminal failure. Default: 100.
    /// The default is supplied by `default_max_reclaim_count()` when the
    /// field is omitted from the serialized form.
    #[serde(default = "default_max_reclaim_count")]
    pub max_reclaim_count: u32,
    /// Old worker instance (for old_worker_leases key construction).
    pub old_worker_instance_id: WorkerInstanceId,
    /// Current attempt index (for old_attempt/old_stream_meta key construction).
    pub current_attempt_index: AttemptIndex,
}
561
/// Serde fallback for `ReclaimExecutionArgs::max_reclaim_count`.
fn default_max_reclaim_count() -> u32 {
    const MAX_RECLAIMS: u32 = 100;
    MAX_RECLAIMS
}
565
/// Possible outcomes of the `reclaim_execution` function contract.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReclaimExecutionResult {
    /// Execution reclaimed — new attempt + new lease.
    Reclaimed {
        new_attempt_index: AttemptIndex,
        new_attempt_id: AttemptId,
        new_lease_id: LeaseId,
        new_lease_epoch: LeaseEpoch,
        lease_expires_at: TimestampMs,
    },
    /// Max reclaims exceeded — execution moved to terminal.
    MaxReclaimsExceeded,
}
579
580// ─── expire_execution ───
581
/// Typed inputs for the `expire_execution` function contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExpireExecutionArgs {
    pub execution_id: ExecutionId,
    /// "attempt_timeout" or "execution_deadline"
    ///
    /// Carried as a free-form `String`; this type does not validate that
    /// the value is one of the two listed reasons.
    pub expire_reason: String,
}
588
/// Possible outcomes of the `expire_execution` function contract.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ExpireExecutionResult {
    /// Execution expired.
    Expired { execution_id: ExecutionId },
    /// Already terminal — no-op.
    AlreadyTerminal,
}
596
597// ═══════════════════════════════════════════════════════════════════════
598// Phase 3 contracts: suspend, signal, resume, waitpoint
599// ═══════════════════════════════════════════════════════════════════════
600
601// ─── suspend_execution ───
602
/// Typed inputs for the `suspend_execution` function contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SuspendExecutionArgs {
    pub execution_id: ExecutionId,
    /// RFC #58.5 — fence triple. Required (no operator override path for
    /// suspend). `None` returns `fence_required`.
    pub fence: Option<LeaseFence>,
    pub attempt_index: AttemptIndex,
    pub suspension_id: SuspensionId,
    pub waitpoint_id: WaitpointId,
    pub waitpoint_key: String,
    pub reason_code: String,
    pub requested_by: String,
    /// Resume condition, carried as a JSON string (per the field name).
    pub resume_condition_json: String,
    /// Resume policy, carried as a JSON string (per the field name).
    pub resume_policy_json: String,
    #[serde(default)]
    pub continuation_metadata_pointer: Option<String>,
    #[serde(default)]
    pub timeout_at: Option<TimestampMs>,
    /// true to activate a pending waitpoint, false to create new.
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub use_pending_waitpoint: bool,
    /// Timeout behavior: "fail", "cancel", "expire", "auto_resume", "escalate".
    /// Defaults to `"fail"` (via `default_timeout_behavior()`) when omitted.
    #[serde(default = "default_timeout_behavior")]
    pub timeout_behavior: String,
}
628
/// Serde fallback for `SuspendExecutionArgs::timeout_behavior`.
fn default_timeout_behavior() -> String {
    String::from("fail")
}
632
/// Possible outcomes of the `suspend_execution` function contract.
///
/// Both variants carry the same four fields.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SuspendExecutionResult {
    /// Execution suspended, waitpoint active.
    Suspended {
        suspension_id: SuspensionId,
        waitpoint_id: WaitpointId,
        waitpoint_key: String,
        /// HMAC-SHA1 token bound to (waitpoint_id, waitpoint_key, created_at).
        /// Required by signal-delivery callers to authenticate against this
        /// waitpoint (RFC-004 §Waitpoint Security).
        waitpoint_token: WaitpointToken,
    },
    /// Buffered signals already satisfied the condition — suspension skipped.
    /// Lease is still held. Token comes from the pending waitpoint record.
    AlreadySatisfied {
        suspension_id: SuspensionId,
        waitpoint_id: WaitpointId,
        waitpoint_key: String,
        waitpoint_token: WaitpointToken,
    },
}
654
655// ─── resume_execution ───
656
/// Typed inputs for the `resume_execution` function contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResumeExecutionArgs {
    pub execution_id: ExecutionId,
    /// "signal", "operator", "auto_resume"
    ///
    /// Defaults to `"signal"` (via `default_trigger_type()`) when omitted.
    #[serde(default = "default_trigger_type")]
    pub trigger_type: String,
    /// Optional delay before becoming eligible (ms). Defaults to `0`
    /// when omitted.
    #[serde(default)]
    pub resume_delay_ms: u64,
}
667
/// Serde fallback for `ResumeExecutionArgs::trigger_type`.
fn default_trigger_type() -> String {
    String::from("signal")
}
671
/// Possible outcomes of the `resume_execution` function contract.
///
/// Only the success variant is modelled here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResumeExecutionResult {
    /// Execution resumed to runnable.
    Resumed { public_state: PublicState },
}
677
678// ─── create_pending_waitpoint ───
679
/// Typed inputs for the `create_pending_waitpoint` function contract.
///
/// The lease identifiers are carried as separate mandatory fields
/// (`lease_id`, `lease_epoch`, `attempt_id`).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreatePendingWaitpointArgs {
    pub execution_id: ExecutionId,
    pub lease_id: LeaseId,
    pub lease_epoch: LeaseEpoch,
    pub attempt_index: AttemptIndex,
    pub attempt_id: AttemptId,
    pub waitpoint_id: WaitpointId,
    pub waitpoint_key: String,
    /// Short expiry for the pending waitpoint (ms).
    pub expires_in_ms: u64,
}
692
/// Possible outcomes of the `create_pending_waitpoint` function contract.
///
/// Only the success variant is modelled here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreatePendingWaitpointResult {
    /// Pending waitpoint created.
    Created {
        waitpoint_id: WaitpointId,
        waitpoint_key: String,
        /// HMAC-SHA1 token bound to the pending waitpoint. Required for
        /// `buffer_signal_for_pending_waitpoint` and carried forward when
        /// the waitpoint is activated by `suspend_execution`.
        waitpoint_token: WaitpointToken,
    },
}
705
706// ─── close_waitpoint ───
707
/// Typed inputs for the `close_waitpoint` function contract.
///
/// Identifies the target by `waitpoint_id` only; there is no
/// `execution_id` field on this struct.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CloseWaitpointArgs {
    pub waitpoint_id: WaitpointId,
    pub reason: String,
}
713
/// Possible outcomes of the `close_waitpoint` function contract.
///
/// Only the success variant is modelled here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CloseWaitpointResult {
    /// Waitpoint closed.
    Closed,
}
719
720// ─── deliver_signal ───
721
/// Typed inputs for the `deliver_signal` function contract.
///
/// All `Option` fields are `#[serde(default)]` and deserialize to `None`
/// when omitted.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DeliverSignalArgs {
    pub execution_id: ExecutionId,
    pub waitpoint_id: WaitpointId,
    pub signal_id: SignalId,
    pub signal_name: String,
    pub signal_category: String,
    pub source_type: String,
    pub source_identity: String,
    #[serde(default)]
    pub payload: Option<Vec<u8>>,
    #[serde(default)]
    pub payload_encoding: Option<String>,
    #[serde(default)]
    pub correlation_id: Option<String>,
    #[serde(default)]
    pub idempotency_key: Option<String>,
    pub target_scope: String,
    #[serde(default)]
    pub created_at: Option<TimestampMs>,
    /// Dedup TTL for idempotency key (ms).
    #[serde(default)]
    pub dedup_ttl_ms: Option<u64>,
    /// Resume delay after signal satisfaction (ms).
    #[serde(default)]
    pub resume_delay_ms: Option<u64>,
    /// Max signals per execution (default 10000).
    ///
    /// NOTE(review): the 10000 default is not applied by this type — an
    /// omitted field deserializes to `None`; presumably the fallback is
    /// applied downstream. Confirm where.
    #[serde(default)]
    pub max_signals_per_execution: Option<u64>,
    /// MAXLEN for the waitpoint signal stream.
    #[serde(default)]
    pub signal_maxlen: Option<u64>,
    /// HMAC-SHA1 token issued when the waitpoint was created. Required for
    /// signal delivery; missing/tampered/rotated-past-grace tokens are
    /// rejected with `invalid_token` or `token_expired` (RFC-004).
    ///
    /// Defense-in-depth: `WaitpointToken` is a transparent string newtype,
    /// so an empty string deserializes successfully from JSON. The
    /// validation boundary is in Lua (`validate_waitpoint_token` returns
    /// `missing_token` on empty input); this type intentionally does NOT
    /// pre-reject at the Rust layer so callers get a consistent typed
    /// error regardless of how they constructed the args.
    pub waitpoint_token: WaitpointToken,
    pub now: TimestampMs,
}
767
/// Possible outcomes of the `deliver_signal` function contract.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum DeliverSignalResult {
    /// Signal accepted with the given effect.
    /// `effect` is carried as a free-form `String`.
    Accepted { signal_id: SignalId, effect: String },
    /// Duplicate signal (idempotency key matched).
    Duplicate { existing_signal_id: SignalId },
}
775
776// ─── buffer_signal_for_pending_waitpoint ───
777
/// Typed inputs for the `buffer_signal_for_pending_waitpoint` function
/// contract.
///
/// A narrower field set than `DeliverSignalArgs`: there are no
/// correlation id, `created_at`, dedup/resume timing, or stream-limit
/// fields here.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BufferSignalArgs {
    pub execution_id: ExecutionId,
    pub waitpoint_id: WaitpointId,
    pub signal_id: SignalId,
    pub signal_name: String,
    pub signal_category: String,
    pub source_type: String,
    pub source_identity: String,
    #[serde(default)]
    pub payload: Option<Vec<u8>>,
    #[serde(default)]
    pub payload_encoding: Option<String>,
    #[serde(default)]
    pub idempotency_key: Option<String>,
    pub target_scope: String,
    /// HMAC-SHA1 token issued when `create_pending_waitpoint` ran. Required
    /// to authenticate early signals targeting the pending waitpoint.
    pub waitpoint_token: WaitpointToken,
    pub now: TimestampMs,
}
799
/// Possible outcomes of the `buffer_signal_for_pending_waitpoint`
/// function contract.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum BufferSignalResult {
    /// Signal buffered for pending waitpoint.
    Buffered { signal_id: SignalId },
    /// Duplicate signal.
    Duplicate { existing_signal_id: SignalId },
}
807
808// ─── list_pending_waitpoints ───
809
/// One entry in the read-only view of an execution's active waitpoints.
///
/// Returned by `EngineBackend::list_pending_waitpoints` (and the
/// `GET /v1/executions/{id}/pending-waitpoints` REST endpoint).
///
/// **RFC-017 §8 schema rewrite (Stage D1).** This struct no longer
/// carries the raw HMAC `waitpoint_token` at the trait boundary — the
/// backend emits only the sanitised `(token_kid, token_fingerprint)`
/// pair. The HTTP handler (see `ff-server::api::list_pending_waitpoints`)
/// wraps the trait response and re-injects the real token on the
/// v0.7.x wire for one-release deprecation warning; the wire field is
/// removed entirely at v0.8.0.
///
/// The type is `#[non_exhaustive]`, so code outside this crate cannot
/// build it with a struct literal; use [`Self::new`] and the `with_*`
/// builders instead.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PendingWaitpointInfo {
    pub waitpoint_id: WaitpointId,
    pub waitpoint_key: String,
    /// Current waitpoint state: `pending`, `active`, `closed`. Callers
    /// typically filter to `pending` or `active`.
    pub state: String,
    /// Signal names the resume condition is waiting for. Reviewers that
    /// need to drive a specific waitpoint — particularly when multiple
    /// concurrent waitpoints exist on one execution — filter on this to
    /// pick the right target.
    ///
    /// An EMPTY vec means the condition matches any signal (wildcard, per
    /// `lua/helpers.lua` `initialize_condition`). Callers must not infer
    /// "no waitpoint" from empty; check `state` / length of the outer
    /// list for that.
    #[serde(default)]
    pub required_signal_names: Vec<String>,
    /// Timestamp when the waitpoint record was first written.
    pub created_at: TimestampMs,
    /// Timestamp when the waitpoint was activated (suspension landed).
    /// `None` while the waitpoint is still `pending`. Omitted from the
    /// serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub activated_at: Option<TimestampMs>,
    /// Scheduled expiration timestamp. `None` if no timeout configured.
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub expires_at: Option<TimestampMs>,
    /// Owning execution — surfaces without a separate lookup.
    pub execution_id: ExecutionId,
    /// HMAC key identifier (the `<kid>` prefix of the stored
    /// `waitpoint_token`). Safe to expose — identifies which signing
    /// key minted the token without revealing the key material.
    pub token_kid: String,
    /// 16-hex-char (8-byte) fingerprint of the HMAC digest. Audit-friendly
    /// handle that correlates across logs without being replayable.
    pub token_fingerprint: String,
}
860
861impl PendingWaitpointInfo {
862 /// Construct a `PendingWaitpointInfo` with the 7 required fields.
863 /// Optional fields (`activated_at`, `expires_at`) default to
864 /// `None`; use [`Self::with_activated_at`] / [`Self::with_expires_at`]
865 /// to populate them. `required_signal_names` defaults to empty
866 /// (wildcard condition); use [`Self::with_required_signal_names`]
867 /// to set it.
868 pub fn new(
869 waitpoint_id: WaitpointId,
870 waitpoint_key: String,
871 state: String,
872 created_at: TimestampMs,
873 execution_id: ExecutionId,
874 token_kid: String,
875 token_fingerprint: String,
876 ) -> Self {
877 Self {
878 waitpoint_id,
879 waitpoint_key,
880 state,
881 required_signal_names: Vec::new(),
882 created_at,
883 activated_at: None,
884 expires_at: None,
885 execution_id,
886 token_kid,
887 token_fingerprint,
888 }
889 }
890
891 pub fn with_activated_at(mut self, activated_at: TimestampMs) -> Self {
892 self.activated_at = Some(activated_at);
893 self
894 }
895
896 pub fn with_expires_at(mut self, expires_at: TimestampMs) -> Self {
897 self.expires_at = Some(expires_at);
898 self
899 }
900
901 pub fn with_required_signal_names(mut self, names: Vec<String>) -> Self {
902 self.required_signal_names = names;
903 self
904 }
905}
906
907// ─── expire_suspension ───
908
/// Typed inputs for the `expire_suspension` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExpireSuspensionArgs {
    /// Execution the call targets.
    pub execution_id: ExecutionId,
}
913
/// Possible outcomes of the `expire_suspension` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ExpireSuspensionResult {
    /// Suspension expired with the given behavior applied.
    Expired { behavior_applied: String },
    /// Already resolved — no action needed. `reason` carries the
    /// explanation reported by the call.
    AlreadySatisfied { reason: String },
}
921
922// ─── claim_resumed_execution ───
923
/// Typed inputs for the `claim_resumed_execution` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ClaimResumedExecutionArgs {
    /// Execution being claimed.
    pub execution_id: ExecutionId,
    /// Logical worker making the claim.
    pub worker_id: WorkerId,
    /// Specific worker instance making the claim.
    pub worker_instance_id: WorkerInstanceId,
    pub lane_id: LaneId,
    /// Lease identifier supplied by the caller for this claim.
    pub lease_id: LeaseId,
    /// Requested lease time-to-live, in milliseconds.
    pub lease_ttl_ms: u64,
    /// Current attempt index (for KEYS construction — from exec_core).
    pub current_attempt_index: AttemptIndex,
    /// Remaining attempt timeout from before suspension (ms). 0 = no timeout.
    ///
    /// NOTE(review): the field is an `Option`, so an absent value is also
    /// possible on the wire; presumably `None` is treated like `0` (no
    /// timeout) — confirm against the function implementation.
    #[serde(default)]
    pub remaining_attempt_timeout_ms: Option<u64>,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
939
/// Payload of a successful `claim_resumed_execution` call; carried by
/// [`ClaimResumedExecutionResult::Claimed`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ClaimedResumedExecution {
    pub execution_id: ExecutionId,
    pub lease_id: LeaseId,
    pub lease_epoch: LeaseEpoch,
    pub attempt_index: AttemptIndex,
    pub attempt_id: AttemptId,
    /// Timestamp at which the lease expires.
    pub lease_expires_at: TimestampMs,
}
949
/// Possible outcomes of the `claim_resumed_execution` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ClaimResumedExecutionResult {
    /// Successfully claimed resumed execution (same attempt continues).
    Claimed(ClaimedResumedExecution),
}
955
956// ═══════════════════════════════════════════════════════════════════════
957// Phase 4 contracts: stream
958// ═══════════════════════════════════════════════════════════════════════
959
960// ─── append_frame ───
961
/// Typed inputs for the `append_frame` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AppendFrameArgs {
    pub execution_id: ExecutionId,
    pub attempt_index: AttemptIndex,
    pub lease_id: LeaseId,
    pub lease_epoch: LeaseEpoch,
    pub attempt_id: AttemptId,
    /// Frame type label stored with the frame.
    pub frame_type: String,
    /// Timestamp recorded for the frame.
    pub timestamp: TimestampMs,
    /// Raw frame payload bytes.
    pub payload: Vec<u8>,
    /// Optional encoding label for `payload`.
    #[serde(default)]
    pub encoding: Option<String>,
    /// Optional structured metadata for the frame (JSON blob).
    #[serde(default)]
    pub metadata_json: Option<String>,
    #[serde(default)]
    pub correlation_id: Option<String>,
    #[serde(default)]
    pub source: Option<String>,
    /// MAXLEN for the stream. 0 = no trim.
    ///
    /// NOTE(review): how `None` is handled is not visible here;
    /// presumably the callee applies its own default — confirm.
    #[serde(default)]
    pub retention_maxlen: Option<u32>,
    /// Max payload bytes per frame. Default: 65536.
    ///
    /// NOTE(review): the default is not applied in this type;
    /// presumably it is applied by the callee when this is `None` — confirm.
    #[serde(default)]
    pub max_payload_bytes: Option<u32>,
}
988
/// Possible outcomes of the `append_frame` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AppendFrameResult {
    /// Frame appended successfully.
    Appended {
        /// Valkey Stream entry ID (e.g. "1713100800150-0").
        entry_id: String,
        /// Total frame count after this append.
        frame_count: u64,
    },
}
999
1000// ─── StreamCursor (issue #92) ───
1001
/// Opaque position marker for attempt-stream reads and tails.
///
/// Takes the place of the bare `&str` / `String` stream-id parameters
/// that `read_stream` / `tail_stream` / `ReadStreamParams` /
/// `TailStreamParams` used to carry. On the wire it is still a flat
/// string — serde goes through `try_from`/`into` — so REST queries
/// such as `?from=start&to=end` and `?after=123-0` keep working.
///
/// # Public wire grammar
///
/// Exactly these tokens are accepted:
///
/// * `"start"` — first entry in the stream (XRANGE `-` equivalent).
///   Valid in `read_stream` / `ReadStreamParams`.
/// * `"end"` — latest entry in the stream (XRANGE `+` equivalent).
///   Valid in `read_stream` / `ReadStreamParams`.
/// * `"<ms>"` or `"<ms>-<seq>"` — a concrete Valkey Stream entry id.
///   Valid everywhere.
///
/// The raw XRANGE/XREAD markers `"-"` and `"+"` are **NOT** part of
/// the wire grammar. They are an internal implementation detail that
/// only appears on the Lua-adjacent [`ReadFramesArgs`] / `xread_block`
/// path, produced by [`StreamCursor::to_wire`].
///
/// For XREAD (tail), "from the beginning" is spelled
/// `StreamCursor::At("0-0".into())`; [`StreamCursor::from_beginning`]
/// returns exactly that value. The SDK's `tail_stream` boundary
/// rejects `Start` / `End` because XREAD does not accept `-` / `+` as
/// cursors. [`StreamCursor::is_concrete`] centralises that
/// Start/End-vs-At decision for boundary-validation call sites.
///
/// # Why an enum instead of a string
///
/// With a plain string, malformed ids reach the Lua/Valkey layer and
/// come back as a script error and HTTP 500. A fallible `FromStr` /
/// `TryFrom<String>` on an enum rejects malformed input at the wire
/// boundary with a structured error, and keeps bare `-` / `+` from
/// leaking into consumer code as tacit extensions of the public API.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum StreamCursor {
    /// First entry in the stream (XRANGE start marker).
    Start,
    /// Latest entry in the stream (XRANGE end marker).
    End,
    /// A concrete Valkey Stream entry id (`<ms>` or `<ms>-<seq>`).
    ///
    /// XREAD-style tails use `At("0-0".to_owned())` for "from the
    /// beginning" — see [`StreamCursor::from_beginning`].
    At(String),
}

impl StreamCursor {
    /// Cursor for the XREAD-from-beginning convention (`"0-0"`).
    /// Because XREAD's `last_id` is exclusive, using this as the
    /// `after` cursor yields every entry in the stream.
    pub fn from_beginning() -> Self {
        Self::At(String::from("0-0"))
    }

    /// Serde default helper returning `StreamCursor::Start`. Intended
    /// for `#[serde(default = "StreamCursor::start")]` on REST query
    /// structs.
    pub fn start() -> Self {
        Self::Start
    }

    /// Serde default helper returning `StreamCursor::End`.
    pub fn end() -> Self {
        Self::End
    }

    /// Serde default helper returning
    /// `StreamCursor::from_beginning()`; the default for
    /// `TailStreamParams::after`.
    pub fn beginning() -> Self {
        Self::from_beginning()
    }

    /// Internal-only: lower the cursor to the marker string Valkey
    /// expects for XRANGE/XREAD. `Start → "-"`, `End → "+"`,
    /// `At(s) → s`.
    ///
    /// Called at the ff-script adapter edge (just before building
    /// `ReadFramesArgs` or invoking `xread_block`) to turn the opaque
    /// wire grammar into the Lua-ABI form. These raw characters are
    /// NOT part of the public wire and must not be emitted to
    /// consumers, which is why the method is hidden from generated
    /// docs.
    #[doc(hidden)]
    pub fn to_wire(&self) -> &str {
        match self {
            Self::At(id) => id.as_str(),
            Self::Start => "-",
            Self::End => "+",
        }
    }

    /// Internal-only owned counterpart of [`Self::to_wire`]: the
    /// `String` inside `At(s)` is moved out rather than cloned. Meant
    /// for adapter edges that need an owned wire string (e.g.
    /// `ReadFramesArgs.from_id`) from a cursor that is about to be
    /// dropped.
    #[doc(hidden)]
    pub fn into_wire_string(self) -> String {
        match self {
            Self::At(id) => id,
            open_marker => open_marker.to_wire().to_owned(),
        }
    }

    /// Whether this cursor names a concrete entry id
    /// (`"<ms>"` / `"<ms>-<seq>"`) rather than one of the open markers
    /// `Start` / `End`.
    ///
    /// Boundaries such as XREAD (tailing) that cannot take open
    /// markers reject a cursor exactly when `!cursor.is_concrete()`.
    /// Keeping the test here holds the SDK and REST guards in
    /// lock-step.
    pub fn is_concrete(&self) -> bool {
        !matches!(self, Self::Start | Self::End)
    }
}

/// Renders the public wire form: `start`, `end`, or the entry id.
impl std::fmt::Display for StreamCursor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let token = match self {
            Self::Start => "start",
            Self::End => "end",
            Self::At(id) => id.as_str(),
        };
        f.write_str(token)
    }
}
1139
/// Failure modes when turning a string into a [`StreamCursor`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StreamCursorParseError {
    /// The input was the empty string.
    Empty,
    /// The input was one of the rejected bare-marker aliases (`"-"`,
    /// `"+"`). The public wire spells these `"start"` / `"end"`; the
    /// raw Valkey markers are internal-only. Carries the offending
    /// input.
    BareMarkerRejected(String),
    /// The input was neither a recognized keyword nor a well-formed
    /// Stream entry id. Entry ids must match `^\d+(?:-\d+)?$`.
    /// Carries the offending input.
    Malformed(String),
}

/// Human-readable message for each parse failure.
impl std::fmt::Display for StreamCursorParseError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Malformed(s) => write!(
                f,
                "invalid stream cursor '{s}' (expected 'start', 'end', '<ms>', or '<ms>-<seq>')"
            ),
            Self::BareMarkerRejected(s) => write!(
                f,
                "bare marker '{s}' is not a valid stream cursor; use 'start' or 'end'"
            ),
            Self::Empty => f.write_str("stream cursor must not be empty"),
        }
    }
}

impl std::error::Error for StreamCursorParseError {}
1171
/// Shared grammar check — classifies `s` as `Start` / `End` / a
/// concrete-id shape / bare marker / malformed / empty, WITHOUT
/// allocating. The owned vs borrowed entry points
/// ([`StreamCursor::from_str`], [`StreamCursor::try_from`]) consume
/// this classification and move the owned `String` into `At` when
/// applicable, avoiding a round-trip
/// `String → &str → String::to_owned` for the common REST-query path.
enum StreamCursorClass {
    /// The keyword `"start"`.
    Start,
    /// The keyword `"end"`.
    End,
    /// A well-formed `<ms>` or `<ms>-<seq>` entry id.
    Concrete,
    /// One of the raw markers `"-"` / `"+"`.
    BareMarker,
    /// The empty string.
    Empty,
    /// Anything else.
    Malformed,
}

/// Classify `s` against the public stream-cursor grammar.
///
/// Checks run in this order: empty input, the bare markers `-` / `+`,
/// the keywords `start` / `end`, then the entry-id shape `<ms>` or
/// `<ms>-<seq>`. Everything that matches none of these is `Malformed`.
///
/// Each id component must be a non-empty run of ASCII digits **and**
/// fit in a `u64`. The range check matters because a Valkey stream id
/// is a pair of 64-bit unsigned integers: a longer digit run has the
/// right shape but would only be rejected later, past the wire
/// boundary this grammar is meant to guard.
fn classify_stream_cursor(s: &str) -> StreamCursorClass {
    /// True iff `part` is a non-empty ASCII-digit run that fits in `u64`.
    fn is_id_component(part: &str) -> bool {
        // The byte-level digit test comes first: it rejects non-ASCII
        // digits and the leading `+` that `u64::from_str` would accept.
        !part.is_empty()
            && part.bytes().all(|b| b.is_ascii_digit())
            && part.parse::<u64>().is_ok()
    }

    match s {
        "" => return StreamCursorClass::Empty,
        "-" | "+" => return StreamCursorClass::BareMarker,
        "start" => return StreamCursorClass::Start,
        "end" => return StreamCursorClass::End,
        _ => {}
    }

    // Split on the FIRST '-' only; a second '-' lands in `seq_part`
    // and fails the digit test (so "1-2-3" is malformed).
    let (ms_part, seq_part) = match s.split_once('-') {
        Some((ms, seq)) => (ms, Some(seq)),
        None => (s, None),
    };

    if is_id_component(ms_part) && seq_part.map_or(true, is_id_component) {
        StreamCursorClass::Concrete
    } else {
        StreamCursorClass::Malformed
    }
}
1218
1219impl std::str::FromStr for StreamCursor {
1220 type Err = StreamCursorParseError;
1221
1222 fn from_str(s: &str) -> Result<Self, Self::Err> {
1223 match classify_stream_cursor(s) {
1224 StreamCursorClass::Start => Ok(Self::Start),
1225 StreamCursorClass::End => Ok(Self::End),
1226 StreamCursorClass::Concrete => Ok(Self::At(s.to_owned())),
1227 StreamCursorClass::BareMarker => {
1228 Err(StreamCursorParseError::BareMarkerRejected(s.to_owned()))
1229 }
1230 StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
1231 StreamCursorClass::Malformed => Err(StreamCursorParseError::Malformed(s.to_owned())),
1232 }
1233 }
1234}
1235
1236impl TryFrom<String> for StreamCursor {
1237 type Error = StreamCursorParseError;
1238
1239 fn try_from(s: String) -> Result<Self, Self::Error> {
1240 // Owned parsing path — the `At` variant moves `s` in directly,
1241 // avoiding the `&str → String::to_owned` re-allocation that a
1242 // blind forward to `FromStr::from_str(&s)` would force. Error
1243 // paths still pay one allocation to describe the offending
1244 // input.
1245 match classify_stream_cursor(&s) {
1246 StreamCursorClass::Start => Ok(Self::Start),
1247 StreamCursorClass::End => Ok(Self::End),
1248 StreamCursorClass::Concrete => Ok(Self::At(s)),
1249 StreamCursorClass::BareMarker => Err(StreamCursorParseError::BareMarkerRejected(s)),
1250 StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
1251 StreamCursorClass::Malformed => Err(StreamCursorParseError::Malformed(s)),
1252 }
1253 }
1254}
1255
1256impl From<StreamCursor> for String {
1257 fn from(c: StreamCursor) -> Self {
1258 c.to_string()
1259 }
1260}
1261
1262impl Serialize for StreamCursor {
1263 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1264 serializer.collect_str(self)
1265 }
1266}
1267
1268impl<'de> Deserialize<'de> for StreamCursor {
1269 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
1270 let s = String::deserialize(deserializer)?;
1271 Self::try_from(s).map_err(serde::de::Error::custom)
1272 }
1273}
1274
1275// ─── read_attempt_stream / tail_attempt_stream ───
1276
/// Hard cap on the number of frames returned by a single read/tail call.
///
/// Single source of truth across the Rust layer (ff-script, ff-server,
/// ff-sdk). The Lua side in `lua/stream.lua` keeps a matching literal with
/// an inline reference back here; bump both together if you ever need to
/// lift the cap.
///
/// Also the documented upper bound for [`ReadFramesArgs::count_limit`].
pub const STREAM_READ_HARD_CAP: u64 = 10_000;
1284
/// A single frame read from an attempt-scoped stream.
///
/// Field set mirrors what `ff_append_frame` writes: `frame_type`, `ts`,
/// `payload`, `encoding`, `source`, and optionally `correlation_id`. Stored
/// as an ordered map so field order is deterministic across read calls.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamFrame {
    /// Valkey Stream entry ID, e.g. "1713100800150-0".
    pub id: String,
    /// Frame fields in sorted order (a `BTreeMap` iterates by key).
    pub fields: std::collections::BTreeMap<String, String>,
}
1297
/// Inputs to `ff_read_attempt_stream` (XRANGE wrapper).
///
/// `from_id` / `to_id` carry raw XRANGE ids, including the `-` / `+`
/// markers, rather than the public `StreamCursor` keywords.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReadFramesArgs {
    pub execution_id: ExecutionId,
    pub attempt_index: AttemptIndex,
    /// XRANGE start ID. Use "-" for earliest.
    pub from_id: String,
    /// XRANGE end ID. Use "+" for latest.
    pub to_id: String,
    /// XRANGE COUNT limit. MUST be `>= 1`. The REST and SDK layers reject
    /// `0` at the boundary; the Lua side rejects it too. `STREAM_READ_HARD_CAP`
    /// is the upper bound.
    pub count_limit: u64,
}
1312
/// Result of reading frames from an attempt stream — frames plus terminal
/// signal so consumers can stop polling without a timeout fallback.
///
/// Both terminal markers are omitted from the serialized form while they
/// are `None`, and default to `None` when absent on deserialization.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamFrames {
    /// Entries in the requested range (possibly empty).
    pub frames: Vec<StreamFrame>,
    /// Timestamp when the upstream writer closed the stream. `None` if the
    /// stream is still open (or has never been written).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub closed_at: Option<TimestampMs>,
    /// Reason from the closing writer. Current values:
    /// `attempt_success`, `attempt_failure`, `attempt_cancelled`,
    /// `attempt_interrupted`. `None` iff the stream is still open.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub closed_reason: Option<String>,
}
1329
1330impl StreamFrames {
1331 /// Construct an empty open-stream result (no frames, no terminal
1332 /// markers). Useful for fast-path peek helpers.
1333 pub fn empty_open() -> Self {
1334 Self {
1335 frames: Vec::new(),
1336 closed_at: None,
1337 closed_reason: None,
1338 }
1339 }
1340
1341 /// True iff the producer has closed this stream. Consumers should stop
1342 /// polling and drain once this returns true.
1343 pub fn is_closed(&self) -> bool {
1344 self.closed_at.is_some()
1345 }
1346}
1347
/// Possible outcomes of an attempt-stream read.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReadFramesResult {
    /// Frames returned (possibly empty) plus optional closed markers.
    Frames(StreamFrames),
}
1353
1354// ═══════════════════════════════════════════════════════════════════════
1355// Phase 5 contracts: budget, quota, block/unblock
1356// ═══════════════════════════════════════════════════════════════════════
1357
1358// ─── create_budget ───
1359
/// Typed inputs for the `create_budget` function.
///
/// `dimensions`, `hard_limits` and `soft_limits` are parallel vectors:
/// the entry at index `i` of each limit vector belongs to
/// `dimensions[i]`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateBudgetArgs {
    pub budget_id: crate::types::BudgetId,
    pub scope_type: String,
    pub scope_id: String,
    pub enforcement_mode: String,
    pub on_hard_limit: String,
    pub on_soft_limit: String,
    /// Reset interval, in milliseconds.
    pub reset_interval_ms: u64,
    /// Dimension names.
    pub dimensions: Vec<String>,
    /// Hard limits per dimension (parallel with dimensions).
    pub hard_limits: Vec<u64>,
    /// Soft limits per dimension (parallel with dimensions).
    pub soft_limits: Vec<u64>,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
1377
/// Possible outcomes of the `create_budget` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateBudgetResult {
    /// Budget created.
    Created { budget_id: crate::types::BudgetId },
    /// Already exists (idempotent).
    AlreadySatisfied { budget_id: crate::types::BudgetId },
}
1385
1386// ─── create_quota_policy ───
1387
/// Typed inputs for the `create_quota_policy` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateQuotaPolicyArgs {
    pub quota_policy_id: crate::types::QuotaPolicyId,
    /// Length of the rate window, in seconds.
    pub window_seconds: u64,
    /// Request limit per window.
    pub max_requests_per_window: u64,
    /// Concurrency limit.
    pub max_concurrent: u64,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
1396
/// Possible outcomes of the `create_quota_policy` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateQuotaPolicyResult {
    /// Quota policy created.
    Created {
        quota_policy_id: crate::types::QuotaPolicyId,
    },
    /// Already exists (idempotent).
    AlreadySatisfied {
        quota_policy_id: crate::types::QuotaPolicyId,
    },
}
1408
1409// ─── budget_status (read-only) ───
1410
/// Operator-facing budget status snapshot (not an FCALL — direct HGETALL reads).
///
/// Identifiers and timestamps are plain strings here rather than the
/// typed ids / `TimestampMs` used by the FCALL contracts.
/// NOTE(review): presumably these are the raw hash field values passed
/// through unparsed — confirm against the reader.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BudgetStatus {
    pub budget_id: String,
    pub scope_type: String,
    pub scope_id: String,
    pub enforcement_mode: String,
    /// Current usage per dimension: {dimension_name: current_value}.
    pub usage: HashMap<String, u64>,
    /// Hard limits per dimension: {dimension_name: limit}.
    pub hard_limits: HashMap<String, u64>,
    /// Soft limits per dimension: {dimension_name: limit}.
    pub soft_limits: HashMap<String, u64>,
    /// Breach counter.
    pub breach_count: u64,
    /// Soft-breach counter.
    pub soft_breach_count: u64,
    /// Time of the most recent breach, if any.
    pub last_breach_at: Option<String>,
    /// Dimension involved in the most recent breach, if any.
    pub last_breach_dim: Option<String>,
    /// Time of the next scheduled reset, if any.
    pub next_reset_at: Option<String>,
    /// Creation time, if recorded.
    pub created_at: Option<String>,
}
1431
1432// ─── report_usage_and_check ───
1433
/// Typed inputs for the `report_usage_and_check` function.
///
/// `dimensions` and `deltas` are parallel vectors: `deltas[i]` is the
/// increment for `dimensions[i]`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReportUsageArgs {
    /// Dimension names to increment.
    pub dimensions: Vec<String>,
    /// Increment values (parallel with dimensions).
    pub deltas: Vec<u64>,
    /// Caller-supplied current time.
    pub now: TimestampMs,
    /// Optional idempotency key to prevent double-counting on retries.
    /// Pass the raw dedup id (e.g. `"retry-42"`); the typed FCALL wrapper
    /// wraps it into `ff:usagededup:{b:M}:<id>` using the budget
    /// partition's hash tag so it co-locates with the other budget keys
    /// (#108).
    #[serde(default)]
    pub dedup_key: Option<String>,
}
1449
/// Possible outcomes of the `report_usage_and_check` function.
///
/// Marked `#[non_exhaustive]`: matches outside this crate need a
/// wildcard arm.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum ReportUsageResult {
    /// All increments applied, no breach.
    Ok,
    /// Soft limit breached on a dimension (advisory, increments applied).
    SoftBreach {
        dimension: String,
        current_usage: u64,
        soft_limit: u64,
    },
    /// Hard limit breached (increments NOT applied).
    HardBreach {
        dimension: String,
        current_usage: u64,
        hard_limit: u64,
    },
    /// Dedup key matched — usage already applied in a prior call.
    AlreadyApplied,
}
1470
1471// ─── reset_budget ───
1472
/// Typed inputs for the `reset_budget` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResetBudgetArgs {
    pub budget_id: crate::types::BudgetId,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
1478
/// Possible outcomes of the `reset_budget` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResetBudgetResult {
    /// Budget reset successfully. `next_reset_at` is the timestamp of
    /// the following reset.
    Reset { next_reset_at: TimestampMs },
}
1484
1485// ─── check_admission_and_record ───
1486
/// Typed inputs for the `check_admission_and_record` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CheckAdmissionArgs {
    pub execution_id: ExecutionId,
    /// Caller-supplied current time.
    pub now: TimestampMs,
    /// Length of the rate window, in seconds.
    pub window_seconds: u64,
    /// Rate limit applied to the window.
    pub rate_limit: u64,
    /// Concurrency cap.
    pub concurrency_cap: u64,
    /// Optional jitter, in milliseconds.
    #[serde(default)]
    pub jitter_ms: Option<u64>,
}
1497
/// Possible outcomes of the `check_admission_and_record` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CheckAdmissionResult {
    /// Admitted — execution may proceed.
    Admitted,
    /// Already admitted in this window (idempotent).
    AlreadyAdmitted,
    /// Rate limit exceeded. `retry_after_ms` is a wait hint in
    /// milliseconds.
    RateExceeded { retry_after_ms: u64 },
    /// Concurrency cap hit.
    ConcurrencyExceeded,
}
1509
1510// ─── release_admission ───
1511
/// Typed inputs for the `release_admission` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReleaseAdmissionArgs {
    /// Execution the call targets.
    pub execution_id: ExecutionId,
}
1516
/// Possible outcomes of the `release_admission` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReleaseAdmissionResult {
    /// The only success variant.
    Released,
}
1521
1522// ─── block_execution_for_admission ───
1523
/// Typed inputs for the `block_execution_for_admission` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BlockExecutionArgs {
    pub execution_id: ExecutionId,
    /// Reason the execution is being blocked.
    pub blocking_reason: String,
    /// Optional extra detail accompanying `blocking_reason`.
    #[serde(default)]
    pub blocking_detail: Option<String>,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
1532
/// Possible outcomes of the `block_execution_for_admission` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlockExecutionResult {
    /// Execution blocked.
    Blocked,
}
1538
1539// ─── unblock_execution ───
1540
/// Typed inputs for the `unblock_execution` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UnblockExecutionArgs {
    pub execution_id: ExecutionId,
    /// Caller-supplied current time.
    pub now: TimestampMs,
    /// Expected blocking reason (prevents stale unblock).
    #[serde(default)]
    pub expected_blocking_reason: Option<String>,
}
1549
/// Possible outcomes of the `unblock_execution` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum UnblockExecutionResult {
    /// Execution unblocked and moved to eligible.
    Unblocked,
}
1555
1556// ═══════════════════════════════════════════════════════════════════════
1557// Phase 6 contracts: flow coordination and dependencies
1558// ═══════════════════════════════════════════════════════════════════════
1559
1560// ─── create_flow ───
1561
/// Typed inputs for the `create_flow` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateFlowArgs {
    pub flow_id: crate::types::FlowId,
    pub flow_kind: String,
    pub namespace: Namespace,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
1569
/// Possible outcomes of the `create_flow` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateFlowResult {
    /// Flow created successfully.
    Created { flow_id: crate::types::FlowId },
    /// Flow already exists (idempotent).
    AlreadySatisfied { flow_id: crate::types::FlowId },
}
1577
1578// ─── add_execution_to_flow ───
1579
/// Typed inputs for the `add_execution_to_flow` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AddExecutionToFlowArgs {
    /// Flow receiving the new member.
    pub flow_id: crate::types::FlowId,
    /// Execution being added to the flow.
    pub execution_id: ExecutionId,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
1586
/// Possible outcomes of the `add_execution_to_flow` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AddExecutionToFlowResult {
    /// Execution added to flow.
    Added {
        execution_id: ExecutionId,
        /// Node count after the addition.
        new_node_count: u32,
    },
    /// Already a member (idempotent).
    AlreadyMember {
        execution_id: ExecutionId,
        /// Node count reported by the call.
        node_count: u32,
    },
}
1600
1601// ─── cancel_flow ───
1602
/// Typed inputs for the `cancel_flow` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CancelFlowArgs {
    pub flow_id: crate::types::FlowId,
    /// Cancellation reason.
    pub reason: String,
    /// Cancellation policy name; `"cancel_all"` is the value the
    /// [`CancelFlowResult`] docs single out.
    pub cancellation_policy: String,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
1610
/// Wire-level outcomes of a `cancel_flow` operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CancelFlowResult {
    /// Flow cancelled and all member cancellations (if any) have completed
    /// synchronously. Used when `cancellation_policy != "cancel_all"`, when
    /// the flow has no members, when the caller opted into synchronous
    /// dispatch (e.g. `?wait=true`), or when the flow was already in a
    /// terminal state (idempotent retry).
    ///
    /// On the idempotent-retry path `member_execution_ids` may be *capped*
    /// at the server (default 1000 entries) to bound response bandwidth on
    /// flows with very large membership. The first (non-idempotent) call
    /// always returns the full list, so clients that need every member id
    /// should persist the initial response.
    Cancelled {
        cancellation_policy: String,
        member_execution_ids: Vec<String>,
    },
    /// Flow state was flipped to cancelled atomically, but member
    /// cancellations are dispatched asynchronously in the background.
    /// Clients may poll `GET /v1/executions/{id}/state` for each member
    /// execution id to track terminal state.
    CancellationScheduled {
        cancellation_policy: String,
        /// Number of member executions.
        member_count: u32,
        member_execution_ids: Vec<String>,
    },
    /// `?wait=true` dispatch completed but one or more member cancellations
    /// failed mid-loop (e.g. ghost member, Lua error, transport fault after
    /// retries exhausted). The flow itself is still flipped to cancelled
    /// (atomic Lua already ran); callers SHOULD inspect
    /// `failed_member_execution_ids` and either retry those ids directly
    /// via `cancel_execution` or wait for the cancel-backlog reconciler
    /// to retry them in the background.
    ///
    /// Only emitted by the synchronous wait path
    /// ([`crate::CancelFlowArgs`] via `?wait=true`). The async path returns
    /// [`CancelFlowResult::CancellationScheduled`] and delegates retries
    /// to the reconciler — there is no visible "partial" state on the
    /// async path because the dispatch result is not observed inline.
    PartiallyCancelled {
        cancellation_policy: String,
        /// All member execution ids that the cancel_flow FCALL returned
        /// (i.e. the full membership at the moment of cancellation).
        member_execution_ids: Vec<String>,
        /// Strict subset of `member_execution_ids` whose per-member cancel
        /// FCALL returned an error. Order is deterministic (matches the
        /// iteration order over `member_execution_ids`).
        failed_member_execution_ids: Vec<String>,
    },
}
1661
/// RFC-017 Stage E2: result of the "header" portion of a cancel_flow
/// operation — the atomic flow-state flip + member enumeration.
///
/// The Server composes this with its own wait/async member-dispatch
/// machinery to build the wire-level [`CancelFlowResult`]. Backends
/// implement [`crate::engine_backend::EngineBackend::cancel_flow_header`]
/// (default: `Unavailable`) so the Valkey-native `ff_cancel_flow`
/// FCALL (with its `flow_already_terminal` idempotency branch) can be
/// driven through the trait without re-shaping the existing public
/// `cancel_flow(id, policy, wait)` signature.
///
/// Unlike the wire-level result types in this module, this enum does
/// not derive `Serialize` / `Deserialize`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum CancelFlowHeader {
    /// Flow-state flipped this call. `member_execution_ids` is the
    /// full (uncapped) membership at flip time.
    Cancelled {
        cancellation_policy: String,
        member_execution_ids: Vec<String>,
    },
    /// Flow was already in a terminal state on entry. The backend has
    /// surfaced the *stored* `cancellation_policy`, `cancel_reason`,
    /// and full membership so the Server can return an idempotent
    /// [`CancelFlowResult::Cancelled`] without re-doing the flip.
    AlreadyTerminal {
        /// `None` only for flows cancelled by pre-E2 Lua that never
        /// persisted the policy field.
        stored_cancellation_policy: Option<String>,
        /// `None` when the flow was never cancel-reason-stamped.
        stored_cancel_reason: Option<String>,
        /// Full membership. Server caps to
        /// `ALREADY_TERMINAL_MEMBER_CAP` before wiring.
        member_execution_ids: Vec<String>,
    },
}
1696
1697// ─── stage_dependency_edge ───
1698
/// Typed inputs for the `stage_dependency_edge` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StageDependencyEdgeArgs {
    pub flow_id: crate::types::FlowId,
    pub edge_id: crate::types::EdgeId,
    /// Execution on the upstream end of the edge.
    pub upstream_execution_id: ExecutionId,
    /// Execution on the downstream end of the edge.
    pub downstream_execution_id: ExecutionId,
    /// Kind of dependency; deserializes to `"success_only"` when absent.
    #[serde(default = "default_dependency_kind")]
    pub dependency_kind: String,
    #[serde(default)]
    pub data_passing_ref: Option<String>,
    /// Graph revision the caller expects.
    /// NOTE(review): presumably a mismatch rejects the call — confirm.
    pub expected_graph_revision: u64,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
1712
/// Serde default for the `dependency_kind` fields: `"success_only"`.
fn default_dependency_kind() -> String {
    String::from("success_only")
}
1716
/// Possible outcomes of the `stage_dependency_edge` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum StageDependencyEdgeResult {
    /// Edge staged, new graph revision.
    Staged {
        edge_id: crate::types::EdgeId,
        new_graph_revision: u64,
    },
}
1725
1726// ─── apply_dependency_to_child ───
1727
/// Typed inputs for the `apply_dependency_to_child` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ApplyDependencyToChildArgs {
    pub flow_id: crate::types::FlowId,
    pub edge_id: crate::types::EdgeId,
    /// The child execution that receives the dependency.
    pub downstream_execution_id: ExecutionId,
    /// Execution on the upstream end of the edge.
    pub upstream_execution_id: ExecutionId,
    pub graph_revision: u64,
    /// Kind of dependency; deserializes to `"success_only"` when absent.
    #[serde(default = "default_dependency_kind")]
    pub dependency_kind: String,
    #[serde(default)]
    pub data_passing_ref: Option<String>,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
1742
/// Possible outcomes of the `apply_dependency_to_child` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ApplyDependencyToChildResult {
    /// Dependency applied, N unsatisfied deps remaining.
    Applied { unsatisfied_count: u32 },
    /// Already applied (idempotent).
    AlreadyApplied,
}
1750
1751// ─── resolve_dependency ───
1752
/// Typed inputs for the `resolve_dependency` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResolveDependencyArgs {
    /// Edge being resolved.
    pub edge_id: crate::types::EdgeId,
    /// "success", "failed", "cancelled", "expired", "skipped"
    pub upstream_outcome: String,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
1760
/// Possible outcomes of the `resolve_dependency` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResolveDependencyResult {
    /// Edge satisfied — downstream may become eligible.
    Satisfied,
    /// Edge made impossible — downstream becomes skipped.
    Impossible,
    /// Already resolved (idempotent).
    AlreadyResolved,
}
1770
1771// ─── promote_blocked_to_eligible ───
1772
/// Typed inputs for the `promote_blocked_to_eligible` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PromoteBlockedToEligibleArgs {
    /// Execution the call targets.
    pub execution_id: ExecutionId,
    /// Caller-supplied current time.
    pub now: TimestampMs,
}
1778
/// Possible outcomes of the `promote_blocked_to_eligible` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromoteBlockedToEligibleResult {
    /// The only success variant.
    Promoted,
}
1783
1784// ─── evaluate_flow_eligibility ───
1785
/// Typed inputs for the `evaluate_flow_eligibility` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EvaluateFlowEligibilityArgs {
    /// Execution the call targets.
    pub execution_id: ExecutionId,
}
1790
/// Possible outcomes of the `evaluate_flow_eligibility` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum EvaluateFlowEligibilityResult {
    /// Execution eligibility status, as a free-form string.
    Status { status: String },
}
1796
1797// ─── replay_execution ───
1798
1799#[derive(Clone, Debug, Serialize, Deserialize)]
1800pub struct ReplayExecutionArgs {
1801 pub execution_id: ExecutionId,
1802 pub now: TimestampMs,
1803}
1804
1805#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1806pub enum ReplayExecutionResult {
1807 /// Replayed to runnable.
1808 Replayed { public_state: PublicState },
1809}
1810
1811// ─── get_execution (full read) ───
1812
1813/// Full execution info returned by `Server::get_execution`.
1814#[derive(Clone, Debug, Serialize, Deserialize)]
1815pub struct ExecutionInfo {
1816 pub execution_id: ExecutionId,
1817 pub namespace: String,
1818 pub lane_id: String,
1819 pub priority: i32,
1820 pub execution_kind: String,
1821 pub state_vector: StateVector,
1822 pub public_state: PublicState,
1823 pub created_at: String,
1824 /// TimestampMs (ms since epoch) when the execution's first attempt
1825 /// was started by a worker claim. Empty string until the first
1826 /// claim lands. Serialised as `Option<String>` so pre-claim reads
1827 /// deserialise cleanly even if the field is absent from the wire.
1828 #[serde(default, skip_serializing_if = "Option::is_none")]
1829 pub started_at: Option<String>,
1830 /// TimestampMs when the execution reached a terminal
1831 /// `completed`/`failed`/`cancelled`/`expired` state. Empty /
1832 /// absent while still in flight.
1833 #[serde(default, skip_serializing_if = "Option::is_none")]
1834 pub completed_at: Option<String>,
1835 pub current_attempt_index: u32,
1836 pub flow_id: Option<String>,
1837 pub blocking_detail: String,
1838}
1839
1840// ─── set_execution_tags / set_flow_tags (issue #58.4) ───
1841
1842/// Args for `ff_set_execution_tags`. Tag keys MUST match
1843/// `^[a-z][a-z0-9_]*\.` — the caller-namespace rule — or the FCALL
1844/// returns `invalid_tag_key`. Values are arbitrary strings. The map is
1845/// ordered (`BTreeMap`) so two callers submitting the same logical set
1846/// of tags produce identical ARGV.
1847#[derive(Clone, Debug, Serialize, Deserialize)]
1848pub struct SetExecutionTagsArgs {
1849 pub execution_id: ExecutionId,
1850 pub tags: BTreeMap<String, String>,
1851}
1852
1853/// Result of `ff_set_execution_tags`.
1854#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1855pub enum SetExecutionTagsResult {
1856 /// Tags written. `count` is the number of key-value pairs applied.
1857 Ok { count: u32 },
1858}
1859
1860/// Args for `ff_set_flow_tags`. Same namespace rule as
1861/// [`SetExecutionTagsArgs`]. The Lua function also lazy-migrates any
1862/// pre-58.4 reserved-namespace fields stashed inline on `flow_core` into
1863/// the new tags key.
1864#[derive(Clone, Debug, Serialize, Deserialize)]
1865pub struct SetFlowTagsArgs {
1866 pub flow_id: FlowId,
1867 pub tags: BTreeMap<String, String>,
1868}
1869
1870/// Result of `ff_set_flow_tags`.
1871#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1872pub enum SetFlowTagsResult {
1873 /// Tags written. `count` is the number of key-value pairs applied.
1874 Ok { count: u32 },
1875}
1876
1877// ─── describe_execution (issue #58.1) ───
1878
1879/// Engine-decoupled read-model for one execution.
1880///
1881/// Returned by `ff_sdk::FlowFabricWorker::describe_execution`. Consumers
1882/// consult this struct instead of reaching into Valkey's exec_core hash
1883/// directly — the engine is free to rename fields or restructure storage
1884/// under this surface.
1885///
1886/// `#[non_exhaustive]` — FF may add fields in minor releases without a
1887/// semver break. Match with `..` or use field-by-field construction.
1888#[derive(Clone, Debug, PartialEq, Eq)]
1889#[non_exhaustive]
1890pub struct ExecutionSnapshot {
1891 pub execution_id: ExecutionId,
1892 pub flow_id: Option<FlowId>,
1893 pub lane_id: LaneId,
1894 pub namespace: Namespace,
1895 pub public_state: PublicState,
1896 /// Blocking reason string (e.g. `"waiting_for_worker"`,
1897 /// `"waiting_for_delay"`, `"waiting_for_dependencies"`). `None` when
1898 /// the exec_core field is empty.
1899 pub blocking_reason: Option<String>,
1900 /// Free-form operator-readable detail explaining `blocking_reason`.
1901 /// `None` when the exec_core field is empty.
1902 pub blocking_detail: Option<String>,
1903 /// Summary of the execution's currently-active attempt. `None` when
1904 /// no attempt has been started (pre-claim) or when the exec_core
1905 /// attempt fields are all empty.
1906 pub current_attempt: Option<AttemptSummary>,
1907 /// Summary of the execution's currently-held lease. `None` when the
1908 /// execution is not held by a worker.
1909 pub current_lease: Option<LeaseSummary>,
1910 /// The waitpoint this execution is currently suspended on, if any.
1911 pub current_waitpoint: Option<WaitpointId>,
1912 pub created_at: TimestampMs,
1913 /// Timestamp of the last write that mutated exec_core. Engine-maintained.
1914 pub last_mutation_at: TimestampMs,
1915 pub total_attempt_count: u32,
1916 /// Caller-owned labels. The prefix `^[a-z][a-z0-9_]*\.` is reserved for
1917 /// consumer metadata (e.g. `cairn.task_id`); FF guarantees it will not
1918 /// write keys matching that shape. FF's own fields stay in snake_case
1919 /// without dots. Empty when no tags are set.
1920 pub tags: BTreeMap<String, String>,
1921}
1922
1923impl ExecutionSnapshot {
1924 /// Construct an [`ExecutionSnapshot`]. Present so downstream crates
1925 /// (ff-sdk's `describe_execution`) can assemble the struct despite
1926 /// the `#[non_exhaustive]` marker. Prefer adding builder-style
1927 /// helpers here over loosening `non_exhaustive`.
1928 #[allow(clippy::too_many_arguments)]
1929 pub fn new(
1930 execution_id: ExecutionId,
1931 flow_id: Option<FlowId>,
1932 lane_id: LaneId,
1933 namespace: Namespace,
1934 public_state: PublicState,
1935 blocking_reason: Option<String>,
1936 blocking_detail: Option<String>,
1937 current_attempt: Option<AttemptSummary>,
1938 current_lease: Option<LeaseSummary>,
1939 current_waitpoint: Option<WaitpointId>,
1940 created_at: TimestampMs,
1941 last_mutation_at: TimestampMs,
1942 total_attempt_count: u32,
1943 tags: BTreeMap<String, String>,
1944 ) -> Self {
1945 Self {
1946 execution_id,
1947 flow_id,
1948 lane_id,
1949 namespace,
1950 public_state,
1951 blocking_reason,
1952 blocking_detail,
1953 current_attempt,
1954 current_lease,
1955 current_waitpoint,
1956 created_at,
1957 last_mutation_at,
1958 total_attempt_count,
1959 tags,
1960 }
1961 }
1962}
1963
1964/// Currently-active attempt summary inside an [`ExecutionSnapshot`].
1965///
1966/// `#[non_exhaustive]`.
1967#[derive(Clone, Debug, PartialEq, Eq)]
1968#[non_exhaustive]
1969pub struct AttemptSummary {
1970 pub attempt_id: AttemptId,
1971 pub attempt_index: AttemptIndex,
1972}
1973
1974impl AttemptSummary {
1975 /// Construct an [`AttemptSummary`]. See [`ExecutionSnapshot::new`]
1976 /// for the rationale — `#[non_exhaustive]` blocks cross-crate
1977 /// struct-literal construction.
1978 pub fn new(attempt_id: AttemptId, attempt_index: AttemptIndex) -> Self {
1979 Self {
1980 attempt_id,
1981 attempt_index,
1982 }
1983 }
1984}
1985
1986/// Currently-held lease summary inside an [`ExecutionSnapshot`].
1987///
1988/// `#[non_exhaustive]`.
1989#[derive(Clone, Debug, PartialEq, Eq)]
1990#[non_exhaustive]
1991pub struct LeaseSummary {
1992 pub lease_epoch: LeaseEpoch,
1993 pub worker_instance_id: WorkerInstanceId,
1994 pub expires_at: TimestampMs,
1995}
1996
1997impl LeaseSummary {
1998 /// Construct a [`LeaseSummary`]. See [`ExecutionSnapshot::new`]
1999 /// for the rationale.
2000 pub fn new(
2001 lease_epoch: LeaseEpoch,
2002 worker_instance_id: WorkerInstanceId,
2003 expires_at: TimestampMs,
2004 ) -> Self {
2005 Self {
2006 lease_epoch,
2007 worker_instance_id,
2008 expires_at,
2009 }
2010 }
2011}
2012
2013// ─── Common sub-types ───
2014
2015// ─── describe_flow (issue #58.2) ───
2016
2017/// Engine-decoupled read-model for one flow.
2018///
2019/// Returned by `ff_sdk::FlowFabricWorker::describe_flow`. Consumers
2020/// consult this struct instead of reaching into Valkey's flow_core hash
2021/// directly — the engine is free to rename fields or restructure storage
2022/// under this surface.
2023///
2024/// `#[non_exhaustive]` — FF may add fields in minor releases without a
2025/// semver break. Match with `..` or use [`FlowSnapshot::new`].
2026///
2027/// # `public_flow_state`
2028///
2029/// Stored as an engine-written string literal on `flow_core`. Known
2030/// values today: `open`, `running`, `blocked`, `cancelled`, `completed`,
2031/// `failed`. Surfaced as `String` (not a typed enum) because FF does
2032/// not yet expose a `PublicFlowState` type — callers that need to act
2033/// on specific values should match on the literal. The flow_projector
2034/// writes a parallel `public_flow_state` into the flow's summary hash;
2035/// this field reflects the authoritative value on `flow_core`, which
2036/// is what mutation guards (cancel/add-member) consult.
2037///
2038/// # `tags`
2039///
2040/// Unlike [`ExecutionSnapshot::tags`] (which has a dedicated tags
2041/// hash), flow tags live inline on `flow_core`. FF's own fields are
2042/// snake_case without a `.`; any field whose name starts with
2043/// `<lowercase>.` (e.g. `cairn.task_id`) is treated as consumer-owned
2044/// metadata and routed here. An empty map means no namespaced tags
2045/// were written. The prefix convention mirrors
2046/// [`ExecutionSnapshot::tags`] — consumers should keep tag keys
2047/// namespaced (`cairn.*`, `operator.*`, etc.) so future FF field
2048/// additions don't collide.
2049#[derive(Clone, Debug, PartialEq, Eq)]
2050#[non_exhaustive]
2051pub struct FlowSnapshot {
2052 pub flow_id: FlowId,
2053 /// The `flow_kind` literal passed to `create_flow` (e.g. `dag`,
2054 /// `pipeline`). Preserved as-is; FF does not interpret it.
2055 pub flow_kind: String,
2056 pub namespace: Namespace,
2057 /// Authoritative flow state on `flow_core`. See the struct-level
2058 /// docs for the set of known values.
2059 pub public_flow_state: String,
2060 /// Monotonically increasing revision bumped on every structural
2061 /// mutation (add-member, stage-edge). Used by optimistic-concurrency
2062 /// writers via `expected_graph_revision`.
2063 pub graph_revision: u64,
2064 /// Number of member executions added so far. Never decremented.
2065 pub node_count: u32,
2066 /// Number of dependency edges staged so far. Never decremented.
2067 pub edge_count: u32,
2068 pub created_at: TimestampMs,
2069 /// Timestamp of the last write that mutated `flow_core`.
2070 /// Engine-maintained.
2071 pub last_mutation_at: TimestampMs,
2072 /// When the flow reached a terminal state via `cancel_flow`. `None`
2073 /// while the flow is live. Only written by the cancel path today;
2074 /// `completed`/`failed` terminal states do not populate this field
2075 /// (the flow_projector derives them from membership).
2076 pub cancelled_at: Option<TimestampMs>,
2077 /// Operator-supplied reason from the `cancel_flow` call. `None`
2078 /// when the flow has not been cancelled.
2079 pub cancel_reason: Option<String>,
2080 /// The `cancellation_policy` value persisted by `cancel_flow`
2081 /// (e.g. `cancel_all`, `cancel_flow_only`). `None` for flows
2082 /// cancelled before this field was persisted, or not yet cancelled.
2083 pub cancellation_policy: Option<String>,
2084 /// Consumer-owned namespaced metadata (e.g. `cairn.task_id`). See
2085 /// the struct-level docs for the routing rule.
2086 pub tags: BTreeMap<String, String>,
2087 /// RFC-016 Stage A: inbound edge groups known on this flow.
2088 ///
2089 /// One entry per downstream execution that has at least one staged
2090 /// inbound dependency edge. Populated from the
2091 /// `ff:flow:{fp:N}:<flow_id>:edgegroup:<downstream_eid>` hash —
2092 /// when that hash is absent (existing flows created before Stage A),
2093 /// the backend falls back to reading the legacy
2094 /// `deps_meta.unsatisfied_required_count` counter on the
2095 /// downstream's exec partition and reports the group as
2096 /// [`EdgeDependencyPolicy::AllOf`] with the derived counters
2097 /// (backward-compat shim — see RFC-016 §11 Stage A).
2098 ///
2099 /// Every entry in Stage A reports `policy = AllOf`; Stage B/C/D/E
2100 /// extend the variants and wire the quorum counters.
2101 pub edge_groups: Vec<EdgeGroupSnapshot>,
2102}
2103
2104impl FlowSnapshot {
2105 /// Construct a [`FlowSnapshot`]. Present so downstream crates
2106 /// (ff-sdk's `describe_flow`) can assemble the struct despite the
2107 /// `#[non_exhaustive]` marker.
2108 #[allow(clippy::too_many_arguments)]
2109 pub fn new(
2110 flow_id: FlowId,
2111 flow_kind: String,
2112 namespace: Namespace,
2113 public_flow_state: String,
2114 graph_revision: u64,
2115 node_count: u32,
2116 edge_count: u32,
2117 created_at: TimestampMs,
2118 last_mutation_at: TimestampMs,
2119 cancelled_at: Option<TimestampMs>,
2120 cancel_reason: Option<String>,
2121 cancellation_policy: Option<String>,
2122 tags: BTreeMap<String, String>,
2123 edge_groups: Vec<EdgeGroupSnapshot>,
2124 ) -> Self {
2125 Self {
2126 flow_id,
2127 flow_kind,
2128 namespace,
2129 public_flow_state,
2130 graph_revision,
2131 node_count,
2132 edge_count,
2133 created_at,
2134 last_mutation_at,
2135 cancelled_at,
2136 cancel_reason,
2137 cancellation_policy,
2138 tags,
2139 edge_groups,
2140 }
2141 }
2142}
2143
2144// ─── describe_edge / list_*_edges (issue #58.3) ───
2145
2146/// Engine-decoupled read-model for one dependency edge.
2147///
2148/// Returned by `ff_sdk::FlowFabricWorker::describe_edge`,
2149/// `list_incoming_edges`, and `list_outgoing_edges`. Consumers consult
2150/// this struct instead of reaching into Valkey's per-flow `edge:` hash
2151/// directly — the engine is free to rename hash fields or restructure
2152/// key layout under this surface.
2153///
2154/// `#[non_exhaustive]` — FF may add fields in minor releases without a
2155/// semver break. Match with `..` or use [`EdgeSnapshot::new`].
2156///
2157/// # Fields
2158///
2159/// The struct mirrors the immutable edge record written by
2160/// `ff_stage_dependency_edge` (see `lua/flow.lua`). The flow-scoped
2161/// edge hash is only ever written once, at staging time; per-execution
2162/// resolution state lives on a separate `dep:<edge_id>` hash and is not
2163/// surfaced here. The `edge_state` field therefore reflects the
2164/// staging-time literal (currently `pending`), not the downstream
2165/// execution's dep-edge state.
2166#[derive(Clone, Debug, PartialEq, Eq)]
2167#[non_exhaustive]
2168pub struct EdgeSnapshot {
2169 pub edge_id: EdgeId,
2170 pub flow_id: FlowId,
2171 pub upstream_execution_id: ExecutionId,
2172 pub downstream_execution_id: ExecutionId,
2173 /// The `dependency_kind` literal (e.g. `success_only`) from
2174 /// `stage_dependency_edge`. Preserved as-is; FF does not interpret
2175 /// it on reads.
2176 pub dependency_kind: String,
2177 /// The satisfaction-condition literal stamped at staging time
2178 /// (e.g. `all_required`).
2179 pub satisfaction_condition: String,
2180 /// Optional opaque handle to a data-passing artifact. `None` when
2181 /// the stored field is empty (the most common case).
2182 pub data_passing_ref: Option<String>,
2183 /// Edge-state literal on the flow-scoped edge hash. Written once
2184 /// at staging as `pending`; this hash is immutable on the flow
2185 /// side. Per-execution resolution state is tracked separately on
2186 /// the child's `dep:<edge_id>` hash.
2187 pub edge_state: String,
2188 pub created_at: TimestampMs,
2189 /// Origin of the edge (e.g. `engine`). Preserved as-is.
2190 pub created_by: String,
2191}
2192
2193/// Direction marker for [`crate::engine_backend::EngineBackend::list_edges`].
2194///
2195/// Carries the subject execution whose adjacency side the caller wants
2196/// to list — mirrors the internal `AdjacencySide + subject_eid` pair
2197/// the ff-sdk free-fn `list_edges_from_set` already uses. Keeping
2198/// direction + subject fused in one enum means the trait method has a
2199/// single `direction` parameter rather than a `(side, eid)` pair, and
2200/// the backend impl can't forget one of the two.
2201///
2202/// * `Outgoing { from_node }` — the caller wants every edge whose
2203/// `upstream_execution_id == from_node`. Corresponds to the
2204/// `out:<execution_id>` adjacency SET under the execution's flow
2205/// partition.
2206/// * `Incoming { to_node }` — the caller wants every edge whose
2207/// `downstream_execution_id == to_node`. Corresponds to the
2208/// `in:<execution_id>` adjacency SET under the execution's flow
2209/// partition.
2210#[derive(Clone, Debug, PartialEq, Eq)]
2211pub enum EdgeDirection {
2212 /// Edges leaving `from_node` — `out:` adjacency SET.
2213 Outgoing {
2214 /// The subject execution whose outgoing edges to list.
2215 from_node: ExecutionId,
2216 },
2217 /// Edges landing on `to_node` — `in:` adjacency SET.
2218 Incoming {
2219 /// The subject execution whose incoming edges to list.
2220 to_node: ExecutionId,
2221 },
2222}
2223
2224impl EdgeDirection {
2225 /// Return the subject execution id regardless of direction. Shared
2226 /// helper for backend impls that need the execution id for the
2227 /// initial `HGET exec_core.flow_id` lookup (flow routing) before
2228 /// they know which adjacency SET to read.
2229 pub fn subject(&self) -> &ExecutionId {
2230 match self {
2231 Self::Outgoing { from_node } => from_node,
2232 Self::Incoming { to_node } => to_node,
2233 }
2234 }
2235}
2236
2237impl EdgeSnapshot {
2238 /// Construct an [`EdgeSnapshot`]. Present so downstream crates
2239 /// (ff-sdk's `describe_edge` / `list_*_edges`) can assemble the
2240 /// struct despite the `#[non_exhaustive]` marker.
2241 #[allow(clippy::too_many_arguments)]
2242 pub fn new(
2243 edge_id: EdgeId,
2244 flow_id: FlowId,
2245 upstream_execution_id: ExecutionId,
2246 downstream_execution_id: ExecutionId,
2247 dependency_kind: String,
2248 satisfaction_condition: String,
2249 data_passing_ref: Option<String>,
2250 edge_state: String,
2251 created_at: TimestampMs,
2252 created_by: String,
2253 ) -> Self {
2254 Self {
2255 edge_id,
2256 flow_id,
2257 upstream_execution_id,
2258 downstream_execution_id,
2259 dependency_kind,
2260 satisfaction_condition,
2261 data_passing_ref,
2262 edge_state,
2263 created_at,
2264 created_by,
2265 }
2266 }
2267}
2268
2269// ─── RFC-016 edge-group policy (Stage A) ───
2270
2271/// Policy controlling how an inbound edge group's satisfaction is
2272/// decided.
2273///
2274/// Stage A honours only [`EdgeDependencyPolicy::AllOf`] — the two
2275/// quorum variants exist so the wire/snapshot surface is stable for
2276/// Stage B/C/D's resolver extensions, but
2277/// [`crate::engine_backend::EngineBackend::set_edge_group_policy`]
2278/// rejects them with [`crate::engine_error::EngineError::Validation`]
2279/// until Stage B lands.
2280///
2281/// `#[non_exhaustive]` — future stages may add variants (e.g.
2282/// `Threshold` — see RFC-016 §10.3) without a semver break. Construct
2283/// via the [`EdgeDependencyPolicy::all_of`], [`EdgeDependencyPolicy::any_of`],
2284/// and [`EdgeDependencyPolicy::quorum`] helpers.
2285#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2286#[non_exhaustive]
2287#[serde(tag = "kind", rename_all = "snake_case")]
2288pub enum EdgeDependencyPolicy {
2289 /// Today's behavior: every edge in the inbound group must be
2290 /// satisfied (RFC-007 `all_required` + `success_only`).
2291 AllOf,
2292 /// k-of-n where k==1 — satisfied on the first upstream success.
2293 /// Stage A: rejected on
2294 /// [`crate::engine_backend::EngineBackend::set_edge_group_policy`];
2295 /// resolver emits nothing for this variant yet.
2296 AnyOf {
2297 #[serde(rename = "on_satisfied")]
2298 on_satisfied: OnSatisfied,
2299 },
2300 /// k-of-n quorum. Stage A: rejected on
2301 /// [`crate::engine_backend::EngineBackend::set_edge_group_policy`].
2302 Quorum {
2303 k: u32,
2304 #[serde(rename = "on_satisfied")]
2305 on_satisfied: OnSatisfied,
2306 },
2307}
2308
2309impl EdgeDependencyPolicy {
2310 /// Construct the default all-of policy (RFC-007 behavior).
2311 pub fn all_of() -> Self {
2312 Self::AllOf
2313 }
2314
2315 /// Construct an any-of policy — reserved for Stage B.
2316 pub fn any_of(on_satisfied: OnSatisfied) -> Self {
2317 Self::AnyOf { on_satisfied }
2318 }
2319
2320 /// Construct a quorum policy — reserved for Stage B.
2321 pub fn quorum(k: u32, on_satisfied: OnSatisfied) -> Self {
2322 Self::Quorum { k, on_satisfied }
2323 }
2324
2325 /// Stable string label used for wire format + metric labels.
2326 /// `all_of` | `any_of` | `quorum`.
2327 pub fn variant_str(&self) -> &'static str {
2328 match self {
2329 Self::AllOf => "all_of",
2330 Self::AnyOf { .. } => "any_of",
2331 Self::Quorum { .. } => "quorum",
2332 }
2333 }
2334}
2335
2336/// Policy for unfinished sibling upstreams once the quorum is met.
2337///
2338/// `#[non_exhaustive]` — RFC-016 §10.5 rejects a third variant today
2339/// but keeps the door open. Construct via [`OnSatisfied::cancel_remaining`]
2340/// / [`OnSatisfied::let_run`].
2341#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2342#[non_exhaustive]
2343#[serde(rename_all = "snake_case")]
2344pub enum OnSatisfied {
2345 /// Default. Cancel any still-running siblings once quorum met.
2346 CancelRemaining,
2347 /// Let stragglers finish; their terminals update counters for
2348 /// observability only (one-shot downstream).
2349 LetRun,
2350}
2351
2352impl OnSatisfied {
2353 /// Construct the default `cancel_remaining` disposition.
2354 pub fn cancel_remaining() -> Self {
2355 Self::CancelRemaining
2356 }
2357
2358 /// Construct the `let_run` disposition.
2359 pub fn let_run() -> Self {
2360 Self::LetRun
2361 }
2362
2363 /// Stable string label for wire format.
2364 pub fn variant_str(&self) -> &'static str {
2365 match self {
2366 Self::CancelRemaining => "cancel_remaining",
2367 Self::LetRun => "let_run",
2368 }
2369 }
2370}
2371
2372/// Edge-group lifecycle state (Stage A exposes only `pending` +
2373/// `satisfied` + `impossible`; `cancelled` reserved for Stage C).
2374#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
2375#[non_exhaustive]
2376#[serde(rename_all = "snake_case")]
2377pub enum EdgeGroupState {
2378 Pending,
2379 Satisfied,
2380 Impossible,
2381 Cancelled,
2382}
2383
2384impl EdgeGroupState {
2385 pub fn from_literal(s: &str) -> Self {
2386 match s {
2387 "satisfied" => Self::Satisfied,
2388 "impossible" => Self::Impossible,
2389 "cancelled" => Self::Cancelled,
2390 _ => Self::Pending,
2391 }
2392 }
2393
2394 pub fn as_str(&self) -> &'static str {
2395 match self {
2396 Self::Pending => "pending",
2397 Self::Satisfied => "satisfied",
2398 Self::Impossible => "impossible",
2399 Self::Cancelled => "cancelled",
2400 }
2401 }
2402}
2403
2404/// Snapshot of one inbound edge group (per downstream execution).
2405///
2406/// Exposed via [`FlowSnapshot::edge_groups`]. Stage A only populates
2407/// `AllOf` groups and their counters; Stage B/C add `failed` /
2408/// `skipped` / `satisfied_at` wiring for the quorum variants.
2409///
2410/// `#[non_exhaustive]` — future stages will add fields (`satisfied_at`,
2411/// `failed_count` write-path, `cancel_siblings_pending`). Match with
2412/// `..` or use [`EdgeGroupSnapshot::new`].
2413#[derive(Clone, Debug, PartialEq, Eq)]
2414#[non_exhaustive]
2415pub struct EdgeGroupSnapshot {
2416 pub downstream_execution_id: ExecutionId,
2417 pub policy: EdgeDependencyPolicy,
2418 pub total_deps: u32,
2419 pub satisfied_count: u32,
2420 pub failed_count: u32,
2421 pub skipped_count: u32,
2422 pub running_count: u32,
2423 pub group_state: EdgeGroupState,
2424}
2425
2426impl EdgeGroupSnapshot {
2427 #[allow(clippy::too_many_arguments)]
2428 pub fn new(
2429 downstream_execution_id: ExecutionId,
2430 policy: EdgeDependencyPolicy,
2431 total_deps: u32,
2432 satisfied_count: u32,
2433 failed_count: u32,
2434 skipped_count: u32,
2435 running_count: u32,
2436 group_state: EdgeGroupState,
2437 ) -> Self {
2438 Self {
2439 downstream_execution_id,
2440 policy,
2441 total_deps,
2442 satisfied_count,
2443 failed_count,
2444 skipped_count,
2445 running_count,
2446 group_state,
2447 }
2448 }
2449}
2450
2451// ─── set_edge_group_policy (RFC-016 §6.1) ───
2452
2453#[derive(Clone, Debug, Serialize, Deserialize)]
2454pub struct SetEdgeGroupPolicyArgs {
2455 pub flow_id: FlowId,
2456 pub downstream_execution_id: ExecutionId,
2457 pub policy: EdgeDependencyPolicy,
2458 pub now: TimestampMs,
2459}
2460
2461#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2462pub enum SetEdgeGroupPolicyResult {
2463 /// Policy stored (fresh write).
2464 Set,
2465 /// Policy already stored with an identical value (idempotent).
2466 AlreadySet,
2467}
2468
2469// ─── list_flows (issue #185) ───
2470
/// Typed flow-lifecycle status carried on [`FlowSummary`].
///
/// It mirrors the free-form `public_flow_state` literal that FF's flow
/// lifecycle writes onto `flow_core` (known values: `open`, `running`,
/// `blocked`, `cancelled`, `completed`, `failed` — see [`FlowSnapshot`]).
/// The three "active" runtime states (`open`, `running`, `blocked`) all
/// collapse to [`FlowStatus::Active`]; callers that need the exact
/// runtime sub-state should read [`FlowSnapshot::public_flow_state`]
/// via [`crate::engine_backend::EngineBackend::describe_flow`]. `failed`
/// maps to [`FlowStatus::Failed`].
///
/// `#[non_exhaustive]` so that future lifecycle states (should FF
/// introduce any) can be added without a semver break.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum FlowStatus {
    /// `open` / `running` / `blocked` — the flow is still live on the
    /// engine.
    Active,
    /// Terminal success: every member reached a successful terminal
    /// state and the flow projector flipped `public_flow_state` to
    /// `completed`.
    Completed,
    /// Terminal failure: at least one member failed and the flow
    /// projector flipped `public_flow_state` to `failed`.
    Failed,
    /// Cancelled by an operator through `cancel_flow`.
    Cancelled,
    /// The stored `public_flow_state` literal is present but is not a
    /// known value. The raw literal remains available on
    /// [`FlowSnapshot::public_flow_state`]; callers that need to act on
    /// it should fall back to
    /// [`crate::engine_backend::EngineBackend::describe_flow`].
    Unknown,
}
2503
2504impl FlowStatus {
2505 /// Map the raw `public_flow_state` literal stored on `flow_core`
2506 /// to a typed [`FlowStatus`]. Unknown literals surface as
2507 /// [`FlowStatus::Unknown`] so the list surface stays forwards-
2508 /// compatible with future engine-side state additions.
2509 pub fn from_public_flow_state(raw: &str) -> Self {
2510 match raw {
2511 "open" | "running" | "blocked" => Self::Active,
2512 "completed" => Self::Completed,
2513 "failed" => Self::Failed,
2514 "cancelled" => Self::Cancelled,
2515 _ => Self::Unknown,
2516 }
2517 }
2518}
2519
2520/// Lightweight per-flow projection returned by
2521/// [`crate::engine_backend::EngineBackend::list_flows`].
2522///
2523/// Deliberately narrower than [`FlowSnapshot`] — listing pages serve
2524/// dashboard-style enumerations where the caller does not want to pay
2525/// for the full `flow_core` hash on every row. Consumers that need
2526/// revision / node-count / tags / cancel metadata should fan out to
2527/// [`crate::engine_backend::EngineBackend::describe_flow`] for the
2528/// specific ids they care about.
2529///
2530/// `#[non_exhaustive]` — FF may add fields in minor releases without
2531/// a semver break. Match with `..` or use [`FlowSummary::new`].
2532#[derive(Clone, Debug, PartialEq, Eq)]
2533#[non_exhaustive]
2534pub struct FlowSummary {
2535 pub flow_id: FlowId,
2536 /// Timestamp (ms since unix epoch) `flow_core.created_at` was
2537 /// stamped. Mirrors [`FlowSnapshot::created_at`]; kept typed so
2538 /// callers that want raw millis can read `.0`.
2539 pub created_at: TimestampMs,
2540 /// Typed projection of `flow_core.public_flow_state`. See
2541 /// [`FlowStatus`] for the mapping.
2542 pub status: FlowStatus,
2543}
2544
2545impl FlowSummary {
2546 /// Construct a [`FlowSummary`]. Present so downstream crates can
2547 /// assemble the struct despite the `#[non_exhaustive]` marker.
2548 pub fn new(flow_id: FlowId, created_at: TimestampMs, status: FlowStatus) -> Self {
2549 Self {
2550 flow_id,
2551 created_at,
2552 status,
2553 }
2554 }
2555}
2556
2557/// One page of [`FlowSummary`] rows returned by
2558/// [`crate::engine_backend::EngineBackend::list_flows`].
2559///
2560/// `next_cursor` is `Some(last_flow_id)` when at least one more row
2561/// may exist on the partition — callers forward it verbatim as the
2562/// next call's `cursor` argument to continue iteration. `None` means
2563/// the listing is exhausted. Cursor semantics match the Postgres
2564/// `WHERE flow_id > $cursor ORDER BY flow_id LIMIT $limit` pattern
2565/// (see the trait method's rustdoc).
2566///
2567/// `#[non_exhaustive]` — FF may add summary-level fields (total count,
2568/// partition hint) in future minor releases without a semver break.
2569#[derive(Clone, Debug, PartialEq, Eq)]
2570#[non_exhaustive]
2571pub struct ListFlowsPage {
2572 pub flows: Vec<FlowSummary>,
2573 pub next_cursor: Option<FlowId>,
2574}
2575
2576impl ListFlowsPage {
2577 /// Construct a [`ListFlowsPage`]. Present so downstream crates can
2578 /// assemble the struct despite the `#[non_exhaustive]` marker.
2579 pub fn new(flows: Vec<FlowSummary>, next_cursor: Option<FlowId>) -> Self {
2580 Self { flows, next_cursor }
2581 }
2582}
2583
2584/// Summary of state after a mutation, returned by many functions.
2585#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2586pub struct StateSummary {
2587 pub state_vector: StateVector,
2588 pub current_attempt_index: AttemptIndex,
2589}
2590
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::FlowId;
    use std::str::FromStr;

    /// Builds a flow-scoped execution id under the default partition config.
    fn sample_execution_id() -> ExecutionId {
        let config = crate::partition::PartitionConfig::default();
        ExecutionId::for_flow(&FlowId::new(), &config)
    }

    /// `CreateExecutionArgs` survives a JSON round trip (checked via the
    /// execution id, since the struct does not derive `PartialEq`).
    #[test]
    fn create_execution_args_serde() {
        let args = CreateExecutionArgs {
            execution_id: sample_execution_id(),
            namespace: Namespace::new("test"),
            lane_id: LaneId::new("default"),
            execution_kind: "llm_call".to_owned(),
            input_payload: b"hello".to_vec(),
            payload_encoding: Some("json".to_owned()),
            priority: 0,
            creator_identity: "test-user".to_owned(),
            idempotency_key: None,
            tags: HashMap::new(),
            policy: None,
            delay_until: None,
            execution_deadline_at: None,
            partition_id: 42,
            now: TimestampMs::now(),
        };
        let encoded = serde_json::to_string(&args).unwrap();
        let decoded = serde_json::from_str::<CreateExecutionArgs>(&encoded).unwrap();
        assert_eq!(args.execution_id, decoded.execution_id);
    }

    /// `ClaimExecutionResult::Claimed` survives a JSON round trip intact.
    #[test]
    fn claim_result_serde() {
        let claimed = ClaimedExecution {
            execution_id: sample_execution_id(),
            lease_id: LeaseId::new(),
            lease_epoch: LeaseEpoch::new(1),
            attempt_index: AttemptIndex::new(0),
            attempt_id: AttemptId::new(),
            attempt_type: AttemptType::Initial,
            lease_expires_at: TimestampMs::from_millis(1000),
        };
        let result = ClaimExecutionResult::Claimed(claimed);
        let encoded = serde_json::to_string(&result).unwrap();
        let decoded = serde_json::from_str::<ClaimExecutionResult>(&encoded).unwrap();
        assert_eq!(result, decoded);
    }

    // ── StreamCursor (issue #92) ──

    /// `Display` renders the public tokens (`start` / `end` / the raw id).
    #[test]
    fn stream_cursor_display_matches_wire_tokens() {
        let cases = [
            (StreamCursor::Start, "start"),
            (StreamCursor::End, "end"),
            (StreamCursor::At("123".into()), "123"),
            (StreamCursor::At("123-4".into()), "123-4"),
        ];
        for (cursor, expected) in cases {
            assert_eq!(cursor.to_string(), expected);
        }
    }

    /// `to_wire` renders the `-` / `+` range markers or the raw id.
    #[test]
    fn stream_cursor_to_wire_maps_to_valkey_markers() {
        let cases = [
            (StreamCursor::Start, "-"),
            (StreamCursor::End, "+"),
            (StreamCursor::At("0-0".into()), "0-0"),
            (StreamCursor::At("17-3".into()), "17-3"),
        ];
        for (cursor, expected) in cases {
            assert_eq!(cursor.to_wire(), expected);
        }
    }

    /// Every public token parses back into the matching variant.
    #[test]
    fn stream_cursor_from_str_accepts_wire_tokens() {
        let cases = [
            ("start", StreamCursor::Start),
            ("end", StreamCursor::End),
            ("123", StreamCursor::At("123".into())),
            ("0-0", StreamCursor::At("0-0".into())),
            ("1713100800150-0", StreamCursor::At("1713100800150-0".into())),
        ];
        for (token, expected) in cases {
            assert_eq!(StreamCursor::from_str(token).unwrap(), expected);
        }
    }

    /// The raw `-` / `+` markers are refused, echoing the offending input.
    #[test]
    fn stream_cursor_from_str_rejects_bare_markers() {
        for marker in ["-", "+"] {
            assert!(matches!(
                StreamCursor::from_str(marker),
                Err(StreamCursorParseError::BareMarkerRejected(s)) if s == marker
            ));
        }
    }

    /// The empty string has its own dedicated error variant.
    #[test]
    fn stream_cursor_from_str_rejects_empty() {
        let outcome = StreamCursor::from_str("");
        assert_eq!(outcome, Err(StreamCursorParseError::Empty));
    }

    /// Anything that is neither a token nor `<digits>[-<digits>]` is malformed.
    #[test]
    fn stream_cursor_from_str_rejects_malformed() {
        let rejected = [
            "abc", "-1", "1-", "-1-2", "1-2-3", "1.2", "1 2", "Start", "END",
        ];
        for bad in rejected {
            let outcome = StreamCursor::from_str(bad);
            assert!(
                matches!(outcome, Err(StreamCursorParseError::Malformed(_))),
                "must reject {bad:?}",
            );
        }
    }

    /// An en dash (U+2013) is not accepted in place of the ASCII hyphen.
    #[test]
    fn stream_cursor_from_str_rejects_non_ascii() {
        let outcome = StreamCursor::from_str("1\u{2013}2");
        assert!(matches!(
            outcome,
            Err(StreamCursorParseError::Malformed(_))
        ));
    }

    /// Serialize → deserialize is the identity for every variant.
    #[test]
    fn stream_cursor_serde_round_trip() {
        let samples = [
            StreamCursor::Start,
            StreamCursor::End,
            StreamCursor::At("0-0".into()),
            StreamCursor::At("1713100800150-0".into()),
        ];
        for c in samples {
            let encoded = serde_json::to_string(&c).unwrap();
            let back = serde_json::from_str::<StreamCursor>(&encoded).unwrap();
            assert_eq!(back, c);
        }
    }

    /// The JSON form is a bare string, not a tagged object.
    #[test]
    fn stream_cursor_serializes_as_bare_string() {
        let cases = [
            (StreamCursor::Start, r#""start""#),
            (StreamCursor::End, r#""end""#),
            (StreamCursor::At("123-0".into()), r#""123-0""#),
        ];
        for (cursor, expected_json) in cases {
            assert_eq!(serde_json::to_string(&cursor).unwrap(), expected_json);
        }
    }

    /// Deserialization refuses the raw `-` / `+` markers as well.
    #[test]
    fn stream_cursor_deserialize_rejects_bare_markers() {
        for raw_json in [r#""-""#, r#""+""#] {
            assert!(serde_json::from_str::<StreamCursor>(raw_json).is_err());
        }
    }

    /// `from_beginning` is the concrete `0-0` position.
    #[test]
    fn stream_cursor_from_beginning_is_zero_zero() {
        let expected = StreamCursor::At("0-0".into());
        assert_eq!(StreamCursor::from_beginning(), expected);
    }

    /// Only `At(..)` cursors count as concrete.
    #[test]
    fn stream_cursor_is_concrete_classifies_variants() {
        for symbolic in [StreamCursor::Start, StreamCursor::End] {
            assert!(!symbolic.is_concrete());
        }
        let concrete = [
            StreamCursor::At("0-0".into()),
            StreamCursor::At("123-0".into()),
            StreamCursor::from_beginning(),
        ];
        for cursor in concrete {
            assert!(cursor.is_concrete());
        }
    }

    /// The consuming conversion yields the same text as `to_wire`.
    #[test]
    fn stream_cursor_into_wire_string_moves_without_cloning() {
        let cases = [
            (StreamCursor::Start, "-"),
            (StreamCursor::End, "+"),
            (StreamCursor::At("17-3".into()), "17-3"),
        ];
        for (cursor, expected) in cases {
            assert_eq!(cursor.into_wire_string(), expected);
        }
    }
}
2785
2786// ─── list_executions ───
2787
2788/// Summary of an execution for list views.
2789#[derive(Clone, Debug, Serialize, Deserialize)]
2790pub struct ExecutionSummary {
2791 pub execution_id: ExecutionId,
2792 pub namespace: String,
2793 pub lane_id: String,
2794 pub execution_kind: String,
2795 pub public_state: String,
2796 pub priority: i32,
2797 pub created_at: String,
2798}
2799
2800/// Result of a list_executions query.
2801#[derive(Clone, Debug, Serialize, Deserialize)]
2802pub struct ListExecutionsResult {
2803 pub executions: Vec<ExecutionSummary>,
2804 pub total_returned: usize,
2805}
2806
2807// ─── list_lanes (issue #184) ───
2808
2809/// One page of lane ids returned by
2810/// [`crate::engine_backend::EngineBackend::list_lanes`].
2811///
2812/// Lanes are global (not partition-scoped) — the backend enumerates
2813/// every registered lane, sorts by [`LaneId`] name, and returns a
2814/// `limit`-sized slice starting after `cursor` (exclusive).
2815///
2816/// `next_cursor` is `Some(last_lane_in_page)` when more pages remain
2817/// and `None` when the caller has read the final page. Callers that
2818/// want the full list loop until `next_cursor` is `None`, threading
2819/// the previous page's `next_cursor` into the next call's `cursor`
2820/// argument.
2821///
2822/// `#[non_exhaustive]` — FF may add fields (e.g. a `total` hint) in
2823/// minor releases without a semver break.
2824#[derive(Clone, Debug, PartialEq, Eq)]
2825#[non_exhaustive]
2826pub struct ListLanesPage {
2827 /// The lanes in this page, sorted by [`LaneId`] name.
2828 pub lanes: Vec<LaneId>,
2829 /// Cursor for the next page (exclusive). `None` ⇒ final page.
2830 pub next_cursor: Option<LaneId>,
2831}
2832
2833impl ListLanesPage {
2834 /// Construct a [`ListLanesPage`]. Present so downstream crates
2835 /// (ff-backend-valkey's `list_lanes` impl) can assemble the
2836 /// struct despite the `#[non_exhaustive]` marker.
2837 pub fn new(lanes: Vec<LaneId>, next_cursor: Option<LaneId>) -> Self {
2838 Self { lanes, next_cursor }
2839 }
2840}
2841
2842// ─── list_suspended ───
2843
2844/// One entry in a [`ListSuspendedPage`] — a suspended execution and
2845/// the reason it is blocked, answering an operator's "what's this
2846/// waiting on?" without a follow-up round-trip.
2847///
2848/// `reason` carries the free-form `reason_code` recorded by the
2849/// suspending worker at `lua/suspension.lua` (HSET `suspension:current
2850/// reason_code`). It is a `String`, not a closed enum: the suspension
2851/// pipeline accepts arbitrary caller-supplied codes (typical values
2852/// are `"signal"`, `"timer"`, `"children"`, `"join"`, but consumers
2853/// embed bespoke codes). A future enum projection can classify
2854/// known codes once the set is frozen; until then, callers that want
2855/// structured routing MUST match on the string explicitly.
2856#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2857#[non_exhaustive]
2858pub struct SuspendedExecutionEntry {
2859 /// Execution currently in `lifecycle_phase=suspended`.
2860 pub execution_id: ExecutionId,
2861 /// Score stored on the per-lane suspended ZSET — the scheduled
2862 /// `timeout_at` in milliseconds, or the `9999999999999` sentinel
2863 /// when no timeout was set (see `lua/suspension.lua`).
2864 pub suspended_at_ms: i64,
2865 /// Free-form reason code from `suspension:current.reason_code`.
2866 /// Empty string when the suspension hash is absent or does not
2867 /// carry a `reason_code` field (older records). See the struct
2868 /// rustdoc for the deliberate-String rationale.
2869 pub reason: String,
2870}
2871
2872impl SuspendedExecutionEntry {
2873 /// Construct a new entry. Preferred over direct field init for
2874 /// `#[non_exhaustive]` forward-compat.
2875 pub fn new(execution_id: ExecutionId, suspended_at_ms: i64, reason: String) -> Self {
2876 Self {
2877 execution_id,
2878 suspended_at_ms,
2879 reason,
2880 }
2881 }
2882}
2883
2884/// One cursor-paginated page of suspended executions.
2885///
2886/// Pagination is cursor-based (not offset/limit) so a Valkey backend
2887/// can resume a partition scan from the last seen execution id and a
2888/// future Postgres backend can do keyset pagination on
2889/// `executions WHERE state='suspended'`. The cursor is opaque to
2890/// callers: pass `next_cursor` back as the `cursor` argument to the
2891/// next [`EngineBackend::list_suspended`] call to fetch the next
2892/// page. `None` means the stream is exhausted.
2893///
2894/// [`EngineBackend::list_suspended`]: crate::engine_backend::EngineBackend::list_suspended
2895#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2896#[non_exhaustive]
2897pub struct ListSuspendedPage {
2898 /// Entries on this page, ordered by ascending `suspended_at_ms`
2899 /// (timeout order) with `execution_id` as a lex tiebreak.
2900 pub entries: Vec<SuspendedExecutionEntry>,
2901 /// Resume-point for the next page. `None` when no further
2902 /// entries remain in the partition.
2903 pub next_cursor: Option<ExecutionId>,
2904}
2905
2906impl ListSuspendedPage {
2907 /// Construct a new page. Preferred over direct field init for
2908 /// `#[non_exhaustive]` forward-compat.
2909 pub fn new(entries: Vec<SuspendedExecutionEntry>, next_cursor: Option<ExecutionId>) -> Self {
2910 Self {
2911 entries,
2912 next_cursor,
2913 }
2914 }
2915}
2916
2917// ─── list_executions ───
2918
2919/// One page of partition-scoped execution ids returned by
2920/// [`EngineBackend::list_executions`](crate::engine_backend::EngineBackend::list_executions).
2921///
2922/// Pagination is forward-only and cursor-based. `next_cursor` carries
2923/// the last `ExecutionId` emitted in `executions` iff another page is
2924/// available; callers pass that id back as the next call's `cursor`
2925/// (exclusive start). `next_cursor = None` signals end-of-stream.
2926///
2927/// `#[non_exhaustive]` — FF may add fields (e.g. `approximate_total`)
2928/// in minor releases without a semver break. Use
2929/// [`ListExecutionsPage::new`] for cross-crate construction.
2930#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2931#[non_exhaustive]
2932pub struct ListExecutionsPage {
2933 /// Execution ids on this page, in ascending lexicographic order.
2934 pub executions: Vec<ExecutionId>,
2935 /// Exclusive cursor to request the next page. `None` ⇒ no more
2936 /// results.
2937 pub next_cursor: Option<ExecutionId>,
2938}
2939
2940impl ListExecutionsPage {
2941 /// Construct a [`ListExecutionsPage`]. Present so downstream
2942 /// crates can assemble the struct despite the `#[non_exhaustive]`
2943 /// marker.
2944 pub fn new(executions: Vec<ExecutionId>, next_cursor: Option<ExecutionId>) -> Self {
2945 Self { executions, next_cursor }
2946 }
2947}
2948
2949// ─── rotate_waitpoint_hmac_secret ───
2950
/// Inputs to `ff_rotate_waitpoint_hmac_secret`, which rotates the HMAC
/// signing kid on a SINGLE partition. Fanning out over every partition
/// is the caller's job (ff-server does it in parallel inside
/// `rotate_waitpoint_secret`; direct-Valkey consumers mirror that).
///
/// No clock value is passed in: "now" comes from `redis.call("TIME")`
/// inside the FCALL, consistent with `validate_waitpoint_token` and the
/// flow scanners. `grace_ms` is a duration rather than a timestamp, so
/// it is safe for the caller to supply it.
#[derive(Debug, Clone)]
pub struct RotateWaitpointHmacSecretArgs {
    /// The kid to install.
    pub new_kid: String,
    /// The secret for `new_kid`, hex-encoded.
    pub new_secret_hex: String,
    /// Grace window in milliseconds (unsigned, so it cannot be
    /// negative). Tokens signed by the outgoing kid stay valid for
    /// `grace_ms` after this rotation.
    pub grace_ms: u64,
}
2968
/// What happened when one partition was asked to rotate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RotateWaitpointHmacSecretOutcome {
    /// The new kid was installed.
    ///
    /// * `previous_kid` — `None` on bootstrap, i.e. when there was no
    ///   prior `current_kid`.
    /// * `gc_count` — how many expired kids were reaped during this
    ///   rotation.
    Rotated {
        previous_kid: Option<String>,
        new_kid: String,
        gc_count: u32,
    },
    /// An exact replay: the same kid with the same secret was already
    /// installed. Nothing changed, so an operator retry is safe.
    Noop { kid: String },
}
2984
2985// ─── rotate_waitpoint_hmac_secret_all ───
2986
/// Inputs to [`EngineBackend::rotate_waitpoint_hmac_secret_all`], the
/// cluster-wide / backend-native rotation of the waitpoint HMAC signing
/// kid.
///
/// **v0.7 migration-master Q4:** one additive trait method takes the
/// place of the per-partition fan-out that direct-Valkey consumers used
/// to hand-roll through
/// `ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions`.
/// On Valkey it issues N FCALLs, one per execution partition. On
/// Postgres (Wave 4) it becomes a single INSERT into the global
/// `ff_waitpoint_hmac(kid, secret, rotated_at)` table, which has no
/// partition_id column. Consumers should prefer this method for
/// clarity; the older free-fn + per-partition surface remains for
/// backwards compatibility.
///
/// The type is `#[non_exhaustive]` and ships a [`Self::new`]
/// constructor, following the project memory-rule that an unbuildable
/// non_exhaustive type is a dead API.
///
/// [`EngineBackend::rotate_waitpoint_hmac_secret_all`]: crate::engine_backend::EngineBackend::rotate_waitpoint_hmac_secret_all
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct RotateWaitpointHmacSecretAllArgs {
    /// The kid to install.
    pub new_kid: String,
    /// The secret for `new_kid`, hex-encoded.
    pub new_secret_hex: String,
    /// Grace window, in milliseconds, for tokens signed by the outgoing
    /// kid. A duration (not a clock value), same meaning as
    /// [`RotateWaitpointHmacSecretArgs::grace_ms`].
    pub grace_ms: u64,
}

impl RotateWaitpointHmacSecretAllArgs {
    /// Construct the args.
    ///
    /// Provided so consumers never need a struct literal, which the
    /// `#[non_exhaustive]` marker forbids outside this crate.
    pub fn new(
        new_kid: impl Into<String>,
        new_secret_hex: impl Into<String>,
        grace_ms: u64,
    ) -> Self {
        let new_kid: String = new_kid.into();
        let new_secret_hex: String = new_secret_hex.into();
        RotateWaitpointHmacSecretAllArgs {
            new_kid,
            new_secret_hex,
            grace_ms,
        }
    }
}
3031
3032/// Per-partition entry of [`RotateWaitpointHmacSecretAllResult`].
3033/// Mirrors [`ff_sdk::admin::PartitionRotationOutcome`] but typed at
3034/// the `ff-core` layer so both Valkey and Postgres backends return
3035/// the same shape without a Postgres→ferriskey dep.
3036///
3037/// On backends with no partition concept (Postgres) the entry list
3038/// has length 1 with `partition = 0` and the outcome of the global
3039/// row write.
3040#[derive(Debug)]
3041#[non_exhaustive]
3042pub struct RotateWaitpointHmacSecretAllEntry {
3043 pub partition: u16,
3044 /// The per-partition (or global) rotation outcome. Per-partition
3045 /// failures are surfaced as inner `Err` so the fan-out can report
3046 /// partial success — matching the existing SDK free-fn contract.
3047 pub result: Result<RotateWaitpointHmacSecretOutcome, crate::engine_error::EngineError>,
3048}
3049
3050impl RotateWaitpointHmacSecretAllEntry {
3051 pub fn new(
3052 partition: u16,
3053 result: Result<RotateWaitpointHmacSecretOutcome, crate::engine_error::EngineError>,
3054 ) -> Self {
3055 Self { partition, result }
3056 }
3057}
3058
3059/// Result of [`EngineBackend::rotate_waitpoint_hmac_secret_all`].
3060///
3061/// The Valkey backend returns one entry per execution partition. The
3062/// Postgres backend (Wave 4) will return a single-entry vec with
3063/// `partition = 0` since the Postgres schema stores one global row
3064/// per kid (Q4 §adjudication). Consumers that want a uniform "did
3065/// ALL rotations succeed?" view inspect each entry's `.result`.
3066#[derive(Debug)]
3067#[non_exhaustive]
3068pub struct RotateWaitpointHmacSecretAllResult {
3069 pub entries: Vec<RotateWaitpointHmacSecretAllEntry>,
3070}
3071
3072impl RotateWaitpointHmacSecretAllResult {
3073 pub fn new(entries: Vec<RotateWaitpointHmacSecretAllEntry>) -> Self {
3074 Self { entries }
3075 }
3076}
3077
3078// ─── list_waitpoint_hmac_kids ───
3079
/// Argument struct for the `list_waitpoint_hmac_kids` operation; it
/// currently carries no fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ListWaitpointHmacKidsArgs {}
3082
3083/// Snapshot of the waitpoint HMAC keystore on ONE partition.
3084#[derive(Clone, Debug, PartialEq, Eq)]
3085pub struct WaitpointHmacKids {
3086 /// The currently-signing kid. `None` if uninitialized.
3087 pub current_kid: Option<String>,
3088 /// Kids that still validate existing tokens but no longer sign
3089 /// new ones. Order is Lua HGETALL traversal order — callers that
3090 /// need a stable sort should sort by `expires_at_ms`.
3091 pub verifying: Vec<VerifyingKid>,
3092}
3093
/// One verify-only kid listed in [`WaitpointHmacKids::verifying`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VerifyingKid {
    /// The key id.
    pub kid: String,
    /// Expiry of this kid, in milliseconds.
    // NOTE(review): presumably an absolute epoch timestamp — confirm
    // against the Lua side.
    pub expires_at_ms: i64,
}
3099
3100// ═══════════════════════════════════════════════════════════════════════
3101// RFC-013 Stage 1d: EngineBackend::suspend typed args + outcome
3102// ═══════════════════════════════════════════════════════════════════════
3103//
3104// `SuspendExecutionArgs` / `SuspendExecutionResult` above remain the
3105// wire-level Lua-ARGV mirror used by the backend serializer. The types
3106// below are the public trait-surface shapes RFC-013 §2.2–§2.6 specifies.
3107//
3108// Every type in this block is `#[non_exhaustive]` per the RFC §2.2.1
3109// memory-rule compliance note; each gets a constructor so external-crate
3110// consumers can build them without struct-literal access.
3111
3112use crate::backend::WaitpointHmac;
3113
3114/// Partition-scoped idempotency key for retry-safe `EngineBackend::suspend`.
3115///
3116/// See RFC-013 §2.2 — when set on [`SuspendArgs::idempotency_key`], the
3117/// backend dedups the call on `(partition, execution_id, idempotency_key)`
3118/// and a second `suspend` with the same triple returns the first call's
3119/// [`SuspendOutcome`] verbatim. Absent a key, `suspend` is NOT retry-
3120/// idempotent; callers must describe-and-reconcile per §3.1.
3121///
3122/// Follows the `UsageDimensions::dedup_key` pattern — opaque to the
3123/// engine, byte-compared at the partition scope.
3124#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
3125#[serde(transparent)]
3126pub struct IdempotencyKey(String);
3127
3128impl IdempotencyKey {
3129 /// Construct from any stringy input. Empty strings are accepted;
3130 /// the backend treats an empty key as "no dedup" at the serialize
3131 /// step so `Some(IdempotencyKey::new(""))` is functionally the same
3132 /// as `None`.
3133 pub fn new(key: impl Into<String>) -> Self {
3134 Self(key.into())
3135 }
3136
3137 /// Borrow the underlying string.
3138 pub fn as_str(&self) -> &str {
3139 &self.0
3140 }
3141}
3142
3143impl std::fmt::Display for IdempotencyKey {
3144 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3145 f.write_str(&self.0)
3146 }
3147}
3148
3149/// v1 signal-match predicate inside [`ResumeCondition::Single`].
3150///
3151/// RFC-013 §2.4 — `ByName(String)` matches a single concrete signal
3152/// name; `Wildcard` matches any delivered signal. RFC-014 may extend
3153/// (payload predicates, pattern matching) — `#[non_exhaustive]`.
3154#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3155#[non_exhaustive]
3156pub enum SignalMatcher {
3157 /// Match by exact signal name.
3158 ByName(String),
3159 /// Match any signal delivered to the waitpoint.
3160 Wildcard,
3161}
3162
/// Maximum nesting depth allowed for a composite condition (RFC-014
/// §5.4 invariant 4; rationale in §5.5).
///
/// This is a soft cap: raising it means editing only this constant and
/// the cap-rationale paragraph in RFC-014 §5.5, with no wire-format
/// change. The two must be kept in sync.
pub const MAX_COMPOSITE_DEPTH: usize = 4;
3168
3169/// RFC-013 reserves this enum slot; RFC-014 populates it with the
3170/// concrete composition vocabulary (`AllOf` + `Count`). The enum is
3171/// `#[non_exhaustive]` so RFC-016 or later RFCs may add variants
3172/// (`AnyOf` has been explicitly rejected per RFC-014 §2.3 in favour of
3173/// `Count { n: 1, .. }`; the guard exists for orthogonal future work).
3174#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3175#[serde(tag = "kind")]
3176#[non_exhaustive]
3177pub enum CompositeBody {
3178 /// All listed sub-conditions must be satisfied. Order-independent.
3179 /// Once satisfied, further signals to member waitpoints are observed
3180 /// but do not re-open satisfaction. RFC-014 §2.1.
3181 AllOf {
3182 members: Vec<ResumeCondition>,
3183 },
3184 /// At least `n` distinct satisfiers (by [`CountKind`]) must match.
3185 /// `matcher` optionally constrains participating signals; `None`
3186 /// lets any signal on any of `waitpoints` count. RFC-014 §2.1.
3187 Count {
3188 n: u32,
3189 count_kind: CountKind,
3190 #[serde(default, skip_serializing_if = "Option::is_none")]
3191 matcher: Option<SignalMatcher>,
3192 waitpoints: Vec<String>,
3193 },
3194}
3195
3196/// How `Count` nodes distinguish satisfiers. RFC-014 §2.1 + §3.2.
3197#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3198#[non_exhaustive]
3199pub enum CountKind {
3200 /// n distinct `waitpoint_id`s in `waitpoints` must fire.
3201 DistinctWaitpoints,
3202 /// n distinct `signal_id`s across the waitpoint set.
3203 DistinctSignals,
3204 /// n distinct `source_type:source_identity` tuples.
3205 DistinctSources,
3206}
3207
3208impl CompositeBody {
3209 /// `AllOf { members }` constructor (RFC-014 §10.3 SDK surface).
3210 pub fn all_of(members: impl IntoIterator<Item = ResumeCondition>) -> Self {
3211 Self::AllOf {
3212 members: members.into_iter().collect(),
3213 }
3214 }
3215
3216 /// `Count` constructor with explicit kind + waitpoint set.
3217 pub fn count(
3218 n: u32,
3219 count_kind: CountKind,
3220 matcher: Option<SignalMatcher>,
3221 waitpoints: impl IntoIterator<Item = String>,
3222 ) -> Self {
3223 Self::Count {
3224 n,
3225 count_kind,
3226 matcher,
3227 waitpoints: waitpoints.into_iter().collect(),
3228 }
3229 }
3230}
3231
3232/// Declarative resume condition for [`SuspendArgs::resume_condition`].
3233///
3234/// RFC-013 §2.4 — typed replacement for the SDK's former
3235/// `ConditionMatcher` / `resume_condition_json` pair.
3236#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3237#[non_exhaustive]
3238pub enum ResumeCondition {
3239 /// Single waitpoint-key match with a predicate. `matcher` is
3240 /// evaluated against every signal delivered to `waitpoint_key`.
3241 Single {
3242 waitpoint_key: String,
3243 matcher: SignalMatcher,
3244 },
3245 /// Operator-only resume — no signal satisfies; only an explicit
3246 /// operator resume closes the waitpoint.
3247 OperatorOnly,
3248 /// Pure-timeout suspension. No signal satisfier; the waitpoint
3249 /// resolves only via `timeout_behavior` at `timeout_at`. Requires
3250 /// `SuspendArgs::timeout_at` to be `Some(_)` — otherwise the
3251 /// Rust-side validator rejects as `timeout_only_without_deadline`.
3252 TimeoutOnly,
3253 /// Multi-condition composition; RFC-014 defines the body.
3254 Composite(CompositeBody),
3255}
3256
/// Validation error in the shape RFC-014 §5.1 prescribes.
///
/// [`ResumeCondition::validate_composite`] returns it when a composite
/// breaks a structural or cardinality invariant. That check runs at
/// suspend-time, before any Valkey call is made. `detail` is the
/// human-readable description required by §5.1.1.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompositeValidationError {
    /// Human-readable description of the violated invariant.
    pub detail: String,
}

impl CompositeValidationError {
    /// Module-private constructor accepting any string-like detail.
    fn new(detail: impl Into<String>) -> Self {
        let detail: String = detail.into();
        CompositeValidationError { detail }
    }
}
3273
3274impl ResumeCondition {
3275 /// RFC-014 §10.3 builder — `AllOf` across N distinct waitpoints,
3276 /// each member a `Single { matcher: Wildcard }` leaf. Canonical
3277 /// Pattern 3 shape for heterogeneous-subsystem "all fired"
3278 /// semantics (e.g. `db-migration-complete` + `cache-warmed` +
3279 /// `feature-flag-set`).
3280 ///
3281 /// Callers that need per-waitpoint matchers should construct the
3282 /// tree directly via
3283 /// [`ResumeCondition::Composite(CompositeBody::all_of(..))`].
3284 pub fn all_of_waitpoints<I, S>(waitpoint_keys: I) -> Self
3285 where
3286 I: IntoIterator<Item = S>,
3287 S: Into<String>,
3288 {
3289 let members: Vec<ResumeCondition> = waitpoint_keys
3290 .into_iter()
3291 .map(|k| ResumeCondition::Single {
3292 waitpoint_key: k.into(),
3293 matcher: SignalMatcher::Wildcard,
3294 })
3295 .collect();
3296 ResumeCondition::Composite(CompositeBody::AllOf { members })
3297 }
3298
3299 /// Collect every distinct `waitpoint_key` the condition targets.
3300 /// Used at suspend-time to validate the condition's wp set against
3301 /// `SuspendArgs.waitpoints` (RFC-014 §5.1 multi-binding cross-
3302 /// check). Order follows tree DFS, de-duplicated preserving first
3303 /// occurrence.
3304 pub fn referenced_waitpoint_keys(&self) -> Vec<String> {
3305 let mut out: Vec<String> = Vec::new();
3306 let mut push = |k: &str| {
3307 if !out.iter().any(|e| e == k) {
3308 out.push(k.to_owned());
3309 }
3310 };
3311 fn walk(cond: &ResumeCondition, push: &mut dyn FnMut(&str)) {
3312 match cond {
3313 ResumeCondition::Single { waitpoint_key, .. } => push(waitpoint_key),
3314 ResumeCondition::Composite(body) => walk_body(body, push),
3315 _ => {}
3316 }
3317 }
3318 fn walk_body(body: &CompositeBody, push: &mut dyn FnMut(&str)) {
3319 match body {
3320 CompositeBody::AllOf { members } => {
3321 for m in members {
3322 walk(m, push);
3323 }
3324 }
3325 CompositeBody::Count { waitpoints, .. } => {
3326 for w in waitpoints {
3327 push(w.as_str());
3328 }
3329 }
3330 }
3331 }
3332 walk(self, &mut push);
3333 out
3334 }
3335
3336 /// Validate RFC-014 structural invariants on a composite condition.
3337 /// Single / OperatorOnly / TimeoutOnly return Ok — they carry no
3338 /// composite body. Checks cover:
3339 /// * `AllOf { members: [] }` — §5.1 `allof_empty_members`
3340 /// * `Count { n: 0 }` — §5.1 `count_n_zero`
3341 /// * `Count { waitpoints: [] }` — §5.1 `count_waitpoints_empty`
3342 /// * `Count { n > waitpoints.len(), DistinctWaitpoints }` — §5.1
3343 /// `count_exceeds_waitpoint_set`
3344 /// * depth > [`MAX_COMPOSITE_DEPTH`] — §5.1 `condition_depth_exceeded`
3345 pub fn validate_composite(&self) -> Result<(), CompositeValidationError> {
3346 match self {
3347 ResumeCondition::Composite(body) => validate_body(body, 1, ""),
3348 _ => Ok(()),
3349 }
3350 }
3351}
3352
3353fn validate_body(
3354 body: &CompositeBody,
3355 depth: usize,
3356 path: &str,
3357) -> Result<(), CompositeValidationError> {
3358 if depth > MAX_COMPOSITE_DEPTH {
3359 return Err(CompositeValidationError::new(format!(
3360 "depth {} exceeds cap {} at path {}",
3361 depth,
3362 MAX_COMPOSITE_DEPTH,
3363 if path.is_empty() { "<root>" } else { path }
3364 )));
3365 }
3366 match body {
3367 CompositeBody::AllOf { members } => {
3368 if members.is_empty() {
3369 return Err(CompositeValidationError::new(format!(
3370 "allof_empty_members at path {}",
3371 if path.is_empty() { "<root>" } else { path }
3372 )));
3373 }
3374 for (i, m) in members.iter().enumerate() {
3375 let child_path = if path.is_empty() {
3376 format!("members[{i}]")
3377 } else {
3378 format!("{path}.members[{i}]")
3379 };
3380 if let ResumeCondition::Composite(inner) = m {
3381 validate_body(inner, depth + 1, &child_path)?;
3382 }
3383 // Leaf `Single` / operator / timeout needs no further
3384 // structural checks — RFC-013 already constrains them.
3385 }
3386 Ok(())
3387 }
3388 CompositeBody::Count {
3389 n,
3390 count_kind,
3391 waitpoints,
3392 ..
3393 } => {
3394 if *n == 0 {
3395 return Err(CompositeValidationError::new(format!(
3396 "count_n_zero at path {}",
3397 if path.is_empty() { "<root>" } else { path }
3398 )));
3399 }
3400 if waitpoints.is_empty() {
3401 return Err(CompositeValidationError::new(format!(
3402 "count_waitpoints_empty at path {}",
3403 if path.is_empty() { "<root>" } else { path }
3404 )));
3405 }
3406 if matches!(count_kind, CountKind::DistinctWaitpoints)
3407 && (*n as usize) > waitpoints.len()
3408 {
3409 return Err(CompositeValidationError::new(format!(
3410 "count_exceeds_waitpoint_set: n={} > waitpoints.len()={} at path {}",
3411 n,
3412 waitpoints.len(),
3413 if path.is_empty() { "<root>" } else { path }
3414 )));
3415 }
3416 Ok(())
3417 }
3418 }
3419}
3420
3421/// Where a satisfied suspension routes back to.
3422///
3423/// v1 ships only [`ResumeTarget::Runnable`] — execution returns to
3424/// `runnable` and goes through normal scheduling.
3425#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3426#[non_exhaustive]
3427pub enum ResumeTarget {
3428 Runnable,
3429}
3430
3431/// Resume-side policy carried alongside [`ResumeCondition`].
3432///
3433/// RFC-013 §2.5 — what happens when the condition is satisfied. Fields
3434/// mirror the `resume_policy_json` the backend serializer writes to Lua
3435/// (RFC-004 §Resume policy fields).
3436#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3437#[non_exhaustive]
3438pub struct ResumePolicy {
3439 pub resume_target: ResumeTarget,
3440 pub consume_matched_signals: bool,
3441 pub retain_signal_buffer_until_closed: bool,
3442 #[serde(default, skip_serializing_if = "Option::is_none")]
3443 pub resume_delay_ms: Option<u64>,
3444 pub close_waitpoint_on_resume: bool,
3445}
3446
3447impl ResumePolicy {
3448 /// Canonical v1 defaults (RFC-013 §2.2.1):
3449 /// * `resume_target = Runnable`
3450 /// * `consume_matched_signals = true`
3451 /// * `retain_signal_buffer_until_closed = false`
3452 /// * `resume_delay_ms = None`
3453 /// * `close_waitpoint_on_resume = true`
3454 pub fn normal() -> Self {
3455 Self {
3456 resume_target: ResumeTarget::Runnable,
3457 consume_matched_signals: true,
3458 retain_signal_buffer_until_closed: false,
3459 resume_delay_ms: None,
3460 close_waitpoint_on_resume: true,
3461 }
3462 }
3463}
3464
3465/// Timeout behavior at the suspension deadline (RFC-004 §Timeout Behavior).
3466#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3467#[non_exhaustive]
3468pub enum TimeoutBehavior {
3469 Fail,
3470 Cancel,
3471 Expire,
3472 AutoResumeWithTimeoutSignal,
3473 /// v2 per RFC-004 Implementation Notes; enum slot present for
3474 /// additive RFC-014/RFC-015 landing.
3475 Escalate,
3476}
3477
3478impl TimeoutBehavior {
3479 /// Lua-side string encoding. Matches the wire values Lua's
3480 /// `ff_expire_suspension` matches on.
3481 pub fn as_wire_str(self) -> &'static str {
3482 match self {
3483 Self::Fail => "fail",
3484 Self::Cancel => "cancel",
3485 Self::Expire => "expire",
3486 Self::AutoResumeWithTimeoutSignal => "auto_resume_with_timeout_signal",
3487 Self::Escalate => "escalate",
3488 }
3489 }
3490}
3491
3492/// Reason category for a suspension (RFC-004 §Suspension Reason Categories).
3493#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3494#[non_exhaustive]
3495pub enum SuspensionReasonCode {
3496 WaitingForSignal,
3497 WaitingForApproval,
3498 WaitingForCallback,
3499 WaitingForToolResult,
3500 WaitingForOperatorReview,
3501 PausedByPolicy,
3502 PausedByBudget,
3503 StepBoundary,
3504 ManualPause,
3505}
3506
3507impl SuspensionReasonCode {
3508 pub fn as_wire_str(self) -> &'static str {
3509 match self {
3510 Self::WaitingForSignal => "waiting_for_signal",
3511 Self::WaitingForApproval => "waiting_for_approval",
3512 Self::WaitingForCallback => "waiting_for_callback",
3513 Self::WaitingForToolResult => "waiting_for_tool_result",
3514 Self::WaitingForOperatorReview => "waiting_for_operator_review",
3515 Self::PausedByPolicy => "paused_by_policy",
3516 Self::PausedByBudget => "paused_by_budget",
3517 Self::StepBoundary => "step_boundary",
3518 Self::ManualPause => "manual_pause",
3519 }
3520 }
3521}
3522
3523/// Who requested the suspension.
3524#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3525#[non_exhaustive]
3526pub enum SuspensionRequester {
3527 Worker,
3528 Operator,
3529 Policy,
3530 SystemTimeoutPolicy,
3531}
3532
3533impl SuspensionRequester {
3534 pub fn as_wire_str(self) -> &'static str {
3535 match self {
3536 Self::Worker => "worker",
3537 Self::Operator => "operator",
3538 Self::Policy => "policy",
3539 Self::SystemTimeoutPolicy => "system_timeout_policy",
3540 }
3541 }
3542}
3543
/// How the waitpoint resource backing a [`SuspendArgs`] is obtained.
///
/// RFC-013 §2.2 — `Fresh` mints a new waitpoint as part of `suspend`;
/// `UsePending` activates a waitpoint previously issued via
/// `EngineBackend::create_waitpoint`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum WaitpointBinding {
    /// Mint a new waitpoint as part of `suspend`.
    Fresh {
        /// Identifier for the waitpoint to be minted.
        waitpoint_id: WaitpointId,
        /// Key naming the waitpoint; [`WaitpointBinding::fresh`] sets it
        /// to `wpk:<waitpoint_id>`.
        waitpoint_key: String,
    },
    /// Activate a waitpoint previously issued by `create_waitpoint`.
    UsePending {
        /// Identifier of the already-issued pending waitpoint.
        waitpoint_id: WaitpointId,
    },
}
3560
3561impl WaitpointBinding {
3562 /// Mint a fresh binding with a random `waitpoint_id` (UUID v4) and
3563 /// `waitpoint_key = "wpk:<uuid>"`.
3564 pub fn fresh() -> Self {
3565 let wp_id = WaitpointId::new();
3566 let key = format!("wpk:{wp_id}");
3567 Self::Fresh {
3568 waitpoint_id: wp_id,
3569 waitpoint_key: key,
3570 }
3571 }
3572
3573 /// Construct a `UsePending` binding from a pending waitpoint
3574 /// previously issued by `create_waitpoint`. The HMAC token is
3575 /// resolved Lua-side from the partition's waitpoint hash at
3576 /// `suspend` time (RFC-013 §5.1).
3577 pub fn use_pending(pending: &crate::backend::PendingWaitpoint) -> Self {
3578 Self::UsePending {
3579 waitpoint_id: pending.waitpoint_id.clone(),
3580 }
3581 }
3582}
3583
/// Trait-surface input to [`EngineBackend::suspend`] (RFC-013 §2.2 +
/// RFC-014 Pattern 3 widening).
///
/// Built via [`SuspendArgs::new`] + `with_*` setters; direct struct-
/// literal construction across crate boundaries is not possible
/// (`#[non_exhaustive]`).
///
/// ## Waitpoints
///
/// `waitpoints` is expected to be a non-empty `Vec<WaitpointBinding>`.
/// The first entry is the "primary" binding (accessible via
/// [`primary`](Self::primary)) and carries the `current_waitpoint_id`
/// written onto `exec_core` for operator visibility. Additional entries
/// land in Valkey as their own waitpoint hashes / signal streams / HMAC
/// tokens, enabling RFC-014 Pattern 3
/// `AllOf { members: [Single{wp1}, Single{wp2}, ...] }` across distinct
/// heterogeneous subsystems.
///
/// [`SuspendArgs::new`] takes exactly the primary binding; call
/// [`with_waitpoint`](Self::with_waitpoint) to append further bindings
/// (the RFC-014 builder API).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct SuspendArgs {
    /// Identifier of the suspension being created.
    pub suspension_id: SuspensionId,
    /// RFC-014 Pattern 3: all waitpoint bindings for this suspension.
    /// Expected to be non-empty; `waitpoints[0]` is the primary.
    /// [`SuspendArgs::new`] always seeds one entry, but the field is
    /// public and [`with_waitpoints`](Self::with_waitpoints) can install
    /// an empty Vec, which the backend's `validate_suspend_args` rejects
    /// with `waitpoints_empty`.
    pub waitpoints: Vec<WaitpointBinding>,
    /// Condition that must be satisfied for the suspension to resume.
    pub resume_condition: ResumeCondition,
    /// Policy applied on resume (type defined elsewhere in this module).
    pub resume_policy: ResumePolicy,
    /// Reason category recorded for the suspension.
    pub reason_code: SuspensionReasonCode,
    /// Who requested the suspension; [`SuspendArgs::new`] sets `Worker`.
    pub requested_by: SuspensionRequester,
    /// Optional timeout timestamp. Omitted from serialized output when
    /// `None`; defaults to `None` when absent on deserialize.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_at: Option<TimestampMs>,
    /// Behavior applied on timeout; [`SuspendArgs::new`] sets `Fail`.
    pub timeout_behavior: TimeoutBehavior,
    /// Optional continuation-metadata pointer. Omitted from serialized
    /// output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub continuation_metadata_pointer: Option<String>,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
    /// Optional idempotency key. Omitted from serialized output when
    /// `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<IdempotencyKey>,
}
3624
3625impl SuspendArgs {
3626 /// Build a minimal `SuspendArgs` for a worker-originated suspension.
3627 ///
3628 /// `waitpoint` becomes the primary binding. Append additional
3629 /// bindings with [`with_waitpoint`](Self::with_waitpoint) (RFC-014
3630 /// Pattern 3) or replace the set with
3631 /// [`with_waitpoints`](Self::with_waitpoints).
3632 ///
3633 /// Defaults: `requested_by = Worker`, `timeout_at = None`,
3634 /// `timeout_behavior = Fail`, `continuation_metadata_pointer = None`,
3635 /// `idempotency_key = None`.
3636 pub fn new(
3637 suspension_id: SuspensionId,
3638 waitpoint: WaitpointBinding,
3639 resume_condition: ResumeCondition,
3640 resume_policy: ResumePolicy,
3641 reason_code: SuspensionReasonCode,
3642 now: TimestampMs,
3643 ) -> Self {
3644 Self {
3645 suspension_id,
3646 waitpoints: vec![waitpoint],
3647 resume_condition,
3648 resume_policy,
3649 reason_code,
3650 requested_by: SuspensionRequester::Worker,
3651 timeout_at: None,
3652 timeout_behavior: TimeoutBehavior::Fail,
3653 continuation_metadata_pointer: None,
3654 now,
3655 idempotency_key: None,
3656 }
3657 }
3658
3659 /// Primary binding — `waitpoints[0]`. Guaranteed present by
3660 /// construction.
3661 pub fn primary(&self) -> &WaitpointBinding {
3662 &self.waitpoints[0]
3663 }
3664
3665 pub fn with_timeout(mut self, at: TimestampMs, behavior: TimeoutBehavior) -> Self {
3666 self.timeout_at = Some(at);
3667 self.timeout_behavior = behavior;
3668 self
3669 }
3670
3671 pub fn with_requester(mut self, requester: SuspensionRequester) -> Self {
3672 self.requested_by = requester;
3673 self
3674 }
3675
3676 pub fn with_continuation_metadata_pointer(mut self, p: String) -> Self {
3677 self.continuation_metadata_pointer = Some(p);
3678 self
3679 }
3680
3681 pub fn with_idempotency_key(mut self, key: IdempotencyKey) -> Self {
3682 self.idempotency_key = Some(key);
3683 self
3684 }
3685
3686 /// RFC-014 Pattern 3 — append a further waitpoint binding to this
3687 /// suspension. Each additional binding yields its own waitpoint
3688 /// hash, signal stream, condition hash and HMAC token in Valkey,
3689 /// but all share the suspension record and composite evaluator
3690 /// under one `suspension:current`.
3691 ///
3692 /// Ordering: the primary (from [`SuspendArgs::new`]) stays at
3693 /// `waitpoints[0]`; subsequent `with_waitpoint` calls append at the
3694 /// tail.
3695 pub fn with_waitpoint(mut self, binding: WaitpointBinding) -> Self {
3696 self.waitpoints.push(binding);
3697 self
3698 }
3699
3700 /// RFC-014 Pattern 3 — replace the full binding vector in one call.
3701 /// Must be non-empty; an empty Vec is a programmer error and will
3702 /// be rejected by the backend's `validate_suspend_args` with
3703 /// `waitpoints_empty`.
3704 pub fn with_waitpoints(mut self, bindings: Vec<WaitpointBinding>) -> Self {
3705 self.waitpoints = bindings;
3706 self
3707 }
3708}
3709
/// Shared "what happened on the waitpoint" payload carried in both
/// [`SuspendOutcome`] variants.
///
/// For Pattern 3 (RFC-014) — multi-waitpoint suspensions — the primary
/// binding's identity lives at the top level (`waitpoint_id` /
/// `waitpoint_key` / `waitpoint_token`) and remaining bindings are
/// exposed via `additional_waitpoints`, each carrying its own minted
/// HMAC token so external signallers can deliver to any of the N
/// waitpoints the suspension is listening on.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct SuspendOutcomeDetails {
    /// Identifier of the suspension this outcome describes.
    pub suspension_id: SuspensionId,
    /// Primary binding's waitpoint id.
    pub waitpoint_id: WaitpointId,
    /// Primary binding's waitpoint key.
    pub waitpoint_key: String,
    /// Primary binding's HMAC token.
    pub waitpoint_token: WaitpointHmac,
    /// RFC-014 Pattern 3 extras (beyond the primary). Empty for
    /// single-waitpoint suspensions (patterns 1 + 2); carries one
    /// entry per additional binding for Pattern 3.
    pub additional_waitpoints: Vec<AdditionalWaitpointBinding>,
}
3731
/// RFC-014 Pattern 3 — per-binding identity + HMAC token for
/// waitpoints beyond the primary. Structure mirrors the top-level
/// fields on [`SuspendOutcomeDetails`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct AdditionalWaitpointBinding {
    /// Waitpoint id of this additional binding.
    pub waitpoint_id: WaitpointId,
    /// Waitpoint key of this additional binding.
    pub waitpoint_key: String,
    /// HMAC token minted for this additional binding.
    pub waitpoint_token: WaitpointHmac,
}
3742
3743impl AdditionalWaitpointBinding {
3744 pub fn new(
3745 waitpoint_id: WaitpointId,
3746 waitpoint_key: String,
3747 waitpoint_token: WaitpointHmac,
3748 ) -> Self {
3749 Self {
3750 waitpoint_id,
3751 waitpoint_key,
3752 waitpoint_token,
3753 }
3754 }
3755}
3756
3757impl SuspendOutcomeDetails {
3758 pub fn new(
3759 suspension_id: SuspensionId,
3760 waitpoint_id: WaitpointId,
3761 waitpoint_key: String,
3762 waitpoint_token: WaitpointHmac,
3763 ) -> Self {
3764 Self {
3765 suspension_id,
3766 waitpoint_id,
3767 waitpoint_key,
3768 waitpoint_token,
3769 additional_waitpoints: Vec::new(),
3770 }
3771 }
3772
3773 /// Attach RFC-014 Pattern 3 additional-waitpoint bindings. The
3774 /// primary binding stays at the top-level fields; `extras` lands
3775 /// in [`additional_waitpoints`](Self::additional_waitpoints).
3776 pub fn with_additional_waitpoints(
3777 mut self,
3778 extras: Vec<AdditionalWaitpointBinding>,
3779 ) -> Self {
3780 self.additional_waitpoints = extras;
3781 self
3782 }
3783}
3784
/// Trait-surface output from [`EngineBackend::suspend`] (RFC-013 §2.3).
///
/// Two variants encode the "lease released" vs "lease retained" split.
/// See §2.3 for the runtime-enforcement semantics. Both variants carry
/// a [`SuspendOutcomeDetails`], reachable uniformly through
/// [`SuspendOutcome::details`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum SuspendOutcome {
    /// The worker's pre-suspend handle is no longer lease-bearing; a
    /// fresh `HandleKind::Suspended` handle supersedes it.
    Suspended {
        /// Shared waitpoint details for this suspension.
        details: SuspendOutcomeDetails,
        /// The handle that supersedes the pre-suspend one.
        handle: crate::backend::Handle,
    },
    /// Buffered signals on a pending waitpoint already satisfied the
    /// condition at suspension time; the lease is retained and the
    /// caller's pre-suspend handle remains valid.
    AlreadySatisfied { details: SuspendOutcomeDetails },
}
3803
3804impl SuspendOutcome {
3805 /// Borrow the shared details regardless of variant.
3806 pub fn details(&self) -> &SuspendOutcomeDetails {
3807 match self {
3808 Self::Suspended { details, .. } => details,
3809 Self::AlreadySatisfied { details } => details,
3810 }
3811 }
3812}
3813
3814// `EngineBackend::suspend` type re-exports for `ff_core::backend::*`
3815// consumers. The `backend` module re-exports these below so external
3816// crates can reach them via the idiomatic `ff_core::backend` path that
3817// already sources the other trait-surface types (RFC-013 §9.1).
3818
3819// ─── RFC-017 Stage A — trait-expansion Args/Result types ─────────────
3820//
3821// Per RFC-017 §5.1.1: every struct/enum introduced here is
3822// `#[non_exhaustive]` and ships with a `pub fn new(...)` constructor so
3823// additive field growth post-v0.8 does not force cross-crate churn.
3824
3825// ─── claim_for_worker ───
3826
/// Inputs to `EngineBackend::claim_for_worker` (RFC-017 §5, §7). The
/// Valkey impl forwards to `ff_scheduler::Scheduler::claim_for_worker`;
/// the Postgres impl forwards to its own scheduler module. The trait
/// method hides the backend-specific dispatch behind one shape.
///
/// Construct via [`ClaimForWorkerArgs::new`]; the struct is
/// `#[non_exhaustive]`.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct ClaimForWorkerArgs {
    /// Lane scanned for an eligible execution (see
    /// [`ClaimForWorkerOutcome::NoWork`]).
    pub lane_id: LaneId,
    /// Identifier of the claiming worker.
    pub worker_id: WorkerId,
    /// Identifier of the claiming worker instance.
    pub worker_instance_id: WorkerInstanceId,
    /// Capability strings supplied for the worker. NOTE(review):
    /// presumably matched against execution routing requirements by the
    /// scheduler — confirm against the backend implementation.
    pub worker_capabilities: std::collections::BTreeSet<String>,
    /// TTL for the issued grant, in milliseconds (per the `_ms` suffix).
    pub grant_ttl_ms: u64,
}
3840
3841impl ClaimForWorkerArgs {
3842 /// Required-field constructor. Optional fields today: none — kept
3843 /// for forward-compat so a future optional (e.g. `deadline_ms`)
3844 /// does not break callers using the builder pattern.
3845 pub fn new(
3846 lane_id: LaneId,
3847 worker_id: WorkerId,
3848 worker_instance_id: WorkerInstanceId,
3849 worker_capabilities: std::collections::BTreeSet<String>,
3850 grant_ttl_ms: u64,
3851 ) -> Self {
3852 Self {
3853 lane_id,
3854 worker_id,
3855 worker_instance_id,
3856 worker_capabilities,
3857 grant_ttl_ms,
3858 }
3859 }
3860}
3861
/// Outcome of `EngineBackend::claim_for_worker`. `None`-like shape
/// modelled as an enum so additive variants (e.g. `BackPressured {
/// retry_after_ms }`) do not force a wire break.
///
/// Variants can be built with [`ClaimForWorkerOutcome::no_work`] and
/// [`ClaimForWorkerOutcome::granted`].
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClaimForWorkerOutcome {
    /// No eligible execution on this lane at this scan cycle.
    NoWork,
    /// Grant issued — worker proceeds to `claim_from_grant`.
    Granted(ClaimGrant),
}
3873
3874impl ClaimForWorkerOutcome {
3875 /// Build the `NoWork` variant.
3876 pub fn no_work() -> Self {
3877 Self::NoWork
3878 }
3879 /// Build the `Granted` variant.
3880 pub fn granted(grant: ClaimGrant) -> Self {
3881 Self::Granted(grant)
3882 }
3883}
3884
3885// ─── list_pending_waitpoints ───
3886
/// Inputs to `EngineBackend::list_pending_waitpoints` (RFC-017 §5, §8).
/// Pagination is part of the signature so a flow with 10k pending
/// waitpoints cannot force a single-round-trip read regardless of
/// backend.
///
/// Build with [`ListPendingWaitpointsArgs::new`], then optionally
/// `with_after` / `with_limit`.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct ListPendingWaitpointsArgs {
    /// Execution whose pending waitpoints are listed.
    pub execution_id: ExecutionId,
    /// Exclusive cursor — `None` starts from the beginning.
    pub after: Option<WaitpointId>,
    /// Max page size. `None` → backend default (100). Backend-enforced
    /// cap: 1000.
    pub limit: Option<u32>,
}
3901
3902impl ListPendingWaitpointsArgs {
3903 pub fn new(execution_id: ExecutionId) -> Self {
3904 Self {
3905 execution_id,
3906 after: None,
3907 limit: None,
3908 }
3909 }
3910 pub fn with_after(mut self, after: WaitpointId) -> Self {
3911 self.after = Some(after);
3912 self
3913 }
3914 pub fn with_limit(mut self, limit: u32) -> Self {
3915 self.limit = Some(limit);
3916 self
3917 }
3918}
3919
/// Page of pending-waitpoint entries. Stage A preserves the existing
/// `PendingWaitpointInfo` shape; the §8 schema rewrite (HMAC
/// sanitisation + `(token_kid, token_fingerprint)` additive fields)
/// ships in Stage D alongside the HTTP wire-format deprecation.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct ListPendingWaitpointsResult {
    /// Entries in this page.
    pub entries: Vec<PendingWaitpointInfo>,
    /// Forward-only continuation cursor — `None` signals end-of-stream.
    pub next_cursor: Option<WaitpointId>,
}
3931
3932impl ListPendingWaitpointsResult {
3933 pub fn new(entries: Vec<PendingWaitpointInfo>) -> Self {
3934 Self {
3935 entries,
3936 next_cursor: None,
3937 }
3938 }
3939 pub fn with_next_cursor(mut self, cursor: WaitpointId) -> Self {
3940 self.next_cursor = Some(cursor);
3941 self
3942 }
3943}
3944
3945// ─── report_usage_admin ───
3946
/// Inputs to `EngineBackend::report_usage_admin` (RFC-017 §5 budget+
/// quota admin §5, round-1 F4). Admin-path peer of `report_usage` —
/// both wrap `ff_report_usage_and_check` on the Valkey side but the
/// admin call is worker-less, so it cannot reuse the lease-bound
/// `report_usage(&Handle, ...)` signature. `ReportUsageAdminArgs`
/// carries the same fields as [`ReportUsageArgs`] without a worker
/// handle — kept as a distinct type so future admin-only fields (e.g.
/// `actor_identity`, `audit_reason`) don't pollute the worker path.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct ReportUsageAdminArgs {
    /// Names of the usage dimensions being reported.
    pub dimensions: Vec<String>,
    /// Usage deltas. NOTE(review): presumably parallel to `dimensions`
    /// (one delta per dimension) — nothing in this type enforces equal
    /// lengths; confirm against the backend.
    pub deltas: Vec<u64>,
    /// Optional de-duplication key; `None` unless set via
    /// [`ReportUsageAdminArgs::with_dedup_key`].
    pub dedup_key: Option<String>,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
3963
3964impl ReportUsageAdminArgs {
3965 pub fn new(dimensions: Vec<String>, deltas: Vec<u64>, now: TimestampMs) -> Self {
3966 Self {
3967 dimensions,
3968 deltas,
3969 dedup_key: None,
3970 now,
3971 }
3972 }
3973 pub fn with_dedup_key(mut self, key: String) -> Self {
3974 self.dedup_key = Some(key);
3975 self
3976 }
3977}
3978
#[cfg(test)]
mod rfc_014_validation_tests {
    use super::*;

    /// Leaf condition on waitpoint key `wp` with a fixed by-name
    /// matcher (`"x"`).
    fn single(wp: &str) -> ResumeCondition {
        let waitpoint_key = wp.to_owned();
        let matcher = SignalMatcher::ByName("x".to_owned());
        ResumeCondition::Single {
            waitpoint_key,
            matcher,
        }
    }

    #[test]
    fn single_passes_validate() {
        let leaf = single("wpk:a");
        assert!(leaf.validate_composite().is_ok());
    }

    #[test]
    fn allof_empty_members_rejected() {
        let body = CompositeBody::AllOf { members: vec![] };
        let cond = ResumeCondition::Composite(body);
        let err = cond.validate_composite().unwrap_err();
        assert!(err.detail.contains("allof_empty_members"), "{}", err.detail);
    }

    #[test]
    fn count_n_zero_rejected() {
        let body = CompositeBody::Count {
            n: 0,
            count_kind: CountKind::DistinctWaitpoints,
            matcher: None,
            waitpoints: vec!["wpk:a".to_owned()],
        };
        let cond = ResumeCondition::Composite(body);
        let err = cond.validate_composite().unwrap_err();
        assert!(err.detail.contains("count_n_zero"), "{}", err.detail);
    }

    #[test]
    fn count_waitpoints_empty_rejected() {
        let body = CompositeBody::Count {
            n: 1,
            count_kind: CountKind::DistinctSources,
            matcher: None,
            waitpoints: vec![],
        };
        let cond = ResumeCondition::Composite(body);
        let err = cond.validate_composite().unwrap_err();
        assert!(
            err.detail.contains("count_waitpoints_empty"),
            "{}",
            err.detail
        );
    }

    #[test]
    fn count_exceeds_waitpoint_set_rejected_only_for_distinct_waitpoints() {
        // Asking for 3 distinct waitpoints out of a set of 2 can never
        // be satisfied, so validation must reject it.
        let over_waitpoints = ResumeCondition::Composite(CompositeBody::Count {
            n: 3,
            count_kind: CountKind::DistinctWaitpoints,
            matcher: None,
            waitpoints: vec![String::from("a"), String::from("b")],
        });
        let err = over_waitpoints.validate_composite().unwrap_err();
        assert!(
            err.detail.contains("count_exceeds_waitpoint_set"),
            "{}",
            err.detail
        );

        // The same n and waitpoint set with DistinctSignals is accepted:
        // that kind has no upper bound tied to the waitpoint count.
        let over_signals = ResumeCondition::Composite(CompositeBody::Count {
            n: 3,
            count_kind: CountKind::DistinctSignals,
            matcher: None,
            waitpoints: vec![String::from("a"), String::from("b")],
        });
        assert!(over_signals.validate_composite().is_ok());
    }

    #[test]
    fn depth_4_accepted_depth_5_rejected() {
        // Wrap a leaf in four nested single-member AllOf layers:
        // AllOf { AllOf { AllOf { AllOf { Single } } } }.
        let mut nested = single("wpk:leaf");
        for _ in 0..4 {
            nested = ResumeCondition::Composite(CompositeBody::AllOf {
                members: vec![nested],
            });
        }
        assert!(nested.validate_composite().is_ok());

        // One more layer makes it depth 5, which must be rejected.
        let too_deep = ResumeCondition::Composite(CompositeBody::AllOf {
            members: vec![nested],
        });
        let err = too_deep.validate_composite().unwrap_err();
        assert!(err.detail.contains("exceeds cap"), "{}", err.detail);
    }
}