ff_core/contracts/mod.rs
//! Function contracts (Phase 1 onward) — Args + Result types for each FCALL.
2//!
3//! Each Args struct defines the typed inputs to a Valkey Function.
4//! Each Result enum defines the possible outcomes (success variants + error codes).
5
6pub mod decode;
7
8use crate::policy::ExecutionPolicy;
9use crate::state::{AttemptType, PublicState, StateVector};
10use crate::types::{
11 AttemptId, AttemptIndex, CancelSource, EdgeId, ExecutionId, FlowId, LaneId, LeaseEpoch,
12 LeaseFence, LeaseId, Namespace, SignalId, SuspensionId, TimestampMs, WaitpointId,
13 WaitpointToken, WorkerId, WorkerInstanceId,
14};
15use serde::{Deserialize, Serialize};
16use std::collections::{BTreeMap, BTreeSet, HashMap};
17
18// ─── create_execution ───
19
/// Typed inputs to the `create_execution` Valkey Function.
///
/// Fields marked `#[serde(default)]` may be omitted from the serialized
/// form; they then take their `Default` value (`None` / an empty map).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateExecutionArgs {
    /// Identifier of the execution to create.
    pub execution_id: ExecutionId,
    pub namespace: Namespace,
    pub lane_id: LaneId,
    /// Free-form kind label for the execution.
    pub execution_kind: String,
    /// Raw input payload bytes.
    pub input_payload: Vec<u8>,
    /// Encoding label for `input_payload`, if one was supplied.
    #[serde(default)]
    pub payload_encoding: Option<String>,
    /// NOTE(review): ordering direction (higher vs. lower wins) is not
    /// visible in this file — confirm against the Lua implementation.
    pub priority: i32,
    pub creator_identity: String,
    /// Optional idempotency key. Presumably what yields
    /// `CreateExecutionResult::Duplicate` on a repeat call — confirm.
    #[serde(default)]
    pub idempotency_key: Option<String>,
    /// Arbitrary string key/value tags; empty when omitted.
    #[serde(default)]
    pub tags: HashMap<String, String>,
    /// Execution policy (retry, timeout, suspension, routing, etc.).
    #[serde(default)]
    pub policy: Option<ExecutionPolicy>,
    /// If set and in the future, execution starts delayed.
    #[serde(default)]
    pub delay_until: Option<TimestampMs>,
    /// Absolute deadline timestamp (ms). Execution expires if not complete by this time.
    #[serde(default)]
    pub execution_deadline_at: Option<TimestampMs>,
    /// Partition ID (pre-computed).
    pub partition_id: u16,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
48
/// Success outcomes of the `create_execution` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateExecutionResult {
    /// Execution created successfully.
    Created {
        /// Identifier of the newly created execution.
        execution_id: ExecutionId,
        /// Public state reported for the execution after creation.
        public_state: PublicState,
    },
    /// Idempotent duplicate — existing execution returned.
    Duplicate { execution_id: ExecutionId },
}
59
60// ─── issue_claim_grant ───
61
/// Typed inputs to the `ff_issue_claim_grant` Valkey Function.
///
/// Fields marked `#[serde(default)]` may be omitted from the serialized
/// form; they then take their `Default` value (`None` / an empty set).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IssueClaimGrantArgs {
    /// Execution the grant is issued for.
    pub execution_id: ExecutionId,
    pub lane_id: LaneId,
    pub worker_id: WorkerId,
    pub worker_instance_id: WorkerInstanceId,
    #[serde(default)]
    pub capability_hash: Option<String>,
    /// Optional routing snapshot; the field name indicates a JSON-encoded
    /// string, but the schema is not visible here.
    #[serde(default)]
    pub route_snapshot_json: Option<String>,
    #[serde(default)]
    pub admission_summary: Option<String>,
    /// Capabilities this worker advertises. Serialized as a sorted,
    /// comma-separated string to the Lua FCALL (see scheduling.lua
    /// ff_issue_claim_grant). An empty set matches only executions whose
    /// `required_capabilities` is also empty.
    #[serde(default)]
    pub worker_capabilities: BTreeSet<String>,
    /// Grant time-to-live in milliseconds.
    pub grant_ttl_ms: u64,
    /// Caller-side timestamp for bookkeeping. NOT passed to the Lua FCALL —
    /// ff_issue_claim_grant uses `redis.call("TIME")` for grant_expires_at.
    pub now: TimestampMs,
}
85
/// Success outcomes of the `ff_issue_claim_grant` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum IssueClaimGrantResult {
    /// Grant issued.
    Granted { execution_id: ExecutionId },
}
91
/// A claim grant issued by the scheduler for a specific execution.
///
/// The worker uses this to call `ff_claim_execution` (or
/// `ff_acquire_lease`), which atomically consumes the grant and
/// creates the lease.
///
/// Shared wire-level type between `ff-scheduler` (issuer) and
/// `ff-sdk` (consumer, via `FlowFabricWorker::claim_from_grant`).
/// Lives in `ff-core` so neither crate needs a dep on the other.
///
/// NOTE(review): unlike the Args/Result types in this module, this
/// struct does not derive `Serialize`/`Deserialize` here — confirm
/// whether "wire-level" refers to a hand-written encoding elsewhere.
///
/// **Lane asymmetry with [`ReclaimGrant`]:** `ClaimGrant` does NOT
/// carry `lane_id`. The issuing scheduler's caller already picked
/// a lane (that's how admission reached this grant) and passes it
/// through to `claim_from_grant` as a separate argument. The grant
/// handle stays narrow to what uniquely identifies the admission
/// decision. The matching field on [`ReclaimGrant`] is an
/// intentional divergence — see the note on that type.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ClaimGrant {
    /// The execution that was granted.
    pub execution_id: ExecutionId,
    /// Opaque partition handle for this execution's hash-tag slot.
    ///
    /// Public wire type: consumers pass it back to FlowFabric but
    /// must not parse the interior hash tag for routing decisions.
    /// Internal consumers that need the typed
    /// [`crate::partition::Partition`] call [`Self::partition`].
    pub partition_key: crate::partition::PartitionKey,
    /// The Valkey key holding the grant hash (for the worker to
    /// reference).
    pub grant_key: String,
    /// Millisecond timestamp at which the grant expires if not consumed.
    pub expires_at_ms: u64,
}
126
impl ClaimGrant {
    /// Parse `partition_key` into a typed
    /// [`crate::partition::Partition`]. Intended for internal
    /// consumers (scheduler emitter, SDK worker claim path) that
    /// need the family/index pair.
    ///
    /// Alias collapse applies: a grant issued against `Execution`
    /// family round-trips to `Flow` (see [`crate::partition::PartitionKey`]
    /// for the rationale — routing is preserved, only the metadata
    /// family label normalises).
    ///
    /// # Errors
    ///
    /// Returns [`crate::partition::PartitionKeyParseError`] when the key
    /// is malformed, which indicates a producer bug.
    pub fn partition(
        &self,
    ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
        // Pure delegation: all parsing rules live on `PartitionKey`.
        self.partition_key.parse()
    }
}
144
/// A reclaim grant issued for a resumed (attempt_interrupted) execution.
///
/// Issued by a producer (typically `ff-scheduler` once a Batch-C
/// reclaim scanner is in place; test fixtures in the interim — no
/// production Rust caller exists in-tree today). Consumed by
/// [`FlowFabricWorker::claim_from_reclaim_grant`], which calls
/// `ff_claim_resumed_execution` atomically: that FCALL validates the
/// grant, consumes it, and transitions `attempt_interrupted` →
/// `started` while preserving the existing `attempt_index` +
/// `attempt_id` (a resumed execution re-uses its attempt; it does
/// not start a new one).
///
/// Mirrors [`ClaimGrant`] for the resume path. Differences:
///
/// * [`ClaimGrant`] is issued against a freshly-eligible
///   execution and `ff_claim_execution` creates a new attempt.
/// * [`ReclaimGrant`] is issued against an `attempt_interrupted`
///   execution; `ff_claim_resumed_execution` re-uses the existing
///   attempt and bumps the lease epoch.
///
/// The grant itself is written to the same `claim_grant` Valkey key
/// that [`ClaimGrant`] uses; the distinction is which Lua FCALL
/// consumes it (`ff_claim_execution` for new attempts,
/// `ff_claim_resumed_execution` for resumes).
///
/// **Lane asymmetry with [`ClaimGrant`]:** `ReclaimGrant` CARRIES
/// `lane_id` as a field. The issuing path already knows the lane
/// (it's read from `exec_core` at grant time); carrying it here
/// spares the consumer a `HGET exec_core lane_id` round trip on
/// the hot claim path. The asymmetry is intentional — prefer
/// one-fewer-HGET on a type that already lives with the resumer's
/// lifecycle over strict handle symmetry with `ClaimGrant`.
///
/// Shared wire-level type between the eventual `ff-scheduler`
/// producer (Batch-C reclaim scanner — not yet in-tree; test
/// fixtures construct this type today) and `ff-sdk` (consumer, via
/// `FlowFabricWorker::claim_from_reclaim_grant`). Lives in
/// `ff-core` so neither crate needs a dep on the other.
///
/// [`FlowFabricWorker::claim_from_reclaim_grant`]: https://docs.rs/ff-sdk
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReclaimGrant {
    /// The execution granted for resumption.
    pub execution_id: ExecutionId,
    /// Opaque partition handle for this execution's hash-tag slot.
    ///
    /// Same wire-opacity contract as [`ClaimGrant::partition_key`].
    /// Internal consumers call [`Self::partition`] for the parsed
    /// form.
    pub partition_key: crate::partition::PartitionKey,
    /// Valkey key of the grant hash — same key shape as
    /// [`ClaimGrant`].
    pub grant_key: String,
    /// Millisecond timestamp at which the grant expires; unconsumed
    /// grants vanish.
    ///
    /// NOTE(review): this was previously described as "monotonic ms".
    /// `IssueReclaimGrantArgs::now` documents that the Lua side derives
    /// `grant_expires_at` from `redis.call("TIME")`, which is server
    /// wall-clock time — confirm which clock this value is on.
    pub expires_at_ms: u64,
    /// Lane the execution belongs to. Needed by
    /// `ff_claim_resumed_execution` for `KEYS[3]` (eligible_zset)
    /// and `KEYS[9]` (active_index).
    pub lane_id: LaneId,
}
206
impl ReclaimGrant {
    /// Parse `partition_key` into a typed
    /// [`crate::partition::Partition`]. See [`ClaimGrant::partition`]
    /// for the alias-collapse note.
    ///
    /// # Errors
    ///
    /// Returns [`crate::partition::PartitionKeyParseError`] when
    /// `partition_key` cannot be parsed.
    pub fn partition(
        &self,
    ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
        // Pure delegation: all parsing rules live on `PartitionKey`.
        self.partition_key.parse()
    }
}
217
218// ─── claim_execution ───
219
/// Typed inputs to the `ff_claim_execution` Valkey Function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ClaimExecutionArgs {
    /// Execution being claimed.
    pub execution_id: ExecutionId,
    pub worker_id: WorkerId,
    pub worker_instance_id: WorkerInstanceId,
    pub lane_id: LaneId,
    /// Caller-supplied identifier for the lease.
    pub lease_id: LeaseId,
    /// Lease time-to-live in milliseconds.
    pub lease_ttl_ms: u64,
    /// Caller-supplied identifier for the attempt.
    pub attempt_id: AttemptId,
    /// Expected attempt index (pre-read from exec_core.total_attempt_count).
    /// Used for KEYS construction — must match what the Lua computes.
    pub expected_attempt_index: AttemptIndex,
    /// JSON-encoded attempt policy snapshot. Empty string when omitted.
    #[serde(default)]
    pub attempt_policy_json: String,
    /// Per-attempt timeout in ms.
    #[serde(default)]
    pub attempt_timeout_ms: Option<u64>,
    /// Total execution deadline (absolute timestamp ms).
    ///
    /// NOTE(review): typed as a raw `i64` here, whereas
    /// `CreateExecutionArgs::execution_deadline_at` uses `TimestampMs` —
    /// confirm whether the divergence is intentional.
    #[serde(default)]
    pub execution_deadline_at: Option<i64>,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
243
/// Payload of a successful claim: the execution plus the lease and
/// attempt identifiers it now runs under.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ClaimedExecution {
    pub execution_id: ExecutionId,
    pub lease_id: LeaseId,
    pub lease_epoch: LeaseEpoch,
    pub attempt_index: AttemptIndex,
    pub attempt_id: AttemptId,
    pub attempt_type: AttemptType,
    /// Timestamp at which the lease expires.
    pub lease_expires_at: TimestampMs,
}
254
/// Success outcomes of the `ff_claim_execution` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ClaimExecutionResult {
    /// Successfully claimed.
    Claimed(ClaimedExecution),
}
260
261// ─── complete_execution ───
262
/// Typed inputs to the `complete_execution` Valkey Function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CompleteExecutionArgs {
    /// Execution being completed.
    pub execution_id: ExecutionId,
    /// RFC #58.5 — fence triple. `Some` for SDK worker paths (standard
    /// stale-lease fence). `None` for operator overrides, in which case
    /// `source` must be `CancelSource::OperatorOverride` or the Lua
    /// returns `fence_required`.
    #[serde(default)]
    pub fence: Option<LeaseFence>,
    pub attempt_index: AttemptIndex,
    /// Optional result payload bytes.
    #[serde(default)]
    pub result_payload: Option<Vec<u8>>,
    /// Encoding label for `result_payload`, if any.
    #[serde(default)]
    pub result_encoding: Option<String>,
    /// RFC #58.5 — unfenced-call gate. Ignored when `fence` is `Some`.
    /// Falls back to `CancelSource::default()` when omitted.
    #[serde(default)]
    pub source: CancelSource,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
282
/// Success outcomes of the `complete_execution` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CompleteExecutionResult {
    /// Execution completed successfully.
    Completed {
        execution_id: ExecutionId,
        /// Public state reported for the execution after completion.
        public_state: PublicState,
    },
}
291
292// ─── renew_lease ───
293
/// Typed inputs to the `renew_lease` Valkey Function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RenewLeaseArgs {
    /// Execution whose lease is being renewed.
    pub execution_id: ExecutionId,
    pub attempt_index: AttemptIndex,
    /// RFC #58.5 — fence triple. Required (no operator override path for
    /// renew). `None` returns `fence_required`.
    pub fence: Option<LeaseFence>,
    /// How long to extend the lease (milliseconds).
    pub lease_ttl_ms: u64,
    /// Grace period after lease_expires_at before the lease_current key is auto-deleted.
    /// Falls back to `default_lease_history_grace_ms()` when omitted.
    #[serde(default = "default_lease_history_grace_ms")]
    pub lease_history_grace_ms: u64,
}
307
/// Serde fallback for `RenewLeaseArgs::lease_history_grace_ms`:
/// 60 000 ms (one minute).
fn default_lease_history_grace_ms() -> u64 {
    const ONE_MINUTE_MS: u64 = 60 * 1_000;
    ONE_MINUTE_MS
}
311
/// Success outcomes of the `renew_lease` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum RenewLeaseResult {
    /// Lease renewed; `expires_at` is the lease's new expiry timestamp.
    Renewed { expires_at: TimestampMs },
}
317
318// ─── mark_lease_expired_if_due ───
319
/// Typed inputs to the `mark_lease_expired_if_due` Valkey Function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MarkLeaseExpiredArgs {
    /// Execution whose lease should be checked.
    pub execution_id: ExecutionId,
}
324
/// Success outcomes of the `mark_lease_expired_if_due` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum MarkLeaseExpiredResult {
    /// Lease was marked as expired.
    MarkedExpired,
    /// No action needed (already expired, not yet due, not active, etc.).
    /// `reason` carries a free-form explanation string.
    AlreadySatisfied { reason: String },
}
332
333// ─── cancel_execution ───
334
/// Typed inputs to the `cancel_execution` Valkey Function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CancelExecutionArgs {
    /// Execution to cancel.
    pub execution_id: ExecutionId,
    /// Free-form cancellation reason.
    pub reason: String,
    /// Who is cancelling. Falls back to `CancelSource::default()` when
    /// omitted.
    #[serde(default)]
    pub source: CancelSource,
    /// Required if not operator_override and execution is active.
    #[serde(default)]
    pub lease_id: Option<LeaseId>,
    /// NOTE(review): undocumented in the original; presumably subject to
    /// the same "required unless operator override" rule as `lease_id`
    /// and `attempt_id` — confirm.
    #[serde(default)]
    pub lease_epoch: Option<LeaseEpoch>,
    /// Required if not operator_override and execution is active.
    #[serde(default)]
    pub attempt_id: Option<AttemptId>,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
351
/// Success outcomes of the `cancel_execution` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CancelExecutionResult {
    /// Execution cancelled.
    Cancelled {
        execution_id: ExecutionId,
        /// Public state reported for the execution after cancellation.
        public_state: PublicState,
    },
}
360
361// ─── revoke_lease ───
362
/// Typed inputs to the `revoke_lease` Valkey Function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RevokeLeaseArgs {
    /// Execution whose lease is being revoked.
    pub execution_id: ExecutionId,
    /// If set, only revoke if this matches the current lease. Empty string skips check.
    /// NOTE(review): a plain `String` rather than `LeaseId`; `None`
    /// presumably also skips the check — confirm.
    #[serde(default)]
    pub expected_lease_id: Option<String>,
    /// Worker instance whose lease set to clean up. Read from exec_core before calling.
    pub worker_instance_id: WorkerInstanceId,
    /// Free-form revocation reason.
    pub reason: String,
}
373
/// Success outcomes of the `revoke_lease` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum RevokeLeaseResult {
    /// Lease revoked. Both fields are carried as raw strings here rather
    /// than as `LeaseId` / `LeaseEpoch`.
    Revoked {
        lease_id: String,
        lease_epoch: String,
    },
    /// Already revoked or expired — no action needed.
    AlreadySatisfied { reason: String },
}
384
385// ─── delay_execution ───
386
/// Typed inputs to the `delay_execution` Valkey Function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DelayExecutionArgs {
    /// Execution to delay.
    pub execution_id: ExecutionId,
    /// RFC #58.5 — fence triple. `None` requires `source ==
    /// CancelSource::OperatorOverride`.
    #[serde(default)]
    pub fence: Option<LeaseFence>,
    pub attempt_index: AttemptIndex,
    /// Timestamp until which the execution is delayed.
    pub delay_until: TimestampMs,
    /// Falls back to `CancelSource::default()` when omitted.
    #[serde(default)]
    pub source: CancelSource,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
400
/// Success outcomes of the `delay_execution` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum DelayExecutionResult {
    /// Execution delayed.
    Delayed {
        execution_id: ExecutionId,
        /// Public state reported for the execution after the delay.
        public_state: PublicState,
    },
}
409
410// ─── move_to_waiting_children ───
411
/// Typed inputs to the `move_to_waiting_children` Valkey Function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MoveToWaitingChildrenArgs {
    /// Execution to move.
    pub execution_id: ExecutionId,
    /// RFC #58.5 — fence triple. `None` requires `source ==
    /// CancelSource::OperatorOverride`.
    #[serde(default)]
    pub fence: Option<LeaseFence>,
    pub attempt_index: AttemptIndex,
    /// Falls back to `CancelSource::default()` when omitted.
    #[serde(default)]
    pub source: CancelSource,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
424
/// Success outcomes of the `move_to_waiting_children` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum MoveToWaitingChildrenResult {
    /// Moved to waiting children.
    Moved {
        execution_id: ExecutionId,
        /// Public state reported for the execution after the move.
        public_state: PublicState,
    },
}
433
434// ─── change_priority ───
435
/// Typed inputs to the `change_priority` Valkey Function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChangePriorityArgs {
    /// Execution whose priority changes.
    pub execution_id: ExecutionId,
    /// Priority value to apply.
    pub new_priority: i32,
    pub lane_id: LaneId,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
443
/// Success outcomes of the `change_priority` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangePriorityResult {
    /// Priority changed and re-scored.
    Changed { execution_id: ExecutionId },
}
449
450// ─── update_progress ───
451
/// Typed inputs to the `update_progress` Valkey Function.
///
/// Unlike the args types that carry an `Option<LeaseFence>`, the lease
/// identity here is passed as three separate required fields.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UpdateProgressArgs {
    /// Execution reporting progress.
    pub execution_id: ExecutionId,
    pub lease_id: LeaseId,
    pub lease_epoch: LeaseEpoch,
    pub attempt_id: AttemptId,
    /// Optional progress value. NOTE(review): the name suggests a 0–100
    /// percentage, but no range check is visible here — confirm.
    #[serde(default)]
    pub progress_pct: Option<u8>,
    /// Optional free-form progress message.
    #[serde(default)]
    pub progress_message: Option<String>,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
464
/// Success outcomes of the `update_progress` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum UpdateProgressResult {
    /// Progress updated.
    Updated,
}
470
471// ═══════════════════════════════════════════════════════════════════════
472// Phase 2 contracts: fail, reclaim, expire
473// ═══════════════════════════════════════════════════════════════════════
474
475// ─── fail_execution ───
476
/// Typed inputs to the `fail_execution` Valkey Function.
///
/// Note that, unlike most sibling args types, this struct has no `now`
/// field.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FailExecutionArgs {
    /// Execution being failed.
    pub execution_id: ExecutionId,
    /// RFC #58.5 — fence triple. `None` requires `source ==
    /// CancelSource::OperatorOverride`.
    #[serde(default)]
    pub fence: Option<LeaseFence>,
    pub attempt_index: AttemptIndex,
    /// Free-form failure reason.
    pub failure_reason: String,
    /// Failure category label; the accepted values are not visible here.
    pub failure_category: String,
    /// JSON-encoded retry policy (from execution policy). Empty = no retries.
    #[serde(default)]
    pub retry_policy_json: String,
    /// JSON-encoded attempt policy for the next retry attempt.
    #[serde(default)]
    pub next_attempt_policy_json: String,
    /// Falls back to `CancelSource::default()` when omitted.
    #[serde(default)]
    pub source: CancelSource,
}
496
/// Outcome of a fail_execution call.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum FailExecutionResult {
    /// Retry was scheduled — execution is delayed with backoff.
    RetryScheduled {
        /// Timestamp until which the execution is delayed before the retry.
        delay_until: TimestampMs,
        /// Attempt index the retry will use.
        next_attempt_index: AttemptIndex,
    },
    /// No retries left — execution is terminal failed.
    TerminalFailed,
}
508
509// ─── issue_reclaim_grant ───
510
/// Typed inputs to the `ff_issue_reclaim_grant` Valkey Function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IssueReclaimGrantArgs {
    /// Execution the reclaim grant is issued for.
    pub execution_id: ExecutionId,
    pub worker_id: WorkerId,
    pub worker_instance_id: WorkerInstanceId,
    pub lane_id: LaneId,
    #[serde(default)]
    pub capability_hash: Option<String>,
    /// Grant time-to-live in milliseconds.
    pub grant_ttl_ms: u64,
    /// Optional routing snapshot; the field name indicates a JSON-encoded
    /// string, but the schema is not visible here.
    #[serde(default)]
    pub route_snapshot_json: Option<String>,
    #[serde(default)]
    pub admission_summary: Option<String>,
    /// Caller-side timestamp for bookkeeping. NOT passed to the Lua FCALL —
    /// ff_issue_reclaim_grant uses `redis.call("TIME")` for grant_expires_at
    /// (same as ff_issue_claim_grant). Kept for contract symmetry with
    /// IssueClaimGrantArgs and scheduler audit logging.
    pub now: TimestampMs,
}
530
/// Success outcomes of the `ff_issue_reclaim_grant` Valkey Function.
///
/// Unlike `IssueClaimGrantResult::Granted`, which carries the
/// `execution_id`, this variant carries the grant's expiry timestamp.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum IssueReclaimGrantResult {
    /// Reclaim grant issued.
    Granted { expires_at_ms: TimestampMs },
}
536
537// ─── reclaim_execution ───
538
/// Typed inputs to the `reclaim_execution` Valkey Function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReclaimExecutionArgs {
    /// Execution being reclaimed.
    pub execution_id: ExecutionId,
    pub worker_id: WorkerId,
    pub worker_instance_id: WorkerInstanceId,
    pub lane_id: LaneId,
    #[serde(default)]
    pub capability_hash: Option<String>,
    /// Caller-supplied identifier for the new lease.
    pub lease_id: LeaseId,
    /// Lease time-to-live in milliseconds.
    pub lease_ttl_ms: u64,
    /// Caller-supplied identifier for the new attempt.
    pub attempt_id: AttemptId,
    /// JSON-encoded attempt policy for the reclaim attempt.
    #[serde(default)]
    pub attempt_policy_json: String,
    /// Maximum reclaim count before terminal failure. Default: 100.
    #[serde(default = "default_max_reclaim_count")]
    pub max_reclaim_count: u32,
    /// Old worker instance (for old_worker_leases key construction).
    pub old_worker_instance_id: WorkerInstanceId,
    /// Current attempt index (for old_attempt/old_stream_meta key construction).
    pub current_attempt_index: AttemptIndex,
}
561
/// Serde fallback for `ReclaimExecutionArgs::max_reclaim_count`: 100.
fn default_max_reclaim_count() -> u32 {
    const MAX_RECLAIMS: u32 = 100;
    MAX_RECLAIMS
}
565
/// Success outcomes of the `reclaim_execution` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReclaimExecutionResult {
    /// Execution reclaimed — new attempt + new lease.
    Reclaimed {
        new_attempt_index: AttemptIndex,
        new_attempt_id: AttemptId,
        new_lease_id: LeaseId,
        new_lease_epoch: LeaseEpoch,
        /// Timestamp at which the new lease expires.
        lease_expires_at: TimestampMs,
    },
    /// Max reclaims exceeded — execution moved to terminal.
    MaxReclaimsExceeded,
}
579
580// ─── expire_execution ───
581
/// Typed inputs to the `expire_execution` Valkey Function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExpireExecutionArgs {
    /// Execution to expire.
    pub execution_id: ExecutionId,
    /// "attempt_timeout" or "execution_deadline"
    pub expire_reason: String,
}
588
/// Success outcomes of the `expire_execution` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ExpireExecutionResult {
    /// Execution expired.
    Expired { execution_id: ExecutionId },
    /// Already terminal — no-op.
    AlreadyTerminal,
}
596
597// ═══════════════════════════════════════════════════════════════════════
598// Phase 3 contracts: suspend, signal, resume, waitpoint
599// ═══════════════════════════════════════════════════════════════════════
600
601// ─── suspend_execution ───
602
/// Typed inputs to the `suspend_execution` Valkey Function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SuspendExecutionArgs {
    /// Execution to suspend.
    pub execution_id: ExecutionId,
    /// RFC #58.5 — fence triple. Required (no operator override path for
    /// suspend). `None` returns `fence_required`.
    pub fence: Option<LeaseFence>,
    pub attempt_index: AttemptIndex,
    pub suspension_id: SuspensionId,
    pub waitpoint_id: WaitpointId,
    pub waitpoint_key: String,
    /// Reason code for the suspension; accepted values are not visible here.
    pub reason_code: String,
    pub requested_by: String,
    /// Resume condition; the field name indicates a JSON-encoded string.
    pub resume_condition_json: String,
    /// Resume policy; the field name indicates a JSON-encoded string.
    pub resume_policy_json: String,
    #[serde(default)]
    pub continuation_metadata_pointer: Option<String>,
    /// Optional timestamp at which the suspension times out.
    #[serde(default)]
    pub timeout_at: Option<TimestampMs>,
    /// true to activate a pending waitpoint, false to create new.
    #[serde(default)]
    pub use_pending_waitpoint: bool,
    /// Timeout behavior: "fail", "cancel", "expire", "auto_resume", "escalate".
    /// Falls back to `default_timeout_behavior()` when omitted.
    #[serde(default = "default_timeout_behavior")]
    pub timeout_behavior: String,
}
628
/// Serde fallback for `SuspendExecutionArgs::timeout_behavior`: `"fail"`.
fn default_timeout_behavior() -> String {
    String::from("fail")
}
632
/// Success outcomes of the `suspend_execution` Valkey Function.
///
/// Both variants carry the same four fields; they differ in whether the
/// suspension actually took effect.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SuspendExecutionResult {
    /// Execution suspended, waitpoint active.
    Suspended {
        suspension_id: SuspensionId,
        waitpoint_id: WaitpointId,
        waitpoint_key: String,
        /// HMAC-SHA1 token bound to (waitpoint_id, waitpoint_key, created_at).
        /// Required by signal-delivery callers to authenticate against this
        /// waitpoint (RFC-004 §Waitpoint Security).
        waitpoint_token: WaitpointToken,
    },
    /// Buffered signals already satisfied the condition — suspension skipped.
    /// Lease is still held. Token comes from the pending waitpoint record.
    AlreadySatisfied {
        suspension_id: SuspensionId,
        waitpoint_id: WaitpointId,
        waitpoint_key: String,
        waitpoint_token: WaitpointToken,
    },
}
654
655// ─── resume_execution ───
656
/// Typed inputs to the `resume_execution` Valkey Function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResumeExecutionArgs {
    /// Execution to resume.
    pub execution_id: ExecutionId,
    /// "signal", "operator", "auto_resume"
    /// Falls back to `default_trigger_type()` when omitted.
    #[serde(default = "default_trigger_type")]
    pub trigger_type: String,
    /// Optional delay before becoming eligible (ms). `0` when omitted.
    #[serde(default)]
    pub resume_delay_ms: u64,
}
667
/// Serde fallback for `ResumeExecutionArgs::trigger_type`: `"signal"`.
fn default_trigger_type() -> String {
    String::from("signal")
}
671
/// Success outcomes of the `resume_execution` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResumeExecutionResult {
    /// Execution resumed to runnable.
    Resumed { public_state: PublicState },
}
677
678// ─── create_pending_waitpoint ───
679
/// Typed inputs to the `create_pending_waitpoint` Valkey Function.
///
/// The lease identity is passed as separate required fields rather than
/// as an `Option<LeaseFence>`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreatePendingWaitpointArgs {
    /// Execution the pending waitpoint belongs to.
    pub execution_id: ExecutionId,
    pub lease_id: LeaseId,
    pub lease_epoch: LeaseEpoch,
    pub attempt_index: AttemptIndex,
    pub attempt_id: AttemptId,
    pub waitpoint_id: WaitpointId,
    pub waitpoint_key: String,
    /// Short expiry for the pending waitpoint (ms).
    pub expires_in_ms: u64,
}
692
/// Success outcomes of the `create_pending_waitpoint` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreatePendingWaitpointResult {
    /// Pending waitpoint created.
    Created {
        waitpoint_id: WaitpointId,
        waitpoint_key: String,
        /// HMAC-SHA1 token bound to the pending waitpoint. Required for
        /// `buffer_signal_for_pending_waitpoint` and carried forward when
        /// the waitpoint is activated by `suspend_execution`.
        waitpoint_token: WaitpointToken,
    },
}
705
706// ─── close_waitpoint ───
707
/// Typed inputs to the `close_waitpoint` Valkey Function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CloseWaitpointArgs {
    /// Waitpoint to close.
    pub waitpoint_id: WaitpointId,
    /// Free-form reason for closing.
    pub reason: String,
}
713
/// Success outcomes of the `close_waitpoint` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CloseWaitpointResult {
    /// Waitpoint closed.
    Closed,
}
719
720// ─── deliver_signal ───
721
/// Typed inputs to the `ff_deliver_signal` Valkey Function.
///
/// Fields marked `#[serde(default)]` are `None` when omitted from the
/// serialized form; any default mentioned in a field doc is not applied
/// by this type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DeliverSignalArgs {
    /// Execution the signal targets.
    pub execution_id: ExecutionId,
    /// Waitpoint the signal is delivered against.
    pub waitpoint_id: WaitpointId,
    pub signal_id: SignalId,
    pub signal_name: String,
    pub signal_category: String,
    pub source_type: String,
    pub source_identity: String,
    /// Optional signal payload bytes.
    #[serde(default)]
    pub payload: Option<Vec<u8>>,
    /// Encoding label for `payload`, if any.
    #[serde(default)]
    pub payload_encoding: Option<String>,
    #[serde(default)]
    pub correlation_id: Option<String>,
    /// Optional idempotency key; see `dedup_ttl_ms` and
    /// `DeliverSignalResult::Duplicate`.
    #[serde(default)]
    pub idempotency_key: Option<String>,
    pub target_scope: String,
    #[serde(default)]
    pub created_at: Option<TimestampMs>,
    /// Dedup TTL for idempotency key (ms).
    #[serde(default)]
    pub dedup_ttl_ms: Option<u64>,
    /// Resume delay after signal satisfaction (ms).
    #[serde(default)]
    pub resume_delay_ms: Option<u64>,
    /// Max signals per execution (default 10000).
    /// NOTE(review): the 10000 default is not set in this file —
    /// presumably applied by the caller or the Lua side; confirm.
    #[serde(default)]
    pub max_signals_per_execution: Option<u64>,
    /// MAXLEN for the waitpoint signal stream.
    #[serde(default)]
    pub signal_maxlen: Option<u64>,
    /// HMAC-SHA1 token issued when the waitpoint was created. Required for
    /// signal delivery; missing/tampered/rotated-past-grace tokens are
    /// rejected with `invalid_token` or `token_expired` (RFC-004).
    ///
    /// Defense-in-depth: `WaitpointToken` is a transparent string newtype,
    /// so an empty string deserializes successfully from JSON. The
    /// validation boundary is in Lua (`validate_waitpoint_token` returns
    /// `missing_token` on empty input); this type intentionally does NOT
    /// pre-reject at the Rust layer so callers get a consistent typed
    /// error regardless of how they constructed the args.
    pub waitpoint_token: WaitpointToken,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
767
/// Success outcomes of the `ff_deliver_signal` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum DeliverSignalResult {
    /// Signal accepted with the given effect. `effect` is a free-form
    /// string; its possible values are not visible here.
    Accepted { signal_id: SignalId, effect: String },
    /// Duplicate signal (idempotency key matched).
    Duplicate { existing_signal_id: SignalId },
}
775
776// ─── buffer_signal_for_pending_waitpoint ───
777
/// Typed inputs to the `ff_buffer_signal_for_pending_waitpoint` Valkey
/// Function.
///
/// Carries a subset of the `DeliverSignalArgs` fields.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BufferSignalArgs {
    /// Execution the signal targets.
    pub execution_id: ExecutionId,
    /// Pending waitpoint the signal is buffered for.
    pub waitpoint_id: WaitpointId,
    pub signal_id: SignalId,
    pub signal_name: String,
    pub signal_category: String,
    pub source_type: String,
    pub source_identity: String,
    /// Optional signal payload bytes.
    #[serde(default)]
    pub payload: Option<Vec<u8>>,
    /// Encoding label for `payload`, if any.
    #[serde(default)]
    pub payload_encoding: Option<String>,
    #[serde(default)]
    pub idempotency_key: Option<String>,
    pub target_scope: String,
    /// HMAC-SHA1 token issued when `create_pending_waitpoint` ran. Required
    /// to authenticate early signals targeting the pending waitpoint.
    pub waitpoint_token: WaitpointToken,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
799
/// Success outcomes of the `ff_buffer_signal_for_pending_waitpoint`
/// Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum BufferSignalResult {
    /// Signal buffered for pending waitpoint.
    Buffered { signal_id: SignalId },
    /// Duplicate signal; `existing_signal_id` identifies the signal that
    /// was already recorded.
    Duplicate { existing_signal_id: SignalId },
}
807
808// ─── list_pending_waitpoints ───
809
/// One entry in the read-only view of an execution's active waitpoints.
///
/// Returned by `Server::list_pending_waitpoints` (and the
/// `GET /v1/executions/{id}/pending-waitpoints` REST endpoint). The
/// `waitpoint_token` is the same HMAC-SHA1 credential a suspending worker
/// receives in `SuspendOutcome::Suspended` — a reviewer that needs to
/// deliver a signal against this waitpoint must present it in
/// `DeliverSignalArgs::waitpoint_token`.
///
/// Exposing the token here is a deliberate API gap closure: a
/// human-in-the-loop reviewer has no other path to the token, since only
/// the suspending worker sees the `SuspendOutcome`. Access is gated by
/// the same bearer-auth middleware as every other REST endpoint.
///
/// Serialization note: `activated_at` and `expires_at` are left out of
/// the output entirely when `None` (`skip_serializing_if`), rather than
/// being emitted as `null`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PendingWaitpointInfo {
    pub waitpoint_id: WaitpointId,
    pub waitpoint_key: String,
    /// Current waitpoint state: `pending`, `active`, `closed`. Callers
    /// typically filter to `pending` or `active`.
    pub state: String,
    /// HMAC-SHA1 token minted at create time; required by
    /// `ff_deliver_signal` and `ff_buffer_signal_for_pending_waitpoint`.
    pub waitpoint_token: WaitpointToken,
    /// Signal names the resume condition is waiting for. Reviewers that
    /// need to drive a specific waitpoint — particularly when multiple
    /// concurrent waitpoints exist on one execution — filter on this to
    /// pick the right target.
    ///
    /// An EMPTY vec means the condition matches any signal (wildcard, per
    /// `lua/helpers.lua` `initialize_condition`). Callers must not infer
    /// "no waitpoint" from empty; check `state` / length of the outer
    /// list for that.
    #[serde(default)]
    pub required_signal_names: Vec<String>,
    /// Timestamp when the waitpoint record was first written.
    pub created_at: TimestampMs,
    /// Timestamp when the waitpoint was activated (suspension landed).
    /// `None` while the waitpoint is still `pending`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub activated_at: Option<TimestampMs>,
    /// Scheduled expiration timestamp. `None` if no timeout configured.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub expires_at: Option<TimestampMs>,
}
854
855// ─── expire_suspension ───
856
/// Typed inputs to the `expire_suspension` Valkey Function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExpireSuspensionArgs {
    /// Execution whose suspension should be expired.
    pub execution_id: ExecutionId,
}
861
/// Success outcomes of the `expire_suspension` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ExpireSuspensionResult {
    /// Suspension expired with the given behavior applied.
    /// NOTE(review): `behavior_applied` presumably echoes one of the
    /// `SuspendExecutionArgs::timeout_behavior` values — confirm.
    Expired { behavior_applied: String },
    /// Already resolved — no action needed.
    AlreadySatisfied { reason: String },
}
869
870// ─── claim_resumed_execution ───
871
/// Typed inputs to the `ff_claim_resumed_execution` Valkey Function.
///
/// There is no `attempt_id` field here, in contrast to
/// `ClaimExecutionArgs`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ClaimResumedExecutionArgs {
    /// Execution being re-claimed after resumption.
    pub execution_id: ExecutionId,
    pub worker_id: WorkerId,
    pub worker_instance_id: WorkerInstanceId,
    pub lane_id: LaneId,
    /// Caller-supplied identifier for the lease.
    pub lease_id: LeaseId,
    /// Lease time-to-live in milliseconds.
    pub lease_ttl_ms: u64,
    /// Current attempt index (for KEYS construction — from exec_core).
    pub current_attempt_index: AttemptIndex,
    /// Remaining attempt timeout from before suspension (ms). 0 = no timeout.
    /// NOTE(review): the field is an `Option`; `None` (the value when
    /// omitted) is presumably treated like `0` — confirm.
    #[serde(default)]
    pub remaining_attempt_timeout_ms: Option<u64>,
    /// Caller-supplied current timestamp.
    pub now: TimestampMs,
}
887
/// Payload of a successful resumed claim: the execution plus the lease
/// and attempt identifiers it continues under.
///
/// Has the same fields as `ClaimedExecution` except that it carries no
/// `attempt_type`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ClaimedResumedExecution {
    pub execution_id: ExecutionId,
    pub lease_id: LeaseId,
    pub lease_epoch: LeaseEpoch,
    pub attempt_index: AttemptIndex,
    pub attempt_id: AttemptId,
    /// Timestamp at which the lease expires.
    pub lease_expires_at: TimestampMs,
}
897
/// Success outcomes of the `ff_claim_resumed_execution` Valkey Function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ClaimResumedExecutionResult {
    /// Successfully claimed resumed execution (same attempt continues).
    Claimed(ClaimedResumedExecution),
}
903
904// ═══════════════════════════════════════════════════════════════════════
905// Phase 4 contracts: stream
906// ═══════════════════════════════════════════════════════════════════════
907
908// ─── append_frame ───
909
/// Typed inputs for the `append_frame` function.
///
/// Every `Option` field is `#[serde(default)]`, so it may be omitted on
/// the wire and then deserialises to `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AppendFrameArgs {
    pub execution_id: ExecutionId,
    pub attempt_index: AttemptIndex,
    pub lease_id: LeaseId,
    pub lease_epoch: LeaseEpoch,
    pub attempt_id: AttemptId,
    pub frame_type: String,
    pub timestamp: TimestampMs,
    /// Raw frame payload bytes.
    pub payload: Vec<u8>,
    #[serde(default)]
    pub encoding: Option<String>,
    /// Optional structured metadata for the frame (JSON blob).
    #[serde(default)]
    pub metadata_json: Option<String>,
    #[serde(default)]
    pub correlation_id: Option<String>,
    #[serde(default)]
    pub source: Option<String>,
    /// MAXLEN for the stream. 0 = no trim.
    ///
    /// NOTE(review): how `None` is interpreted is not visible in this
    /// struct — confirm against the consumer.
    #[serde(default)]
    pub retention_maxlen: Option<u32>,
    /// Max payload bytes per frame. Default: 65536.
    ///
    /// NOTE(review): the serde default here is `None`; the 65536 default
    /// is presumably applied by the consumer — confirm.
    #[serde(default)]
    pub max_payload_bytes: Option<u32>,
}
936
/// Possible outcomes of the `append_frame` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AppendFrameResult {
    /// Frame appended successfully.
    Appended {
        /// Valkey Stream entry ID (e.g. "1713100800150-0").
        entry_id: String,
        /// Total frame count after this append.
        frame_count: u64,
    },
}
947
948// ─── StreamCursor (issue #92) ───
949
/// Opaque cursor for attempt-stream reads/tails.
///
/// Replaces the bare `&str` / `String` stream-id parameters previously
/// carried on `read_stream` / `tail_stream` / `ReadStreamParams` /
/// `TailStreamParams`. The wire form is a flat string: the hand-written
/// `Serialize` impl emits the `Display` form and the `Deserialize` impl
/// parses through `TryFrom<String>` — so `?from=start&to=end` and
/// `?after=123-0` continue to work for REST clients.
///
/// # Public wire grammar
///
/// The ONLY accepted tokens are:
///
/// * `"start"` — first entry in the stream (XRANGE `-` equivalent).
///   Valid in `read_stream` / `ReadStreamParams`.
/// * `"end"` — latest entry in the stream (XRANGE `+` equivalent).
///   Valid in `read_stream` / `ReadStreamParams`.
/// * `"<ms>"` or `"<ms>-<seq>"` — a concrete Valkey Stream entry id.
///   Valid everywhere.
///
/// The bare XRANGE/XREAD markers `"-"` and `"+"` are **NOT** accepted
/// on the wire. The opaque `StreamCursor` grammar is the public
/// contract; the Valkey `-`/`+` markers are an internal implementation
/// detail carried only inside the Lua-adjacent [`ReadFramesArgs`] /
/// `xread_block` path via [`StreamCursor::to_wire`].
///
/// For XREAD (tail), the documented "from the beginning" convention is
/// `StreamCursor::At("0-0".into())` — use the convenience constructor
/// [`StreamCursor::from_beginning`] which returns exactly that value.
/// `Start` / `End` are rejected by the SDK's `tail_stream` boundary
/// because XREAD does not accept `-` / `+` as cursors. The
/// [`StreamCursor::is_concrete`] helper centralises this
/// Start/End-vs-At decision for boundary-validation call sites.
///
/// # Why an enum instead of a string
///
/// A string parameter lets malformed ids escape to the Lua/Valkey
/// layer, surfacing as a script error and HTTP 500. An enum with
/// fallible `FromStr` / `TryFrom<String>` catches malformed input at
/// the wire boundary with a structured error, and prevents bare `-`
/// / `+` from leaking into consumer code as tacit extensions of the
/// public API.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum StreamCursor {
    /// First entry in the stream (XRANGE start marker).
    Start,
    /// Latest entry in the stream (XRANGE end marker).
    End,
    /// A concrete Valkey Stream entry id (`<ms>` or `<ms>-<seq>`).
    ///
    /// For XREAD-style tails, the documented "from the beginning"
    /// convention is `At("0-0".to_owned())` — see
    /// [`StreamCursor::from_beginning`].
    At(String),
}
1004
1005impl StreamCursor {
1006 /// Convenience constructor for the XREAD-from-beginning convention
1007 /// (`"0-0"`). XREAD's `last_id` is exclusive, so passing this as
1008 /// the `after` cursor returns every entry in the stream.
1009 pub fn from_beginning() -> Self {
1010 Self::At("0-0".to_owned())
1011 }
1012
1013 /// Serde default helper — emits `StreamCursor::Start`. Used as
1014 /// `#[serde(default = "StreamCursor::start")]` on REST query
1015 /// structs.
1016 pub fn start() -> Self {
1017 Self::Start
1018 }
1019
1020 /// Serde default helper — emits `StreamCursor::End`.
1021 pub fn end() -> Self {
1022 Self::End
1023 }
1024
1025 /// Serde default helper — emits
1026 /// `StreamCursor::from_beginning()`. Used as the default for
1027 /// `TailStreamParams::after`.
1028 pub fn beginning() -> Self {
1029 Self::from_beginning()
1030 }
1031
1032 /// Internal-only: lower the cursor to the XRANGE/XREAD marker
1033 /// string Valkey expects. `Start → "-"`, `End → "+"`,
1034 /// `At(s) → s`.
1035 ///
1036 /// Used at the ff-script adapter edge (right before constructing
1037 /// `ReadFramesArgs` or calling `xread_block`) to translate the
1038 /// opaque wire grammar into the Lua-ABI form. NOT part of the
1039 /// public wire — do not emit these raw characters to consumers.
1040 /// Hidden from the generated docs to discourage external use;
1041 /// external consumers should never need to see the raw `-` / `+`.
1042 #[doc(hidden)]
1043 pub fn to_wire(&self) -> &str {
1044 match self {
1045 Self::Start => "-",
1046 Self::End => "+",
1047 Self::At(s) => s.as_str(),
1048 }
1049 }
1050
1051 /// Internal-only owned variant of [`Self::to_wire`] — moves the
1052 /// inner `String` out of `At(s)` without cloning. Use at adapter
1053 /// edges that construct an owned wire string (e.g.
1054 /// `ReadFramesArgs.from_id`) from a `StreamCursor` that is about
1055 /// to be dropped.
1056 #[doc(hidden)]
1057 pub fn into_wire_string(self) -> String {
1058 match self {
1059 Self::Start => "-".to_owned(),
1060 Self::End => "+".to_owned(),
1061 Self::At(s) => s,
1062 }
1063 }
1064
1065 /// True iff this cursor is a concrete entry id
1066 /// (`"<ms>"` / `"<ms>-<seq>"`). False for the open markers
1067 /// `Start` / `End`.
1068 ///
1069 /// Used by boundaries like XREAD (tailing) that do not accept
1070 /// open markers — rejecting a cursor is equivalent to
1071 /// `!cursor.is_concrete()`. Centralised here to keep the SDK and
1072 /// REST guards in lock-step.
1073 pub fn is_concrete(&self) -> bool {
1074 matches!(self, Self::At(_))
1075 }
1076}
1077
1078impl std::fmt::Display for StreamCursor {
1079 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1080 match self {
1081 Self::Start => f.write_str("start"),
1082 Self::End => f.write_str("end"),
1083 Self::At(s) => f.write_str(s),
1084 }
1085 }
1086}
1087
/// Reasons a string can fail to parse as a [`StreamCursor`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StreamCursorParseError {
    /// The input was the empty string.
    Empty,
    /// The input was one of the raw Valkey markers (`"-"`, `"+"`).
    /// Those are internal-only; the public wire spells them
    /// `"start"` / `"end"`. Carries the offending input.
    BareMarkerRejected(String),
    /// The input was neither a recognised keyword nor a well-formed
    /// Stream entry id (entry ids must match `^\d+(?:-\d+)?$`).
    /// Carries the offending input.
    Malformed(String),
}

/// Human-readable message for each parse failure; the offending input
/// is quoted where the variant carries one.
impl std::fmt::Display for StreamCursorParseError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Malformed(input) => write!(
                f,
                "invalid stream cursor '{input}' (expected 'start', 'end', '<ms>', or '<ms>-<seq>')"
            ),
            Self::BareMarkerRejected(marker) => write!(
                f,
                "bare marker '{marker}' is not a valid stream cursor; use 'start' or 'end'"
            ),
            Self::Empty => f.write_str("stream cursor must not be empty"),
        }
    }
}

impl std::error::Error for StreamCursorParseError {}
1119
/// Shared grammar check — classifies `s` as `Start` / `End` / a
/// concrete-id shape / bare marker / malformed / empty, WITHOUT
/// allocating. The owned vs borrowed entry points
/// ([`StreamCursor::from_str`], [`StreamCursor::try_from`]) consume this
/// classification and move the owned `String` into `At` when
/// applicable, avoiding a round-trip `String → &str → String::to_owned`
/// for the common REST-query path.
enum StreamCursorClass {
    /// The keyword `"start"`.
    Start,
    /// The keyword `"end"`.
    End,
    /// A well-formed `<ms>` or `<ms>-<seq>` entry id.
    Concrete,
    /// A raw `"-"` or `"+"` marker (rejected on the public wire).
    BareMarker,
    /// The empty string.
    Empty,
    /// Anything else.
    Malformed,
}

/// Classifies `s` against the public stream-cursor grammar.
///
/// A concrete id is `<ms>` or `<ms>-<seq>` where each component is a
/// non-empty run of ASCII digits **whose value fits in a `u64`**. The
/// range check matters: Stream entry ids are pairs of 64-bit integers,
/// so a digit run such as `"18446744073709551616"` has the right shape
/// but is not an id the stream store can parse. Classifying it as
/// `Malformed` here keeps that failure at the wire boundary instead of
/// letting it surface later as a script error.
fn classify_stream_cursor(s: &str) -> StreamCursorClass {
    /// True iff `part` is a non-empty ASCII-digit run that fits in `u64`.
    ///
    /// The digit test runs first because `u64::from_str` alone would
    /// also accept a leading `+`; it also rejects any non-ASCII input.
    /// `parse` does not allocate.
    fn is_id_component(part: &str) -> bool {
        !part.is_empty()
            && part.bytes().all(|b| b.is_ascii_digit())
            && part.parse::<u64>().is_ok()
    }

    // Keywords and rejected markers are exact, case-sensitive matches.
    match s {
        "" => return StreamCursorClass::Empty,
        "-" | "+" => return StreamCursorClass::BareMarker,
        "start" => return StreamCursorClass::Start,
        "end" => return StreamCursorClass::End,
        _ => {}
    }

    // Split on the FIRST '-' only: a second '-' ends up inside the
    // sequence part and fails the digit test ("1-2-3" is malformed).
    let (ms_part, seq_part) = match s.split_once('-') {
        Some((ms, seq)) => (ms, Some(seq)),
        None => (s, None),
    };

    // The sequence part is optional; when present it must be valid too.
    if is_id_component(ms_part) && seq_part.map_or(true, is_id_component) {
        StreamCursorClass::Concrete
    } else {
        StreamCursorClass::Malformed
    }
}
1166
1167impl std::str::FromStr for StreamCursor {
1168 type Err = StreamCursorParseError;
1169
1170 fn from_str(s: &str) -> Result<Self, Self::Err> {
1171 match classify_stream_cursor(s) {
1172 StreamCursorClass::Start => Ok(Self::Start),
1173 StreamCursorClass::End => Ok(Self::End),
1174 StreamCursorClass::Concrete => Ok(Self::At(s.to_owned())),
1175 StreamCursorClass::BareMarker => {
1176 Err(StreamCursorParseError::BareMarkerRejected(s.to_owned()))
1177 }
1178 StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
1179 StreamCursorClass::Malformed => Err(StreamCursorParseError::Malformed(s.to_owned())),
1180 }
1181 }
1182}
1183
1184impl TryFrom<String> for StreamCursor {
1185 type Error = StreamCursorParseError;
1186
1187 fn try_from(s: String) -> Result<Self, Self::Error> {
1188 // Owned parsing path — the `At` variant moves `s` in directly,
1189 // avoiding the `&str → String::to_owned` re-allocation that a
1190 // blind forward to `FromStr::from_str(&s)` would force. Error
1191 // paths still pay one allocation to describe the offending
1192 // input.
1193 match classify_stream_cursor(&s) {
1194 StreamCursorClass::Start => Ok(Self::Start),
1195 StreamCursorClass::End => Ok(Self::End),
1196 StreamCursorClass::Concrete => Ok(Self::At(s)),
1197 StreamCursorClass::BareMarker => Err(StreamCursorParseError::BareMarkerRejected(s)),
1198 StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
1199 StreamCursorClass::Malformed => Err(StreamCursorParseError::Malformed(s)),
1200 }
1201 }
1202}
1203
1204impl From<StreamCursor> for String {
1205 fn from(c: StreamCursor) -> Self {
1206 c.to_string()
1207 }
1208}
1209
/// Serialises the cursor as a flat string by handing its `Display`
/// form to the serializer (`"start"`, `"end"`, or the entry id).
impl Serialize for StreamCursor {
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        serializer.collect_str(self)
    }
}
1215
1216impl<'de> Deserialize<'de> for StreamCursor {
1217 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
1218 let s = String::deserialize(deserializer)?;
1219 Self::try_from(s).map_err(serde::de::Error::custom)
1220 }
1221}
1222
1223// ─── read_attempt_stream / tail_attempt_stream ───
1224
/// Hard cap on the number of frames returned by a single read/tail call
/// (10,000).
///
/// Single source of truth across the Rust layer (ff-script, ff-server,
/// ff-sdk). The Lua side in `lua/stream.lua` keeps a matching literal with
/// an inline reference back here; bump both together if you ever need to
/// lift the cap.
pub const STREAM_READ_HARD_CAP: u64 = 10_000;
1232
1233/// A single frame read from an attempt-scoped stream.
1234///
1235/// Field set mirrors what `ff_append_frame` writes: `frame_type`, `ts`,
1236/// `payload`, `encoding`, `source`, and optionally `correlation_id`. Stored
1237/// as an ordered map so field order is deterministic across read calls.
1238#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1239pub struct StreamFrame {
1240 /// Valkey Stream entry ID, e.g. "1713100800150-0".
1241 pub id: String,
1242 /// Frame fields in sorted order.
1243 pub fields: std::collections::BTreeMap<String, String>,
1244}
1245
/// Inputs to `ff_read_attempt_stream` (XRANGE wrapper).
///
/// `from_id` / `to_id` are raw XRANGE ids, i.e. the lowered form that
/// [`StreamCursor::to_wire`] / [`StreamCursor::into_wire_string`]
/// produce, not the public `"start"` / `"end"` tokens.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReadFramesArgs {
    pub execution_id: ExecutionId,
    pub attempt_index: AttemptIndex,
    /// XRANGE start ID. Use "-" for earliest.
    pub from_id: String,
    /// XRANGE end ID. Use "+" for latest.
    pub to_id: String,
    /// XRANGE COUNT limit. MUST be `>= 1`. The REST and SDK layers reject
    /// `0` at the boundary; the Lua side rejects it too. `STREAM_READ_HARD_CAP`
    /// is the upper bound.
    pub count_limit: u64,
}
1260
/// Result of reading frames from an attempt stream — frames plus terminal
/// signal so consumers can stop polling without a timeout fallback.
///
/// Both terminal markers are omitted from the serialised form when
/// `None`, and default to `None` when absent on input.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamFrames {
    /// Entries in the requested range (possibly empty).
    pub frames: Vec<StreamFrame>,
    /// Timestamp when the upstream writer closed the stream. `None` if the
    /// stream is still open (or has never been written).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub closed_at: Option<TimestampMs>,
    /// Reason from the closing writer. Current values:
    /// `attempt_success`, `attempt_failure`, `attempt_cancelled`,
    /// `attempt_interrupted`. `None` iff the stream is still open.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub closed_reason: Option<String>,
}
1277
1278impl StreamFrames {
1279 /// Construct an empty open-stream result (no frames, no terminal
1280 /// markers). Useful for fast-path peek helpers.
1281 pub fn empty_open() -> Self {
1282 Self {
1283 frames: Vec::new(),
1284 closed_at: None,
1285 closed_reason: None,
1286 }
1287 }
1288
1289 /// True iff the producer has closed this stream. Consumers should stop
1290 /// polling and drain once this returns true.
1291 pub fn is_closed(&self) -> bool {
1292 self.closed_at.is_some()
1293 }
1294}
1295
/// Possible outcomes of an attempt-stream read.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReadFramesResult {
    /// Frames returned (possibly empty) plus optional closed markers.
    Frames(StreamFrames),
}
1301
1302// ═══════════════════════════════════════════════════════════════════════
1303// Phase 5 contracts: budget, quota, block/unblock
1304// ═══════════════════════════════════════════════════════════════════════
1305
1306// ─── create_budget ───
1307
/// Typed inputs for the `create_budget` function.
///
/// `dimensions`, `hard_limits` and `soft_limits` are parallel vectors:
/// the entry at index `i` of each limit vector belongs to
/// `dimensions[i]`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateBudgetArgs {
    pub budget_id: crate::types::BudgetId,
    pub scope_type: String,
    pub scope_id: String,
    pub enforcement_mode: String,
    pub on_hard_limit: String,
    pub on_soft_limit: String,
    /// Reset interval, in milliseconds.
    pub reset_interval_ms: u64,
    /// Dimension names.
    pub dimensions: Vec<String>,
    /// Hard limits per dimension (parallel with dimensions).
    pub hard_limits: Vec<u64>,
    /// Soft limits per dimension (parallel with dimensions).
    pub soft_limits: Vec<u64>,
    pub now: TimestampMs,
}
1325
/// Possible outcomes of the `create_budget` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateBudgetResult {
    /// Budget created.
    Created { budget_id: crate::types::BudgetId },
    /// Already exists (idempotent).
    AlreadySatisfied { budget_id: crate::types::BudgetId },
}
1333
1334// ─── create_quota_policy ───
1335
/// Typed inputs for the `create_quota_policy` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateQuotaPolicyArgs {
    pub quota_policy_id: crate::types::QuotaPolicyId,
    /// Length of the rate window, in seconds.
    pub window_seconds: u64,
    pub max_requests_per_window: u64,
    pub max_concurrent: u64,
    pub now: TimestampMs,
}
1344
/// Possible outcomes of the `create_quota_policy` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateQuotaPolicyResult {
    /// Quota policy created.
    Created {
        quota_policy_id: crate::types::QuotaPolicyId,
    },
    /// Already exists (idempotent).
    AlreadySatisfied {
        quota_policy_id: crate::types::QuotaPolicyId,
    },
}
1356
1357// ─── budget_status (read-only) ───
1358
/// Operator-facing budget status snapshot (not an FCALL — direct HGETALL reads).
///
/// Identifiers and timestamps are carried as plain strings here rather
/// than as the typed ids / `TimestampMs` used by the FCALL contracts.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BudgetStatus {
    pub budget_id: String,
    pub scope_type: String,
    pub scope_id: String,
    pub enforcement_mode: String,
    /// Current usage per dimension: {dimension_name: current_value}.
    pub usage: HashMap<String, u64>,
    /// Hard limits per dimension: {dimension_name: limit}.
    pub hard_limits: HashMap<String, u64>,
    /// Soft limits per dimension: {dimension_name: limit}.
    pub soft_limits: HashMap<String, u64>,
    pub breach_count: u64,
    pub soft_breach_count: u64,
    pub last_breach_at: Option<String>,
    pub last_breach_dim: Option<String>,
    pub next_reset_at: Option<String>,
    pub created_at: Option<String>,
}
1379
1380// ─── report_usage_and_check ───
1381
/// Typed inputs for the `report_usage_and_check` function.
///
/// `dimensions` and `deltas` are parallel vectors: `deltas[i]` is the
/// increment for `dimensions[i]`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReportUsageArgs {
    /// Dimension names to increment.
    pub dimensions: Vec<String>,
    /// Increment values (parallel with dimensions).
    pub deltas: Vec<u64>,
    pub now: TimestampMs,
    /// Optional idempotency key to prevent double-counting on retries.
    /// Pass the raw dedup id (e.g. `"retry-42"`); the typed FCALL wrapper
    /// wraps it into `ff:usagededup:{b:M}:<id>` using the budget
    /// partition's hash tag so it co-locates with the other budget keys
    /// (#108).
    #[serde(default)]
    pub dedup_key: Option<String>,
}
1397
/// Possible outcomes of the `report_usage_and_check` function.
///
/// `#[non_exhaustive]`: matches outside this crate need a wildcard arm.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum ReportUsageResult {
    /// All increments applied, no breach.
    Ok,
    /// Soft limit breached on a dimension (advisory, increments applied).
    SoftBreach {
        dimension: String,
        current_usage: u64,
        soft_limit: u64,
    },
    /// Hard limit breached (increments NOT applied).
    HardBreach {
        dimension: String,
        current_usage: u64,
        hard_limit: u64,
    },
    /// Dedup key matched — usage already applied in a prior call.
    AlreadyApplied,
}
1418
1419// ─── reset_budget ───
1420
/// Typed inputs for the `reset_budget` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResetBudgetArgs {
    pub budget_id: crate::types::BudgetId,
    pub now: TimestampMs,
}
1426
/// Possible outcomes of the `reset_budget` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResetBudgetResult {
    /// Budget reset successfully. `next_reset_at` is the reported
    /// timestamp of the next reset.
    Reset { next_reset_at: TimestampMs },
}
1432
1433// ─── check_admission_and_record ───
1434
/// Typed inputs for the `check_admission_and_record` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CheckAdmissionArgs {
    pub execution_id: ExecutionId,
    pub now: TimestampMs,
    /// Length of the rate window, in seconds.
    pub window_seconds: u64,
    pub rate_limit: u64,
    pub concurrency_cap: u64,
    /// Optional jitter, in milliseconds; `None` when omitted on the wire.
    #[serde(default)]
    pub jitter_ms: Option<u64>,
}
1445
/// Possible outcomes of the `check_admission_and_record` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CheckAdmissionResult {
    /// Admitted — execution may proceed.
    Admitted,
    /// Already admitted in this window (idempotent).
    AlreadyAdmitted,
    /// Rate limit exceeded. `retry_after_ms` is the reported wait, in
    /// milliseconds.
    RateExceeded { retry_after_ms: u64 },
    /// Concurrency cap hit.
    ConcurrencyExceeded,
}
1457
1458// ─── release_admission ───
1459
/// Typed inputs for the `release_admission` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReleaseAdmissionArgs {
    pub execution_id: ExecutionId,
}
1464
/// Possible outcomes of the `release_admission` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReleaseAdmissionResult {
    /// The only success outcome; carries no data.
    Released,
}
1469
1470// ─── block_execution_for_admission ───
1471
/// Typed inputs for the `block_execution_for_admission` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BlockExecutionArgs {
    pub execution_id: ExecutionId,
    pub blocking_reason: String,
    /// Optional detail accompanying `blocking_reason`; `None` when
    /// omitted on the wire.
    #[serde(default)]
    pub blocking_detail: Option<String>,
    pub now: TimestampMs,
}
1480
/// Possible outcomes of the `block_execution_for_admission` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlockExecutionResult {
    /// Execution blocked.
    Blocked,
}
1486
1487// ─── unblock_execution ───
1488
/// Typed inputs for the `unblock_execution` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UnblockExecutionArgs {
    pub execution_id: ExecutionId,
    pub now: TimestampMs,
    /// Expected blocking reason (prevents stale unblock). `None` when
    /// omitted on the wire.
    #[serde(default)]
    pub expected_blocking_reason: Option<String>,
}
1497
/// Possible outcomes of the `unblock_execution` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum UnblockExecutionResult {
    /// Execution unblocked and moved to eligible.
    Unblocked,
}
1503
1504// ═══════════════════════════════════════════════════════════════════════
1505// Phase 6 contracts: flow coordination and dependencies
1506// ═══════════════════════════════════════════════════════════════════════
1507
1508// ─── create_flow ───
1509
1510#[derive(Clone, Debug, Serialize, Deserialize)]
1511pub struct CreateFlowArgs {
1512 pub flow_id: crate::types::FlowId,
1513 pub flow_kind: String,
1514 pub namespace: Namespace,
1515 pub now: TimestampMs,
1516}
1517
1518#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1519pub enum CreateFlowResult {
1520 /// Flow created successfully.
1521 Created { flow_id: crate::types::FlowId },
1522 /// Flow already exists (idempotent).
1523 AlreadySatisfied { flow_id: crate::types::FlowId },
1524}
1525
1526// ─── add_execution_to_flow ───
1527
1528#[derive(Clone, Debug, Serialize, Deserialize)]
1529pub struct AddExecutionToFlowArgs {
1530 pub flow_id: crate::types::FlowId,
1531 pub execution_id: ExecutionId,
1532 pub now: TimestampMs,
1533}
1534
/// Possible outcomes of the `add_execution_to_flow` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AddExecutionToFlowResult {
    /// Execution added to flow. `new_node_count` is the node count
    /// reported after the addition.
    Added {
        execution_id: ExecutionId,
        new_node_count: u32,
    },
    /// Already a member (idempotent). `node_count` is the reported
    /// node count.
    AlreadyMember {
        execution_id: ExecutionId,
        node_count: u32,
    },
}
1548
1549// ─── cancel_flow ───
1550
1551#[derive(Clone, Debug, Serialize, Deserialize)]
1552pub struct CancelFlowArgs {
1553 pub flow_id: crate::types::FlowId,
1554 pub reason: String,
1555 pub cancellation_policy: String,
1556 pub now: TimestampMs,
1557}
1558
/// Possible outcomes of the `cancel_flow` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CancelFlowResult {
    /// Flow cancelled and all member cancellations (if any) have completed
    /// synchronously. Used when `cancellation_policy != "cancel_all"`, when
    /// the flow has no members, when the caller opted into synchronous
    /// dispatch (e.g. `?wait=true`), or when the flow was already in a
    /// terminal state (idempotent retry).
    ///
    /// On the idempotent-retry path `member_execution_ids` may be *capped*
    /// at the server (default 1000 entries) to bound response bandwidth on
    /// flows with very large membership. The first (non-idempotent) call
    /// always returns the full list, so clients that need every member id
    /// should persist the initial response.
    Cancelled {
        cancellation_policy: String,
        member_execution_ids: Vec<String>,
    },
    /// Flow state was flipped to cancelled atomically, but member
    /// cancellations are dispatched asynchronously in the background.
    /// Clients may poll `GET /v1/executions/{id}/state` for each member
    /// execution id to track terminal state.
    CancellationScheduled {
        cancellation_policy: String,
        member_count: u32,
        member_execution_ids: Vec<String>,
    },
    /// `?wait=true` dispatch completed but one or more member cancellations
    /// failed mid-loop (e.g. ghost member, Lua error, transport fault after
    /// retries exhausted). The flow itself is still flipped to cancelled
    /// (atomic Lua already ran); callers SHOULD inspect
    /// `failed_member_execution_ids` and either retry those ids directly
    /// via `cancel_execution` or wait for the cancel-backlog reconciler
    /// to retry them in the background.
    ///
    /// Only emitted by the synchronous wait path
    /// ([`CancelFlowArgs`] via `?wait=true`). The async path returns
    /// [`CancelFlowResult::CancellationScheduled`] and delegates retries
    /// to the reconciler — there is no visible "partial" state on the
    /// async path because the dispatch result is not observed inline.
    PartiallyCancelled {
        cancellation_policy: String,
        /// All member execution ids that the cancel_flow FCALL returned
        /// (i.e. the full membership at the moment of cancellation).
        member_execution_ids: Vec<String>,
        /// Strict subset of `member_execution_ids` whose per-member cancel
        /// FCALL returned an error. Order is deterministic (matches the
        /// iteration order over `member_execution_ids`).
        failed_member_execution_ids: Vec<String>,
    },
}
1609
1610// ─── stage_dependency_edge ───
1611
1612#[derive(Clone, Debug, Serialize, Deserialize)]
1613pub struct StageDependencyEdgeArgs {
1614 pub flow_id: crate::types::FlowId,
1615 pub edge_id: crate::types::EdgeId,
1616 pub upstream_execution_id: ExecutionId,
1617 pub downstream_execution_id: ExecutionId,
1618 #[serde(default = "default_dependency_kind")]
1619 pub dependency_kind: String,
1620 #[serde(default)]
1621 pub data_passing_ref: Option<String>,
1622 pub expected_graph_revision: u64,
1623 pub now: TimestampMs,
1624}
1625
/// Serde default for the `dependency_kind` fields: `"success_only"`.
fn default_dependency_kind() -> String {
    String::from("success_only")
}
1629
1630#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1631pub enum StageDependencyEdgeResult {
1632 /// Edge staged, new graph revision.
1633 Staged {
1634 edge_id: crate::types::EdgeId,
1635 new_graph_revision: u64,
1636 },
1637}
1638
1639// ─── apply_dependency_to_child ───
1640
1641#[derive(Clone, Debug, Serialize, Deserialize)]
1642pub struct ApplyDependencyToChildArgs {
1643 pub flow_id: crate::types::FlowId,
1644 pub edge_id: crate::types::EdgeId,
1645 /// The child execution that receives the dependency.
1646 pub downstream_execution_id: ExecutionId,
1647 pub upstream_execution_id: ExecutionId,
1648 pub graph_revision: u64,
1649 #[serde(default = "default_dependency_kind")]
1650 pub dependency_kind: String,
1651 #[serde(default)]
1652 pub data_passing_ref: Option<String>,
1653 pub now: TimestampMs,
1654}
1655
/// Possible outcomes of the `apply_dependency_to_child` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ApplyDependencyToChildResult {
    /// Dependency applied, N unsatisfied deps remaining
    /// (`unsatisfied_count` is that N).
    Applied { unsatisfied_count: u32 },
    /// Already applied (idempotent).
    AlreadyApplied,
}
1663
1664// ─── resolve_dependency ───
1665
1666#[derive(Clone, Debug, Serialize, Deserialize)]
1667pub struct ResolveDependencyArgs {
1668 pub edge_id: crate::types::EdgeId,
1669 /// "success", "failed", "cancelled", "expired", "skipped"
1670 pub upstream_outcome: String,
1671 pub now: TimestampMs,
1672}
1673
/// Possible outcomes of the `resolve_dependency` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResolveDependencyResult {
    /// Edge satisfied — downstream may become eligible.
    Satisfied,
    /// Edge made impossible — downstream becomes skipped.
    Impossible,
    /// Already resolved (idempotent).
    AlreadyResolved,
}
1683
1684// ─── promote_blocked_to_eligible ───
1685
/// Typed inputs for the `promote_blocked_to_eligible` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PromoteBlockedToEligibleArgs {
    pub execution_id: ExecutionId,
    pub now: TimestampMs,
}
1691
/// Possible outcomes of the `promote_blocked_to_eligible` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromoteBlockedToEligibleResult {
    /// The only success outcome; carries no data.
    Promoted,
}
1696
1697// ─── evaluate_flow_eligibility ───
1698
/// Typed inputs for the `evaluate_flow_eligibility` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EvaluateFlowEligibilityArgs {
    pub execution_id: ExecutionId,
}
1703
/// Possible outcomes of the `evaluate_flow_eligibility` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum EvaluateFlowEligibilityResult {
    /// Execution eligibility status, carried as a free-form string.
    Status { status: String },
}
1709
1710// ─── replay_execution ───
1711
/// Typed inputs for the `replay_execution` function.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReplayExecutionArgs {
    pub execution_id: ExecutionId,
    pub now: TimestampMs,
}
1717
/// Possible outcomes of the `replay_execution` function.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplayExecutionResult {
    /// Replayed to runnable. `public_state` is the public state
    /// reported after the replay.
    Replayed { public_state: PublicState },
}
1723
1724// ─── get_execution (full read) ───
1725
/// Full execution info returned by `Server::get_execution`.
///
/// Several fields (`namespace`, `lane_id`, `created_at`, `flow_id`, the
/// optional timestamps) are plain strings here rather than the typed
/// ids / `TimestampMs` used by the FCALL contracts.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionInfo {
    pub execution_id: ExecutionId,
    pub namespace: String,
    pub lane_id: String,
    pub priority: i32,
    pub execution_kind: String,
    pub state_vector: StateVector,
    pub public_state: PublicState,
    pub created_at: String,
    /// TimestampMs (ms since epoch) when the execution's first attempt
    /// was started by a worker claim, carried as a string. Typed as
    /// `Option<String>` with `serde(default)` so pre-claim reads
    /// deserialise cleanly even if the field is absent from the wire;
    /// `None` is omitted when serialising.
    ///
    /// NOTE(review): earlier wording said "empty string until the first
    /// claim lands" — confirm whether the producer emits `None` or
    /// `Some("")` before the first claim.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at: Option<String>,
    /// TimestampMs when the execution reached a terminal
    /// `completed`/`failed`/`cancelled`/`expired` state, carried as a
    /// string. Empty / absent while still in flight; `None` is omitted
    /// when serialising.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub completed_at: Option<String>,
    pub current_attempt_index: u32,
    pub flow_id: Option<String>,
    pub blocking_detail: String,
}
1752
1753// ─── set_execution_tags / set_flow_tags (issue #58.4) ───
1754
/// Args for `ff_set_execution_tags`. Tag keys MUST match
/// `^[a-z][a-z0-9_]*\.` — the caller-namespace rule — or the FCALL
/// returns `invalid_tag_key`. Values are arbitrary strings. The map is
/// ordered (`BTreeMap`) so two callers submitting the same logical set
/// of tags produce identical ARGV.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SetExecutionTagsArgs {
    /// Execution the tags are written to.
    pub execution_id: ExecutionId,
    /// Tag key → value pairs to apply.
    pub tags: BTreeMap<String, String>,
}
1765
/// Result of `ff_set_execution_tags`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SetExecutionTagsResult {
    /// Tags written. `count` is the number of key-value pairs applied.
    Ok { count: u32 },
}
1772
/// Args for `ff_set_flow_tags`. Same namespace rule as
/// [`SetExecutionTagsArgs`]. The Lua function also lazy-migrates any
/// pre-58.4 reserved-namespace fields stashed inline on `flow_core` into
/// the new tags key.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SetFlowTagsArgs {
    /// Flow the tags are written to.
    pub flow_id: FlowId,
    /// Tag key → value pairs to apply.
    pub tags: BTreeMap<String, String>,
}
1782
/// Result of `ff_set_flow_tags`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SetFlowTagsResult {
    /// Tags written. `count` is the number of key-value pairs applied.
    Ok { count: u32 },
}
1789
1790// ─── describe_execution (issue #58.1) ───
1791
1792/// Engine-decoupled read-model for one execution.
1793///
1794/// Returned by `ff_sdk::FlowFabricWorker::describe_execution`. Consumers
1795/// consult this struct instead of reaching into Valkey's exec_core hash
1796/// directly — the engine is free to rename fields or restructure storage
1797/// under this surface.
1798///
1799/// `#[non_exhaustive]` — FF may add fields in minor releases without a
1800/// semver break. Match with `..` or use field-by-field construction.
1801#[derive(Clone, Debug, PartialEq, Eq)]
1802#[non_exhaustive]
1803pub struct ExecutionSnapshot {
1804 pub execution_id: ExecutionId,
1805 pub flow_id: Option<FlowId>,
1806 pub lane_id: LaneId,
1807 pub namespace: Namespace,
1808 pub public_state: PublicState,
1809 /// Blocking reason string (e.g. `"waiting_for_worker"`,
1810 /// `"waiting_for_delay"`, `"waiting_for_dependencies"`). `None` when
1811 /// the exec_core field is empty.
1812 pub blocking_reason: Option<String>,
1813 /// Free-form operator-readable detail explaining `blocking_reason`.
1814 /// `None` when the exec_core field is empty.
1815 pub blocking_detail: Option<String>,
1816 /// Summary of the execution's currently-active attempt. `None` when
1817 /// no attempt has been started (pre-claim) or when the exec_core
1818 /// attempt fields are all empty.
1819 pub current_attempt: Option<AttemptSummary>,
1820 /// Summary of the execution's currently-held lease. `None` when the
1821 /// execution is not held by a worker.
1822 pub current_lease: Option<LeaseSummary>,
1823 /// The waitpoint this execution is currently suspended on, if any.
1824 pub current_waitpoint: Option<WaitpointId>,
1825 pub created_at: TimestampMs,
1826 /// Timestamp of the last write that mutated exec_core. Engine-maintained.
1827 pub last_mutation_at: TimestampMs,
1828 pub total_attempt_count: u32,
1829 /// Caller-owned labels. The prefix `^[a-z][a-z0-9_]*\.` is reserved for
1830 /// consumer metadata (e.g. `cairn.task_id`); FF guarantees it will not
1831 /// write keys matching that shape. FF's own fields stay in snake_case
1832 /// without dots. Empty when no tags are set.
1833 pub tags: BTreeMap<String, String>,
1834}
1835
1836impl ExecutionSnapshot {
1837 /// Construct an [`ExecutionSnapshot`]. Present so downstream crates
1838 /// (ff-sdk's `describe_execution`) can assemble the struct despite
1839 /// the `#[non_exhaustive]` marker. Prefer adding builder-style
1840 /// helpers here over loosening `non_exhaustive`.
1841 #[allow(clippy::too_many_arguments)]
1842 pub fn new(
1843 execution_id: ExecutionId,
1844 flow_id: Option<FlowId>,
1845 lane_id: LaneId,
1846 namespace: Namespace,
1847 public_state: PublicState,
1848 blocking_reason: Option<String>,
1849 blocking_detail: Option<String>,
1850 current_attempt: Option<AttemptSummary>,
1851 current_lease: Option<LeaseSummary>,
1852 current_waitpoint: Option<WaitpointId>,
1853 created_at: TimestampMs,
1854 last_mutation_at: TimestampMs,
1855 total_attempt_count: u32,
1856 tags: BTreeMap<String, String>,
1857 ) -> Self {
1858 Self {
1859 execution_id,
1860 flow_id,
1861 lane_id,
1862 namespace,
1863 public_state,
1864 blocking_reason,
1865 blocking_detail,
1866 current_attempt,
1867 current_lease,
1868 current_waitpoint,
1869 created_at,
1870 last_mutation_at,
1871 total_attempt_count,
1872 tags,
1873 }
1874 }
1875}
1876
1877/// Currently-active attempt summary inside an [`ExecutionSnapshot`].
1878///
1879/// `#[non_exhaustive]`.
1880#[derive(Clone, Debug, PartialEq, Eq)]
1881#[non_exhaustive]
1882pub struct AttemptSummary {
1883 pub attempt_id: AttemptId,
1884 pub attempt_index: AttemptIndex,
1885}
1886
1887impl AttemptSummary {
1888 /// Construct an [`AttemptSummary`]. See [`ExecutionSnapshot::new`]
1889 /// for the rationale — `#[non_exhaustive]` blocks cross-crate
1890 /// struct-literal construction.
1891 pub fn new(attempt_id: AttemptId, attempt_index: AttemptIndex) -> Self {
1892 Self {
1893 attempt_id,
1894 attempt_index,
1895 }
1896 }
1897}
1898
1899/// Currently-held lease summary inside an [`ExecutionSnapshot`].
1900///
1901/// `#[non_exhaustive]`.
1902#[derive(Clone, Debug, PartialEq, Eq)]
1903#[non_exhaustive]
1904pub struct LeaseSummary {
1905 pub lease_epoch: LeaseEpoch,
1906 pub worker_instance_id: WorkerInstanceId,
1907 pub expires_at: TimestampMs,
1908}
1909
1910impl LeaseSummary {
1911 /// Construct a [`LeaseSummary`]. See [`ExecutionSnapshot::new`]
1912 /// for the rationale.
1913 pub fn new(
1914 lease_epoch: LeaseEpoch,
1915 worker_instance_id: WorkerInstanceId,
1916 expires_at: TimestampMs,
1917 ) -> Self {
1918 Self {
1919 lease_epoch,
1920 worker_instance_id,
1921 expires_at,
1922 }
1923 }
1924}
1925
// ─── Common sub-types (see `StateSummary`, defined after the describe_* read-models) ───
1927
1928// ─── describe_flow (issue #58.2) ───
1929
1930/// Engine-decoupled read-model for one flow.
1931///
1932/// Returned by `ff_sdk::FlowFabricWorker::describe_flow`. Consumers
1933/// consult this struct instead of reaching into Valkey's flow_core hash
1934/// directly — the engine is free to rename fields or restructure storage
1935/// under this surface.
1936///
1937/// `#[non_exhaustive]` — FF may add fields in minor releases without a
1938/// semver break. Match with `..` or use [`FlowSnapshot::new`].
1939///
1940/// # `public_flow_state`
1941///
1942/// Stored as an engine-written string literal on `flow_core`. Known
1943/// values today: `open`, `running`, `blocked`, `cancelled`, `completed`,
1944/// `failed`. Surfaced as `String` (not a typed enum) because FF does
1945/// not yet expose a `PublicFlowState` type — callers that need to act
1946/// on specific values should match on the literal. The flow_projector
1947/// writes a parallel `public_flow_state` into the flow's summary hash;
1948/// this field reflects the authoritative value on `flow_core`, which
1949/// is what mutation guards (cancel/add-member) consult.
1950///
1951/// # `tags`
1952///
1953/// Unlike [`ExecutionSnapshot::tags`] (which has a dedicated tags
1954/// hash), flow tags live inline on `flow_core`. FF's own fields are
1955/// snake_case without a `.`; any field whose name starts with
1956/// `<lowercase>.` (e.g. `cairn.task_id`) is treated as consumer-owned
1957/// metadata and routed here. An empty map means no namespaced tags
1958/// were written. The prefix convention mirrors
1959/// [`ExecutionSnapshot::tags`] — consumers should keep tag keys
1960/// namespaced (`cairn.*`, `operator.*`, etc.) so future FF field
1961/// additions don't collide.
1962#[derive(Clone, Debug, PartialEq, Eq)]
1963#[non_exhaustive]
1964pub struct FlowSnapshot {
1965 pub flow_id: FlowId,
1966 /// The `flow_kind` literal passed to `create_flow` (e.g. `dag`,
1967 /// `pipeline`). Preserved as-is; FF does not interpret it.
1968 pub flow_kind: String,
1969 pub namespace: Namespace,
1970 /// Authoritative flow state on `flow_core`. See the struct-level
1971 /// docs for the set of known values.
1972 pub public_flow_state: String,
1973 /// Monotonically increasing revision bumped on every structural
1974 /// mutation (add-member, stage-edge). Used by optimistic-concurrency
1975 /// writers via `expected_graph_revision`.
1976 pub graph_revision: u64,
1977 /// Number of member executions added so far. Never decremented.
1978 pub node_count: u32,
1979 /// Number of dependency edges staged so far. Never decremented.
1980 pub edge_count: u32,
1981 pub created_at: TimestampMs,
1982 /// Timestamp of the last write that mutated `flow_core`.
1983 /// Engine-maintained.
1984 pub last_mutation_at: TimestampMs,
1985 /// When the flow reached a terminal state via `cancel_flow`. `None`
1986 /// while the flow is live. Only written by the cancel path today;
1987 /// `completed`/`failed` terminal states do not populate this field
1988 /// (the flow_projector derives them from membership).
1989 pub cancelled_at: Option<TimestampMs>,
1990 /// Operator-supplied reason from the `cancel_flow` call. `None`
1991 /// when the flow has not been cancelled.
1992 pub cancel_reason: Option<String>,
1993 /// The `cancellation_policy` value persisted by `cancel_flow`
1994 /// (e.g. `cancel_all`, `cancel_flow_only`). `None` for flows
1995 /// cancelled before this field was persisted, or not yet cancelled.
1996 pub cancellation_policy: Option<String>,
1997 /// Consumer-owned namespaced metadata (e.g. `cairn.task_id`). See
1998 /// the struct-level docs for the routing rule.
1999 pub tags: BTreeMap<String, String>,
2000}
2001
2002impl FlowSnapshot {
2003 /// Construct a [`FlowSnapshot`]. Present so downstream crates
2004 /// (ff-sdk's `describe_flow`) can assemble the struct despite the
2005 /// `#[non_exhaustive]` marker.
2006 #[allow(clippy::too_many_arguments)]
2007 pub fn new(
2008 flow_id: FlowId,
2009 flow_kind: String,
2010 namespace: Namespace,
2011 public_flow_state: String,
2012 graph_revision: u64,
2013 node_count: u32,
2014 edge_count: u32,
2015 created_at: TimestampMs,
2016 last_mutation_at: TimestampMs,
2017 cancelled_at: Option<TimestampMs>,
2018 cancel_reason: Option<String>,
2019 cancellation_policy: Option<String>,
2020 tags: BTreeMap<String, String>,
2021 ) -> Self {
2022 Self {
2023 flow_id,
2024 flow_kind,
2025 namespace,
2026 public_flow_state,
2027 graph_revision,
2028 node_count,
2029 edge_count,
2030 created_at,
2031 last_mutation_at,
2032 cancelled_at,
2033 cancel_reason,
2034 cancellation_policy,
2035 tags,
2036 }
2037 }
2038}
2039
2040// ─── describe_edge / list_*_edges (issue #58.3) ───
2041
2042/// Engine-decoupled read-model for one dependency edge.
2043///
2044/// Returned by `ff_sdk::FlowFabricWorker::describe_edge`,
2045/// `list_incoming_edges`, and `list_outgoing_edges`. Consumers consult
2046/// this struct instead of reaching into Valkey's per-flow `edge:` hash
2047/// directly — the engine is free to rename hash fields or restructure
2048/// key layout under this surface.
2049///
2050/// `#[non_exhaustive]` — FF may add fields in minor releases without a
2051/// semver break. Match with `..` or use [`EdgeSnapshot::new`].
2052///
2053/// # Fields
2054///
2055/// The struct mirrors the immutable edge record written by
2056/// `ff_stage_dependency_edge` (see `lua/flow.lua`). The flow-scoped
2057/// edge hash is only ever written once, at staging time; per-execution
2058/// resolution state lives on a separate `dep:<edge_id>` hash and is not
2059/// surfaced here. The `edge_state` field therefore reflects the
2060/// staging-time literal (currently `pending`), not the downstream
2061/// execution's dep-edge state.
2062#[derive(Clone, Debug, PartialEq, Eq)]
2063#[non_exhaustive]
2064pub struct EdgeSnapshot {
2065 pub edge_id: EdgeId,
2066 pub flow_id: FlowId,
2067 pub upstream_execution_id: ExecutionId,
2068 pub downstream_execution_id: ExecutionId,
2069 /// The `dependency_kind` literal (e.g. `success_only`) from
2070 /// `stage_dependency_edge`. Preserved as-is; FF does not interpret
2071 /// it on reads.
2072 pub dependency_kind: String,
2073 /// The satisfaction-condition literal stamped at staging time
2074 /// (e.g. `all_required`).
2075 pub satisfaction_condition: String,
2076 /// Optional opaque handle to a data-passing artifact. `None` when
2077 /// the stored field is empty (the most common case).
2078 pub data_passing_ref: Option<String>,
2079 /// Edge-state literal on the flow-scoped edge hash. Written once
2080 /// at staging as `pending`; this hash is immutable on the flow
2081 /// side. Per-execution resolution state is tracked separately on
2082 /// the child's `dep:<edge_id>` hash.
2083 pub edge_state: String,
2084 pub created_at: TimestampMs,
2085 /// Origin of the edge (e.g. `engine`). Preserved as-is.
2086 pub created_by: String,
2087}
2088
2089/// Direction marker for [`crate::engine_backend::EngineBackend::list_edges`].
2090///
2091/// Carries the subject execution whose adjacency side the caller wants
2092/// to list — mirrors the internal `AdjacencySide + subject_eid` pair
2093/// the ff-sdk free-fn `list_edges_from_set` already uses. Keeping
2094/// direction + subject fused in one enum means the trait method has a
2095/// single `direction` parameter rather than a `(side, eid)` pair, and
2096/// the backend impl can't forget one of the two.
2097///
2098/// * `Outgoing { from_node }` — the caller wants every edge whose
2099/// `upstream_execution_id == from_node`. Corresponds to the
2100/// `out:<execution_id>` adjacency SET under the execution's flow
2101/// partition.
2102/// * `Incoming { to_node }` — the caller wants every edge whose
2103/// `downstream_execution_id == to_node`. Corresponds to the
2104/// `in:<execution_id>` adjacency SET under the execution's flow
2105/// partition.
2106#[derive(Clone, Debug, PartialEq, Eq)]
2107pub enum EdgeDirection {
2108 /// Edges leaving `from_node` — `out:` adjacency SET.
2109 Outgoing {
2110 /// The subject execution whose outgoing edges to list.
2111 from_node: ExecutionId,
2112 },
2113 /// Edges landing on `to_node` — `in:` adjacency SET.
2114 Incoming {
2115 /// The subject execution whose incoming edges to list.
2116 to_node: ExecutionId,
2117 },
2118}
2119
2120impl EdgeDirection {
2121 /// Return the subject execution id regardless of direction. Shared
2122 /// helper for backend impls that need the execution id for the
2123 /// initial `HGET exec_core.flow_id` lookup (flow routing) before
2124 /// they know which adjacency SET to read.
2125 pub fn subject(&self) -> &ExecutionId {
2126 match self {
2127 Self::Outgoing { from_node } => from_node,
2128 Self::Incoming { to_node } => to_node,
2129 }
2130 }
2131}
2132
2133impl EdgeSnapshot {
2134 /// Construct an [`EdgeSnapshot`]. Present so downstream crates
2135 /// (ff-sdk's `describe_edge` / `list_*_edges`) can assemble the
2136 /// struct despite the `#[non_exhaustive]` marker.
2137 #[allow(clippy::too_many_arguments)]
2138 pub fn new(
2139 edge_id: EdgeId,
2140 flow_id: FlowId,
2141 upstream_execution_id: ExecutionId,
2142 downstream_execution_id: ExecutionId,
2143 dependency_kind: String,
2144 satisfaction_condition: String,
2145 data_passing_ref: Option<String>,
2146 edge_state: String,
2147 created_at: TimestampMs,
2148 created_by: String,
2149 ) -> Self {
2150 Self {
2151 edge_id,
2152 flow_id,
2153 upstream_execution_id,
2154 downstream_execution_id,
2155 dependency_kind,
2156 satisfaction_condition,
2157 data_passing_ref,
2158 edge_state,
2159 created_at,
2160 created_by,
2161 }
2162 }
2163}
2164
2165/// Summary of state after a mutation, returned by many functions.
2166#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2167pub struct StateSummary {
2168 pub state_vector: StateVector,
2169 pub current_attempt_index: AttemptIndex,
2170}
2171
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::FlowId;

    /// `CreateExecutionArgs` keeps its execution id across a JSON round trip.
    #[test]
    fn create_execution_args_serde() {
        let partitions = crate::partition::PartitionConfig::default();
        let original = CreateExecutionArgs {
            execution_id: ExecutionId::for_flow(&FlowId::new(), &partitions),
            namespace: Namespace::new("test"),
            lane_id: LaneId::new("default"),
            execution_kind: "llm_call".to_owned(),
            input_payload: b"hello".to_vec(),
            payload_encoding: Some("json".to_owned()),
            priority: 0,
            creator_identity: "test-user".to_owned(),
            idempotency_key: None,
            tags: HashMap::new(),
            policy: None,
            delay_until: None,
            execution_deadline_at: None,
            partition_id: 42,
            now: TimestampMs::now(),
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: CreateExecutionArgs = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original.execution_id, decoded.execution_id);
    }

    /// A `Claimed` result is unchanged by a JSON round trip.
    #[test]
    fn claim_result_serde() {
        let partitions = crate::partition::PartitionConfig::default();
        let original = ClaimExecutionResult::Claimed(ClaimedExecution {
            execution_id: ExecutionId::for_flow(&FlowId::new(), &partitions),
            lease_id: LeaseId::new(),
            lease_epoch: LeaseEpoch::new(1),
            attempt_index: AttemptIndex::new(0),
            attempt_id: AttemptId::new(),
            attempt_type: AttemptType::Initial,
            lease_expires_at: TimestampMs::from_millis(1000),
        });
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ClaimExecutionResult = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    // ── StreamCursor (issue #92) ──

    /// `Display` renders the public tokens (`start` / `end` / the id).
    #[test]
    fn stream_cursor_display_matches_wire_tokens() {
        let cases = [
            (StreamCursor::Start, "start"),
            (StreamCursor::End, "end"),
            (StreamCursor::At("123".into()), "123"),
            (StreamCursor::At("123-4".into()), "123-4"),
        ];
        for (cursor, token) in cases {
            assert_eq!(cursor.to_string(), token);
        }
    }

    /// `to_wire` maps the symbolic cursors to `-` / `+` and passes ids through.
    #[test]
    fn stream_cursor_to_wire_maps_to_valkey_markers() {
        let cases = [
            (StreamCursor::Start, "-"),
            (StreamCursor::End, "+"),
            (StreamCursor::At("0-0".into()), "0-0"),
            (StreamCursor::At("17-3".into()), "17-3"),
        ];
        for (cursor, wire) in cases {
            assert_eq!(cursor.to_wire(), wire);
        }
    }

    /// Parsing accepts `start`, `end`, and numeric ids with or without a sequence part.
    #[test]
    fn stream_cursor_from_str_accepts_wire_tokens() {
        let cases = [
            ("start", StreamCursor::Start),
            ("end", StreamCursor::End),
            ("123", StreamCursor::At("123".into())),
            ("0-0", StreamCursor::At("0-0".into())),
            ("1713100800150-0", StreamCursor::At("1713100800150-0".into())),
        ];
        for (token, expected) in cases {
            assert_eq!(token.parse::<StreamCursor>().unwrap(), expected);
        }
    }

    /// The bare `-` / `+` markers are rejected with `BareMarkerRejected`,
    /// which carries the offending marker.
    #[test]
    fn stream_cursor_from_str_rejects_bare_markers() {
        for marker in ["-", "+"] {
            assert!(matches!(
                marker.parse::<StreamCursor>(),
                Err(StreamCursorParseError::BareMarkerRejected(s)) if s == marker
            ));
        }
    }

    /// The empty string is rejected with `Empty`.
    #[test]
    fn stream_cursor_from_str_rejects_empty() {
        assert_eq!(
            "".parse::<StreamCursor>(),
            Err(StreamCursorParseError::Empty)
        );
    }

    /// Assorted malformed inputs all map to `Malformed`.
    #[test]
    fn stream_cursor_from_str_rejects_malformed() {
        let malformed = [
            "abc", "-1", "1-", "-1-2", "1-2-3", "1.2", "1 2", "Start", "END",
        ];
        for bad in malformed {
            let parsed = bad.parse::<StreamCursor>();
            assert!(
                matches!(parsed, Err(StreamCursorParseError::Malformed(_))),
                "must reject {bad:?}",
            );
        }
    }

    /// An en dash (U+2013) in place of the ASCII hyphen is `Malformed`.
    #[test]
    fn stream_cursor_from_str_rejects_non_ascii() {
        let parsed = "1\u{2013}2".parse::<StreamCursor>();
        assert!(matches!(
            parsed,
            Err(StreamCursorParseError::Malformed(_))
        ));
    }

    /// Every cursor variant is unchanged by a JSON round trip.
    #[test]
    fn stream_cursor_serde_round_trip() {
        let cursors = [
            StreamCursor::Start,
            StreamCursor::End,
            StreamCursor::At("0-0".into()),
            StreamCursor::At("1713100800150-0".into()),
        ];
        for cursor in cursors {
            let encoded = serde_json::to_string(&cursor).unwrap();
            let decoded: StreamCursor = serde_json::from_str(&encoded).unwrap();
            assert_eq!(decoded, cursor);
        }
    }

    /// Cursors serialize to a plain JSON string, not a tagged object.
    #[test]
    fn stream_cursor_serializes_as_bare_string() {
        let cases = [
            (StreamCursor::Start, r#""start""#),
            (StreamCursor::End, r#""end""#),
            (StreamCursor::At("123-0".into()), r#""123-0""#),
        ];
        for (cursor, json) in cases {
            assert_eq!(serde_json::to_string(&cursor).unwrap(), json);
        }
    }

    /// Deserialization refuses the bare `-` / `+` markers as well.
    #[test]
    fn stream_cursor_deserialize_rejects_bare_markers() {
        for json in [r#""-""#, r#""+""#] {
            assert!(serde_json::from_str::<StreamCursor>(json).is_err());
        }
    }

    /// `from_beginning` is the concrete `0-0` id.
    #[test]
    fn stream_cursor_from_beginning_is_zero_zero() {
        assert_eq!(
            StreamCursor::from_beginning(),
            StreamCursor::At("0-0".into())
        );
    }

    /// `is_concrete` is false for `Start` / `End` and true for `At` values.
    #[test]
    fn stream_cursor_is_concrete_classifies_variants() {
        for symbolic in [StreamCursor::Start, StreamCursor::End] {
            assert!(!symbolic.is_concrete());
        }
        for concrete in [
            StreamCursor::At("0-0".into()),
            StreamCursor::At("123-0".into()),
            StreamCursor::from_beginning(),
        ] {
            assert!(concrete.is_concrete());
        }
    }

    /// `into_wire_string` consumes the cursor and yields the same text as `to_wire`.
    #[test]
    fn stream_cursor_into_wire_string_moves_without_cloning() {
        let cases = [
            (StreamCursor::Start, "-"),
            (StreamCursor::End, "+"),
            (StreamCursor::At("17-3".into()), "17-3"),
        ];
        for (cursor, wire) in cases {
            assert_eq!(cursor.into_wire_string(), wire);
        }
    }
}
2366
2367// ─── list_executions ───
2368
2369/// Summary of an execution for list views.
2370#[derive(Clone, Debug, Serialize, Deserialize)]
2371pub struct ExecutionSummary {
2372 pub execution_id: ExecutionId,
2373 pub namespace: String,
2374 pub lane_id: String,
2375 pub execution_kind: String,
2376 pub public_state: String,
2377 pub priority: i32,
2378 pub created_at: String,
2379}
2380
2381/// Result of a list_executions query.
2382#[derive(Clone, Debug, Serialize, Deserialize)]
2383pub struct ListExecutionsResult {
2384 pub executions: Vec<ExecutionSummary>,
2385 pub total_returned: usize,
2386}
2387
2388// ─── rotate_waitpoint_hmac_secret ───
2389
/// Args for `ff_rotate_waitpoint_hmac_secret`. Rotates the HMAC signing
/// kid on ONE partition. Callers fan out across every partition themselves
/// (ff-server does the parallel fan-out in `rotate_waitpoint_secret`;
/// direct-Valkey consumers mirror the pattern).
///
/// "now" is derived server-side from `redis.call("TIME")` inside the FCALL
/// (consistency with `validate_waitpoint_token` and flow scanners).
/// `grace_ms` is a duration, not a clock value, so carrying it from the
/// caller is safe.
///
/// `Debug` is implemented by hand so that `new_secret_hex` is never
/// written to logs or panic messages; every other field is printed
/// as usual.
#[derive(Clone)]
pub struct RotateWaitpointHmacSecretArgs {
    /// Kid to install as the signing kid.
    pub new_kid: String,
    /// Secret for `new_kid`. Sensitive: redacted from `Debug` output.
    pub new_secret_hex: String,
    /// Grace window in ms. Unsigned, so it cannot be negative. Tokens
    /// signed by the outgoing kid remain valid for `grace_ms` after
    /// this rotation.
    pub grace_ms: u64,
}

impl std::fmt::Debug for RotateWaitpointHmacSecretArgs {
    /// Formats like the derived impl, except the secret is replaced by
    /// a fixed placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RotateWaitpointHmacSecretArgs")
            .field("new_kid", &self.new_kid)
            // Never print key material, not even a prefix or a length.
            .field("new_secret_hex", &"<redacted>")
            .field("grace_ms", &self.grace_ms)
            .finish()
    }
}
2407
/// What a rotation did on a single partition.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RotateWaitpointHmacSecretOutcome {
    /// The new kid was installed.
    Rotated {
        /// Kid that was current before the rotation. `None` on
        /// bootstrap (no prior `current_kid`).
        previous_kid: Option<String>,
        /// Kid that is now installed.
        new_kid: String,
        /// Number of expired kids reaped during this rotation.
        gc_count: u32,
    },
    /// Exact replay — the same kid with the same secret was already
    /// installed. A safe operator retry; no state change.
    Noop {
        /// The kid that was already installed.
        kid: String,
    },
}
2423
2424// ─── list_waitpoint_hmac_kids ───
2425
/// Args for the `list_waitpoint_hmac_kids` operation. Currently carries
/// no fields.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ListWaitpointHmacKidsArgs {}
2428
2429/// Snapshot of the waitpoint HMAC keystore on ONE partition.
2430#[derive(Clone, Debug, PartialEq, Eq)]
2431pub struct WaitpointHmacKids {
2432 /// The currently-signing kid. `None` if uninitialized.
2433 pub current_kid: Option<String>,
2434 /// Kids that still validate existing tokens but no longer sign
2435 /// new ones. Order is Lua HGETALL traversal order — callers that
2436 /// need a stable sort should sort by `expires_at_ms`.
2437 pub verifying: Vec<VerifyingKid>,
2438}
2439
/// One entry of [`WaitpointHmacKids::verifying`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct VerifyingKid {
    /// The kid.
    pub kid: String,
    /// Expiry of this kid, in milliseconds.
    // NOTE(review): presumably an epoch-ms timestamp after which the
    // kid stops validating tokens — confirm against the Lua side.
    pub expires_at_ms: i64,
}