ff_core/contracts/mod.rs
1//! Phase 1 function contracts — Args + Result types for each FCALL.
2//!
3//! Each Args struct defines the typed inputs to a Valkey Function.
4//! Each Result enum defines the possible outcomes (success variants + error codes).
5
6pub mod decode;
7
8use crate::policy::ExecutionPolicy;
9use crate::state::{AttemptType, PublicState, StateVector};
10use crate::types::{
11 AttemptId, AttemptIndex, CancelSource, EdgeId, ExecutionId, FlowId, LaneId, LeaseEpoch,
12 LeaseFence, LeaseId, Namespace, SignalId, SuspensionId, TimestampMs, WaitpointId,
13 WaitpointToken, WorkerId, WorkerInstanceId,
14};
15use serde::{Deserialize, Serialize};
16use std::collections::{BTreeMap, BTreeSet, HashMap};
17
18// ─── create_execution ───
19
20#[derive(Clone, Debug, Serialize, Deserialize)]
21pub struct CreateExecutionArgs {
22 pub execution_id: ExecutionId,
23 pub namespace: Namespace,
24 pub lane_id: LaneId,
25 pub execution_kind: String,
26 pub input_payload: Vec<u8>,
27 #[serde(default)]
28 pub payload_encoding: Option<String>,
29 pub priority: i32,
30 pub creator_identity: String,
31 #[serde(default)]
32 pub idempotency_key: Option<String>,
33 #[serde(default)]
34 pub tags: HashMap<String, String>,
35 /// Execution policy (retry, timeout, suspension, routing, etc.).
36 #[serde(default)]
37 pub policy: Option<ExecutionPolicy>,
38 /// If set and in the future, execution starts delayed.
39 #[serde(default)]
40 pub delay_until: Option<TimestampMs>,
41 /// Absolute deadline timestamp (ms). Execution expires if not complete by this time.
42 #[serde(default)]
43 pub execution_deadline_at: Option<TimestampMs>,
44 /// Partition ID (pre-computed).
45 pub partition_id: u16,
46 pub now: TimestampMs,
47}
48
49#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
50pub enum CreateExecutionResult {
51 /// Execution created successfully.
52 Created {
53 execution_id: ExecutionId,
54 public_state: PublicState,
55 },
56 /// Idempotent duplicate — existing execution returned.
57 Duplicate { execution_id: ExecutionId },
58}
59
60// ─── issue_claim_grant ───
61
62/// Inputs to [`crate::engine_backend::EngineBackend::issue_claim_grant`]
63/// — the trait-level entry point v0.12 PR-5 lifted out of the SDK-side
64/// `FlowFabricWorker::claim_next` inline helper.
65///
66/// `#[non_exhaustive]` + `::new` per
67/// `feedback_non_exhaustive_needs_constructor`: future fields may be
68/// added in minor releases; consumers MUST construct via
69/// [`Self::new`] and populate optional fields (`capability_hash`,
70/// `route_snapshot_json`, `admission_summary`) by direct field
71/// assignment on the returned value. Struct-literal construction is
72/// blocked by `#[non_exhaustive]`; `..Default::default()` is not
73/// available for the same reason.
74///
75/// Carries the execution's [`crate::partition::Partition`] so the
76/// Valkey backend can derive `exec_core` / `claim_grant` / the lane's
77/// `eligible_zset` KEYS without a second round-trip.
78///
79/// Does NOT derive `Serialize` / `Deserialize` — this is a
80/// trait-boundary args struct, not a wire-format type; the
81/// `#[non_exhaustive]` marker already blocks cross-crate struct-
82/// literal construction, which matters more than JSON round-trip
83/// for a scanner hot-path primitive.
84#[derive(Clone, Debug)]
85#[non_exhaustive]
86pub struct IssueClaimGrantArgs {
87 pub execution_id: ExecutionId,
88 pub lane_id: LaneId,
89 pub worker_id: WorkerId,
90 pub worker_instance_id: WorkerInstanceId,
91 /// Partition context for KEYS derivation. v0.12 PR-5.
92 pub partition: crate::partition::Partition,
93 pub capability_hash: Option<String>,
94 pub route_snapshot_json: Option<String>,
95 pub admission_summary: Option<String>,
96 /// Capabilities this worker advertises. Serialized as a sorted,
97 /// comma-separated string to the Lua FCALL (see scheduling.lua
98 /// ff_issue_claim_grant). An empty set matches only executions whose
99 /// `required_capabilities` is also empty.
100 pub worker_capabilities: BTreeSet<String>,
101 pub grant_ttl_ms: u64,
102 /// Caller-side timestamp for bookkeeping. NOT passed to the Lua FCALL —
103 /// ff_issue_claim_grant uses `redis.call("TIME")` for grant_expires_at.
104 pub now: TimestampMs,
105}
106
107impl IssueClaimGrantArgs {
108 /// Construct an `IssueClaimGrantArgs`. Added alongside
109 /// `#[non_exhaustive]` per `feedback_non_exhaustive_needs_constructor`
110 /// so the SDK worker (and any future caller) can build the args
111 /// without the struct literal becoming a cross-crate breaking
112 /// change on every minor release.
113 #[allow(clippy::too_many_arguments)]
114 pub fn new(
115 execution_id: ExecutionId,
116 lane_id: LaneId,
117 worker_id: WorkerId,
118 worker_instance_id: WorkerInstanceId,
119 partition: crate::partition::Partition,
120 worker_capabilities: BTreeSet<String>,
121 grant_ttl_ms: u64,
122 now: TimestampMs,
123 ) -> Self {
124 Self {
125 execution_id,
126 lane_id,
127 worker_id,
128 worker_instance_id,
129 partition,
130 capability_hash: None,
131 route_snapshot_json: None,
132 admission_summary: None,
133 worker_capabilities,
134 grant_ttl_ms,
135 now,
136 }
137 }
138}
139
140/// Typed outcome of [`crate::engine_backend::EngineBackend::issue_claim_grant`].
141///
142/// Single-variant today — the Valkey FCALL either writes the grant
143/// and returns `Granted`, or the Lua reject (capability mismatch,
144/// already-granted, etc.) surfaces as a typed [`crate::engine_error::EngineError`]
145/// on the outer `Result`. `#[non_exhaustive]` reserves room for
146/// future additive variants without a breaking match-arm churn on
147/// consumers.
148#[derive(Clone, Debug, PartialEq, Eq)]
149#[non_exhaustive]
150pub enum IssueClaimGrantOutcome {
151 /// Grant issued.
152 Granted { execution_id: ExecutionId },
153}
154
155/// Legacy name for `IssueClaimGrantOutcome` — retained for
156/// `ff-script`'s `FromFcallResult` plumbing. Prefer
157/// [`IssueClaimGrantOutcome`] in trait-level code.
158#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
159pub enum IssueClaimGrantResult {
160 /// Grant issued.
161 Granted { execution_id: ExecutionId },
162}
163
164// ─── scan_eligible_executions + block_route (v0.12 PR-5) ───
165
166/// Inputs to [`crate::engine_backend::EngineBackend::scan_eligible_executions`].
167///
168/// Lifted from the SDK-side `ZRANGEBYSCORE` inline on the
169/// `claim_next` scanner (v0.12 PR-5). The backend reads the lane's
170/// eligible ZSET on the given partition and returns up to `limit`
171/// execution ids in priority order (Valkey: score = `-(priority *
172/// 1e12) + created_at_ms`, so `+inf`-bounded ZRANGEBYSCORE with
173/// `LIMIT 0 <limit>` yields highest-priority-first).
174#[derive(Clone, Debug)]
175#[non_exhaustive]
176pub struct ScanEligibleArgs {
177 pub lane_id: LaneId,
178 pub partition: crate::partition::Partition,
179 /// Maximum number of execution ids to return. Backends MAY
180 /// return fewer when the partition has less work.
181 pub limit: u32,
182}
183
184impl ScanEligibleArgs {
185 /// Construct a `ScanEligibleArgs`. Added alongside
186 /// `#[non_exhaustive]` per
187 /// `feedback_non_exhaustive_needs_constructor`.
188 pub fn new(
189 lane_id: LaneId,
190 partition: crate::partition::Partition,
191 limit: u32,
192 ) -> Self {
193 Self {
194 lane_id,
195 partition,
196 limit,
197 }
198 }
199}
200
201/// Inputs to [`crate::engine_backend::EngineBackend::block_route`].
202///
203/// Lifted from the SDK-side `ff_block_execution_for_admission`
204/// inline helper on the `claim_next` scanner (v0.12 PR-5). Moves an
205/// execution from the lane's eligible ZSET into its blocked_route
206/// ZSET after a `CapabilityMismatch` reject — the engine's unblock
207/// scanner promotes blocked_route back to eligible once a worker
208/// with matching caps registers.
209#[derive(Clone, Debug)]
210#[non_exhaustive]
211pub struct BlockRouteArgs {
212 pub execution_id: ExecutionId,
213 pub lane_id: LaneId,
214 pub partition: crate::partition::Partition,
215 /// Free-form block reason code (e.g. `"waiting_for_capable_worker"`).
216 pub reason_code: String,
217 /// Human-readable reason detail for operator logs.
218 pub reason_detail: String,
219 pub now: TimestampMs,
220}
221
222impl BlockRouteArgs {
223 /// Construct a `BlockRouteArgs`.
224 pub fn new(
225 execution_id: ExecutionId,
226 lane_id: LaneId,
227 partition: crate::partition::Partition,
228 reason_code: String,
229 reason_detail: String,
230 now: TimestampMs,
231 ) -> Self {
232 Self {
233 execution_id,
234 lane_id,
235 partition,
236 reason_code,
237 reason_detail,
238 now,
239 }
240 }
241}
242
243/// Typed outcome of [`crate::engine_backend::EngineBackend::block_route`].
244///
245/// `LuaRejected` captures the logical-reject case (e.g. the execution
246/// went terminal between pick and block — eligible ZSET is left
247/// unchanged and the caller should simply `continue` to the next
248/// partition). Transport faults surface on the outer `Result` as
249/// [`crate::engine_error::EngineError::Transport`]; callers that
250/// want the pre-PR-5 "best-effort, log-and-continue" semantic wrap
251/// the call in a `match` and swallow non-success variants.
252#[derive(Clone, Debug, PartialEq, Eq)]
253#[non_exhaustive]
254pub enum BlockRouteOutcome {
255 /// Execution moved from eligible → blocked_route successfully.
256 Blocked { execution_id: ExecutionId },
257 /// Lua returned a non-success result (e.g. execution went
258 /// terminal between pick and block). `message` carries the Lua
259 /// reject code for operator visibility.
260 LuaRejected { message: String },
261}
262
263/// A claim grant issued by the scheduler for a specific execution.
264///
265/// The worker uses this to call `ff_claim_execution` (or
266/// `ff_acquire_lease`), which atomically consumes the grant and
267/// creates the lease.
268///
269/// Shared wire-level type between `ff-scheduler` (issuer) and
270/// `ff-sdk` (consumer, via `FlowFabricWorker::claim_from_grant`).
271/// Lives in `ff-core` so neither crate needs a dep on the other.
272///
273/// **Lane asymmetry with [`ResumeGrant`]:** `ClaimGrant` does NOT
274/// carry `lane_id`. The issuing scheduler's caller already picked
275/// a lane (that's how admission reached this grant) and passes it
276/// through to `claim_from_grant` as a separate argument. The grant
277/// handle stays narrow to what uniquely identifies the admission
278/// decision. The matching field on [`ResumeGrant`] is an
279/// intentional divergence — see the note on that type.
280#[derive(Clone, Debug, PartialEq, Eq)]
281#[non_exhaustive]
282pub struct ClaimGrant {
283 /// The execution that was granted.
284 pub execution_id: ExecutionId,
285 /// Opaque partition handle for this execution's hash-tag slot.
286 ///
287 /// Public wire type: consumers pass it back to FlowFabric but
288 /// must not parse the interior hash tag for routing decisions.
289 /// Internal consumers that need the typed
290 /// [`crate::partition::Partition`] call [`Self::partition`].
291 pub partition_key: crate::partition::PartitionKey,
292 /// The Valkey key holding the grant hash (for the worker to
293 /// reference).
294 pub grant_key: String,
295 /// When the grant expires if not consumed.
296 pub expires_at_ms: u64,
297}
298
299impl ClaimGrant {
300 /// Construct a fresh-claim grant. Added alongside `#[non_exhaustive]`
301 /// per RFC-024 §3.1 + `feedback_non_exhaustive_needs_constructor`.
302 pub fn new(
303 execution_id: ExecutionId,
304 partition_key: crate::partition::PartitionKey,
305 grant_key: String,
306 expires_at_ms: u64,
307 ) -> Self {
308 Self {
309 execution_id,
310 partition_key,
311 grant_key,
312 expires_at_ms,
313 }
314 }
315
316 /// Parse `partition_key` into a typed
317 /// [`crate::partition::Partition`]. Intended for internal
318 /// consumers (scheduler emitter, SDK worker claim path) that
319 /// need the family/index pair. Fails only on malformed keys
320 /// (which indicates a producer bug).
321 ///
322 /// Alias collapse applies: a grant issued against `Execution`
323 /// family round-trips to `Flow` (see [`crate::partition::PartitionKey`]
324 /// for the rationale — routing is preserved, only the metadata
325 /// family label normalises).
326 pub fn partition(
327 &self,
328 ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
329 self.partition_key.parse()
330 }
331}
332
333/// A resume grant issued for a resumed (attempt_interrupted) execution.
334///
335/// Issued by a producer (typically `ff-scheduler` once a Batch-C
336/// reclaim scanner is in place; test fixtures in the interim — no
337/// production Rust caller exists in-tree today). Consumed by
338/// [`FlowFabricWorker::claim_from_resume_grant`], which calls
339/// `ff_claim_resumed_execution` atomically: that FCALL validates the
340/// grant, consumes it, and transitions `attempt_interrupted` →
341/// `started` while preserving the existing `attempt_index` +
342/// `attempt_id` (a resumed execution re-uses its attempt; it does
343/// not start a new one).
344///
345/// **Naming history (RFC-024).** This type was historically called
346/// `ReclaimGrant`, but its semantic has always been resume-after-
347/// suspend (the routing FCALL is `ff_claim_resumed_execution`, not
348/// `ff_reclaim_execution`). RFC-024 PR-A renamed the type to
349/// `ResumeGrant` — the name now matches the semantic. RFC-024 PR-B+C
350/// dropped the transitional `ReclaimGrant = ResumeGrant` alias and
351/// introduced a distinct new [`ReclaimGrant`] for the lease-reclaim
352/// path (`reclaim_execution` / `ff_reclaim_execution`).
353///
354/// Mirrors [`ClaimGrant`] for the resume path. Differences:
355///
356/// * [`ClaimGrant`] is issued against a freshly-eligible
357/// execution and `ff_claim_execution` creates a new attempt.
358/// * `ResumeGrant` is issued against an `attempt_interrupted`
359/// execution; `ff_claim_resumed_execution` re-uses the existing
360/// attempt and bumps the lease epoch.
361///
362/// The grant itself is written to the same `claim_grant` Valkey key
363/// that [`ClaimGrant`] uses; the distinction is which Lua FCALL
364/// consumes it (`ff_claim_execution` for new attempts,
365/// `ff_claim_resumed_execution` for resumes).
366///
367/// **Lane asymmetry with [`ClaimGrant`]:** `ResumeGrant` CARRIES
368/// `lane_id` as a field. The issuing path already knows the lane
369/// (it's read from `exec_core` at grant time); carrying it here
370/// spares the consumer a `HGET exec_core lane_id` round trip on
371/// the hot claim path. The asymmetry is intentional — prefer
372/// one-fewer-HGET on a type that already lives with the resumer's
373/// lifecycle over strict handle symmetry with `ClaimGrant`.
374///
375/// Shared wire-level type between the eventual `ff-scheduler`
376/// producer (Batch-C reclaim scanner — not yet in-tree; test
377/// fixtures construct this type today) and `ff-sdk` (consumer, via
378/// `FlowFabricWorker::claim_from_resume_grant`). Lives in
379/// `ff-core` so neither crate needs a dep on the other.
380///
381/// [`FlowFabricWorker::claim_from_resume_grant`]: https://docs.rs/ff-sdk
382#[derive(Clone, Debug, PartialEq, Eq)]
383#[non_exhaustive]
384pub struct ResumeGrant {
385 /// The execution granted for resumption.
386 pub execution_id: ExecutionId,
387 /// Opaque partition handle for this execution's hash-tag slot.
388 ///
389 /// Same wire-opacity contract as [`ClaimGrant::partition_key`].
390 /// Internal consumers call [`Self::partition`] for the parsed
391 /// form.
392 pub partition_key: crate::partition::PartitionKey,
393 /// Valkey key of the grant hash — same key shape as
394 /// [`ClaimGrant`].
395 pub grant_key: String,
396 /// Monotonic ms when the grant expires; unconsumed grants
397 /// vanish.
398 pub expires_at_ms: u64,
399 /// Lane the execution belongs to. Needed by
400 /// `ff_claim_resumed_execution` for `KEYS[3]` (eligible_zset)
401 /// and `KEYS[9]` (active_index).
402 pub lane_id: LaneId,
403}
404
405impl ResumeGrant {
406 /// Construct a resume grant. Added alongside `#[non_exhaustive]`
407 /// per RFC-024 §3.1 + `feedback_non_exhaustive_needs_constructor`.
408 pub fn new(
409 execution_id: ExecutionId,
410 partition_key: crate::partition::PartitionKey,
411 grant_key: String,
412 expires_at_ms: u64,
413 lane_id: LaneId,
414 ) -> Self {
415 Self {
416 execution_id,
417 partition_key,
418 grant_key,
419 expires_at_ms,
420 lane_id,
421 }
422 }
423
424 /// Parse `partition_key` into a typed
425 /// [`crate::partition::Partition`]. See [`ClaimGrant::partition`]
426 /// for the alias-collapse note.
427 pub fn partition(
428 &self,
429 ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
430 self.partition_key.parse()
431 }
432}
433
434/// A lease-reclaim grant issued for an execution in
435/// `lease_expired_reclaimable` or `lease_revoked` state (RFC-024 §3.1).
436///
437/// Distinct from [`ResumeGrant`]: the reclaim grant routes to
438/// `ff_reclaim_execution` (Valkey) / the new-attempt reclaim impl
439/// (PG/SQLite), which creates a NEW attempt row and bumps the
440/// execution's `lease_reclaim_count`. The resume grant, by contrast,
441/// re-uses the existing attempt under `ff_claim_resumed_execution`.
442///
443/// Carries `lane_id` for symmetry with [`ResumeGrant`] — the Lua
444/// `ff_reclaim_execution` needs the lane for key construction, and
445/// the consuming worker would otherwise pay a round-trip to recover
446/// it from `exec_core`.
447///
448/// Backend impl bodies ship under PR-D (PG) / PR-E (SQLite) / PR-F
449/// (Valkey). This PR lands only the type + trait surface; default
450/// [`crate::engine_backend::EngineBackend::issue_reclaim_grant`] and
451/// [`crate::engine_backend::EngineBackend::reclaim_execution`] return
452/// [`crate::engine_error::EngineError::Unavailable`] until each
453/// backend PR wires its real body.
454#[derive(Clone, Debug, PartialEq, Eq)]
455#[non_exhaustive]
456pub struct ReclaimGrant {
457 /// The execution granted for lease-reclaim.
458 pub execution_id: ExecutionId,
459 /// Opaque partition handle for this execution's hash-tag slot.
460 pub partition_key: crate::partition::PartitionKey,
461 /// Backend-scoped grant key (Valkey key / PG+SQLite
462 /// `ff_claim_grant.grant_id`).
463 pub grant_key: String,
464 /// Monotonic ms when the grant expires; unconsumed grants vanish.
465 pub expires_at_ms: u64,
466 /// Lane the execution belongs to — needed by
467 /// `ff_reclaim_execution` for `KEYS[*]` construction.
468 pub lane_id: LaneId,
469}
470
471impl ReclaimGrant {
472 /// Construct a reclaim grant. Added alongside `#[non_exhaustive]`
473 /// per RFC-024 §3.1 + `feedback_non_exhaustive_needs_constructor`.
474 pub fn new(
475 execution_id: ExecutionId,
476 partition_key: crate::partition::PartitionKey,
477 grant_key: String,
478 expires_at_ms: u64,
479 lane_id: LaneId,
480 ) -> Self {
481 Self {
482 execution_id,
483 partition_key,
484 grant_key,
485 expires_at_ms,
486 lane_id,
487 }
488 }
489
490 /// Parse `partition_key` into a typed
491 /// [`crate::partition::Partition`].
492 pub fn partition(
493 &self,
494 ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
495 self.partition_key.parse()
496 }
497}
498
499// ─── claim_execution ───
500
501#[derive(Clone, Debug, Serialize, Deserialize)]
502#[non_exhaustive]
503pub struct ClaimExecutionArgs {
504 pub execution_id: ExecutionId,
505 pub worker_id: WorkerId,
506 pub worker_instance_id: WorkerInstanceId,
507 pub lane_id: LaneId,
508 pub lease_id: LeaseId,
509 pub lease_ttl_ms: u64,
510 pub attempt_id: AttemptId,
511 /// Expected attempt index (pre-read from exec_core.total_attempt_count).
512 /// Used for KEYS construction — must match what the Lua computes.
513 pub expected_attempt_index: AttemptIndex,
514 /// JSON-encoded attempt policy snapshot.
515 #[serde(default)]
516 pub attempt_policy_json: String,
517 /// Per-attempt timeout in ms.
518 #[serde(default)]
519 pub attempt_timeout_ms: Option<u64>,
520 /// Total execution deadline (absolute timestamp ms).
521 #[serde(default)]
522 pub execution_deadline_at: Option<i64>,
523 pub now: TimestampMs,
524}
525
526impl ClaimExecutionArgs {
527 /// Construct a `ClaimExecutionArgs`. Added alongside
528 /// `#[non_exhaustive]` per `feedback_non_exhaustive_needs_constructor`
529 /// so consumers (SDK worker, backend impls) can still build the
530 /// args after the struct was sealed for forward-compat.
531 #[allow(clippy::too_many_arguments)]
532 pub fn new(
533 execution_id: ExecutionId,
534 worker_id: WorkerId,
535 worker_instance_id: WorkerInstanceId,
536 lane_id: LaneId,
537 lease_id: LeaseId,
538 lease_ttl_ms: u64,
539 attempt_id: AttemptId,
540 expected_attempt_index: AttemptIndex,
541 attempt_policy_json: String,
542 attempt_timeout_ms: Option<u64>,
543 execution_deadline_at: Option<i64>,
544 now: TimestampMs,
545 ) -> Self {
546 Self {
547 execution_id,
548 worker_id,
549 worker_instance_id,
550 lane_id,
551 lease_id,
552 lease_ttl_ms,
553 attempt_id,
554 expected_attempt_index,
555 attempt_policy_json,
556 attempt_timeout_ms,
557 execution_deadline_at,
558 now,
559 }
560 }
561}
562
563#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
564#[non_exhaustive]
565pub struct ClaimedExecution {
566 pub execution_id: ExecutionId,
567 pub lease_id: LeaseId,
568 pub lease_epoch: LeaseEpoch,
569 pub attempt_index: AttemptIndex,
570 pub attempt_id: AttemptId,
571 pub attempt_type: AttemptType,
572 pub lease_expires_at: TimestampMs,
573 /// Backend-populated attempt handle for this claim (v0.12 PR-5.5).
574 /// Valkey fills in an encoded `HandleKind::Fresh`; PG/SQLite are
575 /// `Unavailable` on `claim_execution` at runtime per
576 /// `project_claim_from_grant_pg_sqlite_gap.md`, so the field stays
577 /// a stub on those paths.
578 #[serde(default = "crate::backend::stub_handle_fresh")]
579 pub handle: crate::backend::Handle,
580}
581
582impl ClaimedExecution {
583 /// Construct a `ClaimedExecution`. Added alongside
584 /// `#[non_exhaustive]` per `feedback_non_exhaustive_needs_constructor`
585 /// so consumers (backend impls building a claim outcome) can still
586 /// build the struct after it was sealed for forward-compat.
587 #[allow(clippy::too_many_arguments)]
588 pub fn new(
589 execution_id: ExecutionId,
590 lease_id: LeaseId,
591 lease_epoch: LeaseEpoch,
592 attempt_index: AttemptIndex,
593 attempt_id: AttemptId,
594 attempt_type: AttemptType,
595 lease_expires_at: TimestampMs,
596 handle: crate::backend::Handle,
597 ) -> Self {
598 Self {
599 execution_id,
600 lease_id,
601 lease_epoch,
602 attempt_index,
603 attempt_id,
604 attempt_type,
605 lease_expires_at,
606 handle,
607 }
608 }
609}
610
611/// Typed outcome of [`crate::engine_backend::EngineBackend::claim_execution`].
612///
613/// Single-variant today; `#[non_exhaustive]` reserves room for future
614/// outcomes (e.g. an explicit `NoGrant` variant if RFC-024 splits it
615/// out of `InvalidClaimGrant`) without a breaking match-arm churn on
616/// consumers.
617#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
618#[non_exhaustive]
619pub enum ClaimExecutionResult {
620 /// Successfully claimed.
621 Claimed(ClaimedExecution),
622}
623
624// ─── complete_execution ───
625
626#[derive(Clone, Debug, Serialize, Deserialize)]
627pub struct CompleteExecutionArgs {
628 pub execution_id: ExecutionId,
629 /// RFC #58.5 — fence triple. `Some` for SDK worker paths (standard
630 /// stale-lease fence). `None` for operator overrides, in which case
631 /// `source` must be `CancelSource::OperatorOverride` or the Lua
632 /// returns `fence_required`.
633 #[serde(default)]
634 pub fence: Option<LeaseFence>,
635 pub attempt_index: AttemptIndex,
636 #[serde(default)]
637 pub result_payload: Option<Vec<u8>>,
638 #[serde(default)]
639 pub result_encoding: Option<String>,
640 /// RFC #58.5 — unfenced-call gate. Ignored when `fence` is `Some`.
641 #[serde(default)]
642 pub source: CancelSource,
643 pub now: TimestampMs,
644}
645
646#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
647pub enum CompleteExecutionResult {
648 /// Execution completed successfully.
649 Completed {
650 execution_id: ExecutionId,
651 public_state: PublicState,
652 },
653}
654
655// ─── renew_lease ───
656
657#[derive(Clone, Debug, Serialize, Deserialize)]
658pub struct RenewLeaseArgs {
659 pub execution_id: ExecutionId,
660 pub attempt_index: AttemptIndex,
661 /// RFC #58.5 — fence triple. Required (no operator override path for
662 /// renew). `None` returns `fence_required`.
663 pub fence: Option<LeaseFence>,
664 /// How long to extend the lease (milliseconds).
665 pub lease_ttl_ms: u64,
666 /// Grace period after lease_expires_at before the lease_current key is auto-deleted.
667 #[serde(default = "default_lease_history_grace_ms")]
668 pub lease_history_grace_ms: u64,
669}
670
/// Serde default for `RenewLeaseArgs::lease_history_grace_ms`: 60 000 ms.
fn default_lease_history_grace_ms() -> u64 {
    // Sixty seconds, expressed in milliseconds.
    60 * 1_000
}
674
675#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
676pub enum RenewLeaseResult {
677 /// Lease renewed.
678 Renewed { expires_at: TimestampMs },
679}
680
681// ─── mark_lease_expired_if_due ───
682
683#[derive(Clone, Debug, Serialize, Deserialize)]
684pub struct MarkLeaseExpiredArgs {
685 pub execution_id: ExecutionId,
686}
687
688#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
689pub enum MarkLeaseExpiredResult {
690 /// Lease was marked as expired.
691 MarkedExpired,
692 /// No action needed (already expired, not yet due, not active, etc.).
693 AlreadySatisfied { reason: String },
694}
695
696// ─── cancel_execution ───
697
698#[derive(Clone, Debug, Serialize, Deserialize)]
699pub struct CancelExecutionArgs {
700 pub execution_id: ExecutionId,
701 pub reason: String,
702 #[serde(default)]
703 pub source: CancelSource,
704 /// Required if not operator_override and execution is active.
705 #[serde(default)]
706 pub lease_id: Option<LeaseId>,
707 #[serde(default)]
708 pub lease_epoch: Option<LeaseEpoch>,
709 /// Required if not operator_override and execution is active.
710 #[serde(default)]
711 pub attempt_id: Option<AttemptId>,
712 pub now: TimestampMs,
713}
714
715#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
716pub enum CancelExecutionResult {
717 /// Execution cancelled.
718 Cancelled {
719 execution_id: ExecutionId,
720 public_state: PublicState,
721 },
722}
723
724// ─── revoke_lease ───
725
726#[derive(Clone, Debug, Serialize, Deserialize)]
727pub struct RevokeLeaseArgs {
728 pub execution_id: ExecutionId,
729 /// If set, only revoke if this matches the current lease. Empty string skips check.
730 #[serde(default)]
731 pub expected_lease_id: Option<String>,
732 /// Worker instance whose lease set to clean up. Read from exec_core before calling.
733 pub worker_instance_id: WorkerInstanceId,
734 pub reason: String,
735}
736
737#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
738pub enum RevokeLeaseResult {
739 /// Lease revoked.
740 Revoked {
741 lease_id: String,
742 lease_epoch: String,
743 },
744 /// Already revoked or expired — no action needed.
745 AlreadySatisfied { reason: String },
746}
747
748// ─── delay_execution ───
749
750#[derive(Clone, Debug, Serialize, Deserialize)]
751pub struct DelayExecutionArgs {
752 pub execution_id: ExecutionId,
753 /// RFC #58.5 — fence triple. `None` requires `source ==
754 /// CancelSource::OperatorOverride`.
755 #[serde(default)]
756 pub fence: Option<LeaseFence>,
757 pub attempt_index: AttemptIndex,
758 pub delay_until: TimestampMs,
759 #[serde(default)]
760 pub source: CancelSource,
761 pub now: TimestampMs,
762}
763
764#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
765pub enum DelayExecutionResult {
766 /// Execution delayed.
767 Delayed {
768 execution_id: ExecutionId,
769 public_state: PublicState,
770 },
771}
772
773// ─── move_to_waiting_children ───
774
775#[derive(Clone, Debug, Serialize, Deserialize)]
776pub struct MoveToWaitingChildrenArgs {
777 pub execution_id: ExecutionId,
778 /// RFC #58.5 — fence triple. `None` requires `source ==
779 /// CancelSource::OperatorOverride`.
780 #[serde(default)]
781 pub fence: Option<LeaseFence>,
782 pub attempt_index: AttemptIndex,
783 #[serde(default)]
784 pub source: CancelSource,
785 pub now: TimestampMs,
786}
787
788#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
789pub enum MoveToWaitingChildrenResult {
790 /// Moved to waiting children.
791 Moved {
792 execution_id: ExecutionId,
793 public_state: PublicState,
794 },
795}
796
797// ─── change_priority ───
798
799#[derive(Clone, Debug, Serialize, Deserialize)]
800pub struct ChangePriorityArgs {
801 pub execution_id: ExecutionId,
802 pub new_priority: i32,
803 pub lane_id: LaneId,
804 pub now: TimestampMs,
805}
806
807#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
808pub enum ChangePriorityResult {
809 /// Priority changed and re-scored.
810 Changed { execution_id: ExecutionId },
811}
812
813// ─── update_progress ───
814
815#[derive(Clone, Debug, Serialize, Deserialize)]
816pub struct UpdateProgressArgs {
817 pub execution_id: ExecutionId,
818 pub lease_id: LeaseId,
819 pub lease_epoch: LeaseEpoch,
820 pub attempt_id: AttemptId,
821 #[serde(default)]
822 pub progress_pct: Option<u8>,
823 #[serde(default)]
824 pub progress_message: Option<String>,
825 pub now: TimestampMs,
826}
827
828#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
829pub enum UpdateProgressResult {
830 /// Progress updated.
831 Updated,
832}
833
834// ═══════════════════════════════════════════════════════════════════════
835// Phase 2 contracts: fail, reclaim, expire
836// ═══════════════════════════════════════════════════════════════════════
837
838// ─── fail_execution ───
839
840#[derive(Clone, Debug, Serialize, Deserialize)]
841pub struct FailExecutionArgs {
842 pub execution_id: ExecutionId,
843 /// RFC #58.5 — fence triple. `None` requires `source ==
844 /// CancelSource::OperatorOverride`.
845 #[serde(default)]
846 pub fence: Option<LeaseFence>,
847 pub attempt_index: AttemptIndex,
848 pub failure_reason: String,
849 pub failure_category: String,
850 /// JSON-encoded retry policy (from execution policy). Empty = no retries.
851 #[serde(default)]
852 pub retry_policy_json: String,
853 /// JSON-encoded attempt policy for the next retry attempt.
854 #[serde(default)]
855 pub next_attempt_policy_json: String,
856 #[serde(default)]
857 pub source: CancelSource,
858}
859
860/// Outcome of a fail_execution call.
861#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
862pub enum FailExecutionResult {
863 /// Retry was scheduled — execution is delayed with backoff.
864 RetryScheduled {
865 delay_until: TimestampMs,
866 next_attempt_index: AttemptIndex,
867 },
868 /// No retries left — execution is terminal failed.
869 TerminalFailed,
870}
871
872// ─── issue_reclaim_grant ───
873
874#[derive(Clone, Debug, Serialize, Deserialize)]
875#[non_exhaustive]
876pub struct IssueReclaimGrantArgs {
877 pub execution_id: ExecutionId,
878 pub worker_id: WorkerId,
879 pub worker_instance_id: WorkerInstanceId,
880 pub lane_id: LaneId,
881 #[serde(default)]
882 pub capability_hash: Option<String>,
883 pub grant_ttl_ms: u64,
884 #[serde(default)]
885 pub route_snapshot_json: Option<String>,
886 #[serde(default)]
887 pub admission_summary: Option<String>,
888 /// Worker capabilities (parity with `IssueClaimGrantArgs`). The
889 /// Lua primitive `ff_issue_reclaim_grant` reads these at ARGV[9].
890 /// Populated by the SDK admin path from the registered worker's
891 /// `WorkerRegistration::capabilities` per RFC-024 §3.2 (B-2).
892 #[serde(default)]
893 pub worker_capabilities: BTreeSet<String>,
894 /// Caller-side timestamp for bookkeeping. NOT passed to the Lua FCALL —
895 /// ff_issue_reclaim_grant uses `redis.call("TIME")` for grant_expires_at
896 /// (same as ff_issue_claim_grant). Kept for contract symmetry with
897 /// IssueClaimGrantArgs and scheduler audit logging.
898 pub now: TimestampMs,
899}
900
901impl IssueReclaimGrantArgs {
902 /// Construct an `IssueReclaimGrantArgs`. Added alongside
903 /// `#[non_exhaustive]` per RFC-024 §3.2 +
904 /// `feedback_non_exhaustive_needs_constructor`.
905 #[allow(clippy::too_many_arguments)]
906 pub fn new(
907 execution_id: ExecutionId,
908 worker_id: WorkerId,
909 worker_instance_id: WorkerInstanceId,
910 lane_id: LaneId,
911 capability_hash: Option<String>,
912 grant_ttl_ms: u64,
913 route_snapshot_json: Option<String>,
914 admission_summary: Option<String>,
915 worker_capabilities: BTreeSet<String>,
916 now: TimestampMs,
917 ) -> Self {
918 Self {
919 execution_id,
920 worker_id,
921 worker_instance_id,
922 lane_id,
923 capability_hash,
924 grant_ttl_ms,
925 route_snapshot_json,
926 admission_summary,
927 worker_capabilities,
928 now,
929 }
930 }
931}
932
933#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
934pub enum IssueReclaimGrantResult {
935 /// Reclaim grant issued.
936 Granted { expires_at_ms: TimestampMs },
937}
938
939/// Typed outcome of [`crate::engine_backend::EngineBackend::issue_reclaim_grant`]
940/// (RFC-024 §3.2).
941///
942/// Construction surface: backends produce variants; consumers match
943/// on variants. No `::new()` — variants ARE the surface.
944#[derive(Clone, Debug, PartialEq, Eq)]
945#[non_exhaustive]
946pub enum IssueReclaimGrantOutcome {
947 /// Grant issued — hand the carried [`ReclaimGrant`] to
948 /// [`crate::engine_backend::EngineBackend::reclaim_execution`].
949 Granted(ReclaimGrant),
950 /// Execution is not in a reclaimable state (not
951 /// `lease_expired_reclaimable` / `lease_revoked`).
952 NotReclaimable {
953 execution_id: ExecutionId,
954 detail: String,
955 },
956 /// `max_reclaim_count` exceeded; execution transitioned to
957 /// terminal_failed by the backend primitive.
958 ReclaimCapExceeded {
959 execution_id: ExecutionId,
960 reclaim_count: u32,
961 },
962}
963
964// ─── reclaim_execution ───
965
966#[derive(Clone, Debug, Serialize, Deserialize)]
967#[non_exhaustive]
968pub struct ReclaimExecutionArgs {
969 pub execution_id: ExecutionId,
970 pub worker_id: WorkerId,
971 pub worker_instance_id: WorkerInstanceId,
972 pub lane_id: LaneId,
973 #[serde(default)]
974 pub capability_hash: Option<String>,
975 pub lease_id: LeaseId,
976 pub lease_ttl_ms: u64,
977 pub attempt_id: AttemptId,
978 /// JSON-encoded attempt policy for the reclaim attempt.
979 #[serde(default)]
980 pub attempt_policy_json: String,
981 /// Maximum reclaim count before terminal failure. `None` ⇒ backend
982 /// applies the Rust-surface default of 1000 per RFC-024 §4.6. The
983 /// Lua fallback remains 100 for pre-RFC ARGV-omitted call sites;
984 /// the two-default coexistence is explicit by design.
985 #[serde(default)]
986 pub max_reclaim_count: Option<u32>,
987 /// Old worker instance (for old_worker_leases key construction).
988 pub old_worker_instance_id: WorkerInstanceId,
989 /// Current attempt index (for old_attempt/old_stream_meta key construction).
990 pub current_attempt_index: AttemptIndex,
991}
992
993impl ReclaimExecutionArgs {
994 /// Construct a `ReclaimExecutionArgs`. Added alongside
995 /// `#[non_exhaustive]` per RFC-024 §3.2 +
996 /// `feedback_non_exhaustive_needs_constructor`.
997 #[allow(clippy::too_many_arguments)]
998 pub fn new(
999 execution_id: ExecutionId,
1000 worker_id: WorkerId,
1001 worker_instance_id: WorkerInstanceId,
1002 lane_id: LaneId,
1003 capability_hash: Option<String>,
1004 lease_id: LeaseId,
1005 lease_ttl_ms: u64,
1006 attempt_id: AttemptId,
1007 attempt_policy_json: String,
1008 max_reclaim_count: Option<u32>,
1009 old_worker_instance_id: WorkerInstanceId,
1010 current_attempt_index: AttemptIndex,
1011 ) -> Self {
1012 Self {
1013 execution_id,
1014 worker_id,
1015 worker_instance_id,
1016 lane_id,
1017 capability_hash,
1018 lease_id,
1019 lease_ttl_ms,
1020 attempt_id,
1021 attempt_policy_json,
1022 max_reclaim_count,
1023 old_worker_instance_id,
1024 current_attempt_index,
1025 }
1026 }
1027}
1028
1029/// Typed outcome of [`crate::engine_backend::EngineBackend::reclaim_execution`]
1030/// (RFC-024 §3.2).
1031///
1032/// Distinct from the wire-level [`ReclaimExecutionResult`]; this enum
1033/// is the trait-surface shape consumers match on.
1034#[derive(Clone, Debug, PartialEq, Eq)]
1035#[non_exhaustive]
1036pub enum ReclaimExecutionOutcome {
1037 /// Execution reclaimed — carries the new-attempt
1038 /// [`crate::backend::Handle`] (kind = `Reclaimed`).
1039 Claimed(crate::backend::Handle),
1040 /// Execution is not in a reclaimable state.
1041 NotReclaimable {
1042 execution_id: ExecutionId,
1043 detail: String,
1044 },
1045 /// `max_reclaim_count` exceeded; execution transitioned to
1046 /// terminal_failed.
1047 ReclaimCapExceeded {
1048 execution_id: ExecutionId,
1049 reclaim_count: u32,
1050 },
1051 /// The supplied grant was not found / already consumed / expired.
1052 GrantNotFound { execution_id: ExecutionId },
1053}
1054
1055#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1056pub enum ReclaimExecutionResult {
1057 /// Execution reclaimed — new attempt + new lease.
1058 Reclaimed {
1059 new_attempt_index: AttemptIndex,
1060 new_attempt_id: AttemptId,
1061 new_lease_id: LeaseId,
1062 new_lease_epoch: LeaseEpoch,
1063 lease_expires_at: TimestampMs,
1064 },
1065 /// Max reclaims exceeded — execution moved to terminal.
1066 MaxReclaimsExceeded,
1067}
1068
1069// ─── expire_execution ───
1070
1071#[derive(Clone, Debug, Serialize, Deserialize)]
1072pub struct ExpireExecutionArgs {
1073 pub execution_id: ExecutionId,
1074 /// "attempt_timeout" or "execution_deadline"
1075 pub expire_reason: String,
1076}
1077
1078#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1079pub enum ExpireExecutionResult {
1080 /// Execution expired.
1081 Expired { execution_id: ExecutionId },
1082 /// Already terminal — no-op.
1083 AlreadyTerminal,
1084}
1085
1086// ═══════════════════════════════════════════════════════════════════════
1087// Phase 3 contracts: suspend, signal, resume, waitpoint
1088// ═══════════════════════════════════════════════════════════════════════
1089
1090// ─── suspend_execution ───
1091
1092#[derive(Clone, Debug, Serialize, Deserialize)]
1093pub struct SuspendExecutionArgs {
1094 pub execution_id: ExecutionId,
1095 /// RFC #58.5 — fence triple. Required (no operator override path for
1096 /// suspend). `None` returns `fence_required`.
1097 pub fence: Option<LeaseFence>,
1098 pub attempt_index: AttemptIndex,
1099 pub suspension_id: SuspensionId,
1100 pub waitpoint_id: WaitpointId,
1101 pub waitpoint_key: String,
1102 pub reason_code: String,
1103 pub requested_by: String,
1104 pub resume_condition_json: String,
1105 pub resume_policy_json: String,
1106 #[serde(default)]
1107 pub continuation_metadata_pointer: Option<String>,
1108 #[serde(default)]
1109 pub timeout_at: Option<TimestampMs>,
1110 /// true to activate a pending waitpoint, false to create new.
1111 #[serde(default)]
1112 pub use_pending_waitpoint: bool,
1113 /// Timeout behavior: "fail", "cancel", "expire", "auto_resume", "escalate".
1114 #[serde(default = "default_timeout_behavior")]
1115 pub timeout_behavior: String,
1116}
1117
/// Serde default for `SuspendExecutionArgs::timeout_behavior`: `"fail"`.
fn default_timeout_behavior() -> String {
    String::from("fail")
}
1121
1122#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1123pub enum SuspendExecutionResult {
1124 /// Execution suspended, waitpoint active.
1125 Suspended {
1126 suspension_id: SuspensionId,
1127 waitpoint_id: WaitpointId,
1128 waitpoint_key: String,
1129 /// HMAC-SHA1 token bound to (waitpoint_id, waitpoint_key, created_at).
1130 /// Required by signal-delivery callers to authenticate against this
1131 /// waitpoint (RFC-004 §Waitpoint Security).
1132 waitpoint_token: WaitpointToken,
1133 },
1134 /// Buffered signals already satisfied the condition — suspension skipped.
1135 /// Lease is still held. Token comes from the pending waitpoint record.
1136 AlreadySatisfied {
1137 suspension_id: SuspensionId,
1138 waitpoint_id: WaitpointId,
1139 waitpoint_key: String,
1140 waitpoint_token: WaitpointToken,
1141 },
1142}
1143
1144// ─── resume_execution ───
1145
1146#[derive(Clone, Debug, Serialize, Deserialize)]
1147pub struct ResumeExecutionArgs {
1148 pub execution_id: ExecutionId,
1149 /// "signal", "operator", "auto_resume"
1150 #[serde(default = "default_trigger_type")]
1151 pub trigger_type: String,
1152 /// Optional delay before becoming eligible (ms).
1153 #[serde(default)]
1154 pub resume_delay_ms: u64,
1155}
1156
/// Serde default for `ResumeExecutionArgs::trigger_type`: `"signal"`.
fn default_trigger_type() -> String {
    String::from("signal")
}
1160
1161#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1162pub enum ResumeExecutionResult {
1163 /// Execution resumed to runnable.
1164 Resumed { public_state: PublicState },
1165}
1166
1167// ─── create_pending_waitpoint ───
1168
1169#[derive(Clone, Debug, Serialize, Deserialize)]
1170pub struct CreatePendingWaitpointArgs {
1171 pub execution_id: ExecutionId,
1172 pub lease_id: LeaseId,
1173 pub lease_epoch: LeaseEpoch,
1174 pub attempt_index: AttemptIndex,
1175 pub attempt_id: AttemptId,
1176 pub waitpoint_id: WaitpointId,
1177 pub waitpoint_key: String,
1178 /// Short expiry for the pending waitpoint (ms).
1179 pub expires_in_ms: u64,
1180}
1181
1182#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1183pub enum CreatePendingWaitpointResult {
1184 /// Pending waitpoint created.
1185 Created {
1186 waitpoint_id: WaitpointId,
1187 waitpoint_key: String,
1188 /// HMAC-SHA1 token bound to the pending waitpoint. Required for
1189 /// `buffer_signal_for_pending_waitpoint` and carried forward when
1190 /// the waitpoint is activated by `suspend_execution`.
1191 waitpoint_token: WaitpointToken,
1192 },
1193}
1194
1195// ─── close_waitpoint ───
1196
1197#[derive(Clone, Debug, Serialize, Deserialize)]
1198pub struct CloseWaitpointArgs {
1199 pub waitpoint_id: WaitpointId,
1200 pub reason: String,
1201}
1202
1203#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1204pub enum CloseWaitpointResult {
1205 /// Waitpoint closed.
1206 Closed,
1207}
1208
1209// ─── deliver_signal ───
1210
1211#[derive(Clone, Debug, Serialize, Deserialize)]
1212pub struct DeliverSignalArgs {
1213 pub execution_id: ExecutionId,
1214 pub waitpoint_id: WaitpointId,
1215 pub signal_id: SignalId,
1216 pub signal_name: String,
1217 pub signal_category: String,
1218 pub source_type: String,
1219 pub source_identity: String,
1220 #[serde(default)]
1221 pub payload: Option<Vec<u8>>,
1222 #[serde(default)]
1223 pub payload_encoding: Option<String>,
1224 #[serde(default)]
1225 pub correlation_id: Option<String>,
1226 #[serde(default)]
1227 pub idempotency_key: Option<String>,
1228 pub target_scope: String,
1229 #[serde(default)]
1230 pub created_at: Option<TimestampMs>,
1231 /// Dedup TTL for idempotency key (ms).
1232 #[serde(default)]
1233 pub dedup_ttl_ms: Option<u64>,
1234 /// Resume delay after signal satisfaction (ms).
1235 #[serde(default)]
1236 pub resume_delay_ms: Option<u64>,
1237 /// Max signals per execution (default 10000).
1238 #[serde(default)]
1239 pub max_signals_per_execution: Option<u64>,
1240 /// MAXLEN for the waitpoint signal stream.
1241 #[serde(default)]
1242 pub signal_maxlen: Option<u64>,
1243 /// HMAC-SHA1 token issued when the waitpoint was created. Required for
1244 /// signal delivery; missing/tampered/rotated-past-grace tokens are
1245 /// rejected with `invalid_token` or `token_expired` (RFC-004).
1246 ///
1247 /// Defense-in-depth: `WaitpointToken` is a transparent string newtype,
1248 /// so an empty string deserializes successfully from JSON. The
1249 /// validation boundary is in Lua (`validate_waitpoint_token` returns
1250 /// `missing_token` on empty input); this type intentionally does NOT
1251 /// pre-reject at the Rust layer so callers get a consistent typed
1252 /// error regardless of how they constructed the args.
1253 pub waitpoint_token: WaitpointToken,
1254 pub now: TimestampMs,
1255}
1256
1257#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1258pub enum DeliverSignalResult {
1259 /// Signal accepted with the given effect.
1260 Accepted { signal_id: SignalId, effect: String },
1261 /// Duplicate signal (idempotency key matched).
1262 Duplicate { existing_signal_id: SignalId },
1263}
1264
1265// ─── buffer_signal_for_pending_waitpoint ───
1266
1267#[derive(Clone, Debug, Serialize, Deserialize)]
1268pub struct BufferSignalArgs {
1269 pub execution_id: ExecutionId,
1270 pub waitpoint_id: WaitpointId,
1271 pub signal_id: SignalId,
1272 pub signal_name: String,
1273 pub signal_category: String,
1274 pub source_type: String,
1275 pub source_identity: String,
1276 #[serde(default)]
1277 pub payload: Option<Vec<u8>>,
1278 #[serde(default)]
1279 pub payload_encoding: Option<String>,
1280 #[serde(default)]
1281 pub idempotency_key: Option<String>,
1282 pub target_scope: String,
1283 /// HMAC-SHA1 token issued when `create_pending_waitpoint` ran. Required
1284 /// to authenticate early signals targeting the pending waitpoint.
1285 pub waitpoint_token: WaitpointToken,
1286 pub now: TimestampMs,
1287}
1288
1289#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1290pub enum BufferSignalResult {
1291 /// Signal buffered for pending waitpoint.
1292 Buffered { signal_id: SignalId },
1293 /// Duplicate signal.
1294 Duplicate { existing_signal_id: SignalId },
1295}
1296
1297// ─── list_pending_waitpoints ───
1298
1299/// One entry in the read-only view of an execution's active waitpoints.
1300///
1301/// Returned by `EngineBackend::list_pending_waitpoints` (and the
1302/// `GET /v1/executions/{id}/pending-waitpoints` REST endpoint).
1303///
1304/// **RFC-017 §8 schema rewrite (Stage D1).** This struct no longer
1305/// carries the raw HMAC `waitpoint_token` at the trait boundary — the
1306/// backend emits only the sanitised `(token_kid, token_fingerprint)`
1307/// pair. The HTTP handler (see `ff-server::api::list_pending_waitpoints`)
1308/// wraps the trait response and re-injects the real token on the
1309/// v0.7.x wire for one-release deprecation warning; the wire field is
1310/// removed entirely at v0.8.0.
1311#[non_exhaustive]
1312#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1313pub struct PendingWaitpointInfo {
1314 pub waitpoint_id: WaitpointId,
1315 pub waitpoint_key: String,
1316 /// Current waitpoint state: `pending`, `active`, `closed`. Callers
1317 /// typically filter to `pending` or `active`.
1318 pub state: String,
1319 /// Signal names the resume condition is waiting for. Reviewers that
1320 /// need to drive a specific waitpoint — particularly when multiple
1321 /// concurrent waitpoints exist on one execution — filter on this to
1322 /// pick the right target.
1323 ///
1324 /// An EMPTY vec means the condition matches any signal (wildcard, per
1325 /// `lua/helpers.lua` `initialize_condition`). Callers must not infer
1326 /// "no waitpoint" from empty; check `state` / length of the outer
1327 /// list for that.
1328 #[serde(default)]
1329 pub required_signal_names: Vec<String>,
1330 /// Timestamp when the waitpoint record was first written.
1331 pub created_at: TimestampMs,
1332 /// Timestamp when the waitpoint was activated (suspension landed).
1333 /// `None` while the waitpoint is still `pending`.
1334 #[serde(default, skip_serializing_if = "Option::is_none")]
1335 pub activated_at: Option<TimestampMs>,
1336 /// Scheduled expiration timestamp. `None` if no timeout configured.
1337 #[serde(default, skip_serializing_if = "Option::is_none")]
1338 pub expires_at: Option<TimestampMs>,
1339 /// Owning execution — surfaces without a separate lookup.
1340 pub execution_id: ExecutionId,
1341 /// HMAC key identifier (the `<kid>` prefix of the stored
1342 /// `waitpoint_token`). Safe to expose — identifies which signing
1343 /// key minted the token without revealing the key material.
1344 pub token_kid: String,
1345 /// 16-hex-char (8-byte) fingerprint of the HMAC digest. Audit-friendly
1346 /// handle that correlates across logs without being replayable.
1347 pub token_fingerprint: String,
1348}
1349
1350impl PendingWaitpointInfo {
1351 /// Construct a `PendingWaitpointInfo` with the 7 required fields.
1352 /// Optional fields (`activated_at`, `expires_at`) default to
1353 /// `None`; use [`Self::with_activated_at`] / [`Self::with_expires_at`]
1354 /// to populate them. `required_signal_names` defaults to empty
1355 /// (wildcard condition); use [`Self::with_required_signal_names`]
1356 /// to set it.
1357 pub fn new(
1358 waitpoint_id: WaitpointId,
1359 waitpoint_key: String,
1360 state: String,
1361 created_at: TimestampMs,
1362 execution_id: ExecutionId,
1363 token_kid: String,
1364 token_fingerprint: String,
1365 ) -> Self {
1366 Self {
1367 waitpoint_id,
1368 waitpoint_key,
1369 state,
1370 required_signal_names: Vec::new(),
1371 created_at,
1372 activated_at: None,
1373 expires_at: None,
1374 execution_id,
1375 token_kid,
1376 token_fingerprint,
1377 }
1378 }
1379
1380 pub fn with_activated_at(mut self, activated_at: TimestampMs) -> Self {
1381 self.activated_at = Some(activated_at);
1382 self
1383 }
1384
1385 pub fn with_expires_at(mut self, expires_at: TimestampMs) -> Self {
1386 self.expires_at = Some(expires_at);
1387 self
1388 }
1389
1390 pub fn with_required_signal_names(mut self, names: Vec<String>) -> Self {
1391 self.required_signal_names = names;
1392 self
1393 }
1394}
1395
1396// ─── expire_suspension ───
1397
1398#[derive(Clone, Debug, Serialize, Deserialize)]
1399pub struct ExpireSuspensionArgs {
1400 pub execution_id: ExecutionId,
1401}
1402
1403#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1404pub enum ExpireSuspensionResult {
1405 /// Suspension expired with the given behavior applied.
1406 Expired { behavior_applied: String },
1407 /// Already resolved — no action needed.
1408 AlreadySatisfied { reason: String },
1409}
1410
1411// ─── claim_resumed_execution ───
1412
1413#[derive(Clone, Debug, Serialize, Deserialize)]
1414pub struct ClaimResumedExecutionArgs {
1415 pub execution_id: ExecutionId,
1416 pub worker_id: WorkerId,
1417 pub worker_instance_id: WorkerInstanceId,
1418 pub lane_id: LaneId,
1419 pub lease_id: LeaseId,
1420 pub lease_ttl_ms: u64,
1421 /// Current attempt index (for KEYS construction — from exec_core).
1422 pub current_attempt_index: AttemptIndex,
1423 /// Remaining attempt timeout from before suspension (ms). 0 = no timeout.
1424 #[serde(default)]
1425 pub remaining_attempt_timeout_ms: Option<u64>,
1426 pub now: TimestampMs,
1427}
1428
1429#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1430#[non_exhaustive]
1431pub struct ClaimedResumedExecution {
1432 pub execution_id: ExecutionId,
1433 pub lease_id: LeaseId,
1434 pub lease_epoch: LeaseEpoch,
1435 pub attempt_index: AttemptIndex,
1436 pub attempt_id: AttemptId,
1437 pub lease_expires_at: TimestampMs,
1438 /// Backend-populated attempt handle for this resumed claim
1439 /// (v0.12 PR-5.5). Valkey fills in `HandleKind::Resumed`; PG/SQLite
1440 /// populate a backend-tagged real handle via
1441 /// `ff_core::handle_codec::encode`.
1442 #[serde(default = "crate::backend::stub_handle_resumed")]
1443 pub handle: crate::backend::Handle,
1444}
1445
1446impl ClaimedResumedExecution {
1447 /// Construct a `ClaimedResumedExecution`. Added alongside
1448 /// `#[non_exhaustive]` per `feedback_non_exhaustive_needs_constructor`
1449 /// so consumers (backend impls building a resumed-claim outcome)
1450 /// can still build the struct after it was sealed for forward-compat.
1451 #[allow(clippy::too_many_arguments)]
1452 pub fn new(
1453 execution_id: ExecutionId,
1454 lease_id: LeaseId,
1455 lease_epoch: LeaseEpoch,
1456 attempt_index: AttemptIndex,
1457 attempt_id: AttemptId,
1458 lease_expires_at: TimestampMs,
1459 handle: crate::backend::Handle,
1460 ) -> Self {
1461 Self {
1462 execution_id,
1463 lease_id,
1464 lease_epoch,
1465 attempt_index,
1466 attempt_id,
1467 lease_expires_at,
1468 handle,
1469 }
1470 }
1471}
1472
1473#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1474pub enum ClaimResumedExecutionResult {
1475 /// Successfully claimed resumed execution (same attempt continues).
1476 Claimed(ClaimedResumedExecution),
1477}
1478
1479// ═══════════════════════════════════════════════════════════════════════
1480// Phase 4 contracts: stream
1481// ═══════════════════════════════════════════════════════════════════════
1482
1483// ─── append_frame ───
1484
1485#[derive(Clone, Debug, Serialize, Deserialize)]
1486pub struct AppendFrameArgs {
1487 pub execution_id: ExecutionId,
1488 pub attempt_index: AttemptIndex,
1489 pub lease_id: LeaseId,
1490 pub lease_epoch: LeaseEpoch,
1491 pub attempt_id: AttemptId,
1492 pub frame_type: String,
1493 pub timestamp: TimestampMs,
1494 pub payload: Vec<u8>,
1495 #[serde(default)]
1496 pub encoding: Option<String>,
1497 /// Optional structured metadata for the frame (JSON blob).
1498 #[serde(default)]
1499 pub metadata_json: Option<String>,
1500 #[serde(default)]
1501 pub correlation_id: Option<String>,
1502 #[serde(default)]
1503 pub source: Option<String>,
1504 /// MAXLEN for the stream. 0 = no trim.
1505 #[serde(default)]
1506 pub retention_maxlen: Option<u32>,
1507 /// Max payload bytes per frame. Default: 65536.
1508 #[serde(default)]
1509 pub max_payload_bytes: Option<u32>,
1510}
1511
1512#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1513pub enum AppendFrameResult {
1514 /// Frame appended successfully.
1515 Appended {
1516 /// Valkey Stream entry ID (e.g. "1713100800150-0").
1517 entry_id: String,
1518 /// Total frame count after this append.
1519 frame_count: u64,
1520 },
1521}
1522
1523// ─── StreamCursor (issue #92) ───
1524
/// Opaque position marker for attempt-stream reads and tails.
///
/// Takes the place of the bare `&str` / `String` stream-id parameters
/// that `read_stream` / `tail_stream` / `ReadStreamParams` /
/// `TailStreamParams` used to carry. On the wire it is still a flat
/// string (serde goes through `try_from` / `into`), so REST queries such
/// as `?from=start&to=end` and `?after=123-0` keep working.
///
/// # Public wire grammar
///
/// Exactly three token shapes are accepted:
///
/// * `"start"` — the first entry of the stream (what XRANGE spells `-`).
///   Valid in `read_stream` / `ReadStreamParams`.
/// * `"end"` — the latest entry of the stream (what XRANGE spells `+`).
///   Valid in `read_stream` / `ReadStreamParams`.
/// * `"<ms>"` or `"<ms>-<seq>"` — a concrete Valkey Stream entry id.
///   Valid everywhere.
///
/// The raw XRANGE/XREAD markers `"-"` and `"+"` are **NOT** part of the
/// wire grammar. They are an implementation detail that only appears
/// inside the Lua-adjacent [`ReadFramesArgs`] / `xread_block` path,
/// produced by [`StreamCursor::to_wire`].
///
/// For XREAD (tail) the documented "from the beginning" cursor is
/// `StreamCursor::At("0-0".into())`; [`StreamCursor::from_beginning`]
/// returns exactly that. The SDK's `tail_stream` boundary rejects
/// `Start` / `End` because XREAD takes neither `-` nor `+` as a cursor,
/// and [`StreamCursor::is_concrete`] is the single place where that
/// Start/End-versus-At decision is made for boundary validation.
///
/// # Why an enum instead of a string
///
/// With a plain string, a malformed id travels all the way to the
/// Lua/Valkey layer and comes back as a script error and an HTTP 500.
/// An enum with fallible `FromStr` / `TryFrom<String>` rejects malformed
/// input at the wire boundary with a structured error, and it stops bare
/// `-` / `+` from creeping into consumer code as an unofficial extension
/// of the public API.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum StreamCursor {
    /// The first entry of the stream (XRANGE start marker).
    Start,
    /// The latest entry of the stream (XRANGE end marker).
    End,
    /// A concrete Valkey Stream entry id (`<ms>` or `<ms>-<seq>`).
    ///
    /// XREAD-style tails use `At("0-0".to_owned())` as the documented
    /// "from the beginning" value — see
    /// [`StreamCursor::from_beginning`].
    At(String),
}

impl StreamCursor {
    /// Builds the XREAD "from the beginning" cursor, `At("0-0")`.
    /// Because XREAD treats `last_id` as exclusive, using this as the
    /// `after` cursor yields every entry in the stream.
    pub fn from_beginning() -> Self {
        Self::At(String::from("0-0"))
    }

    /// Serde default helper returning `StreamCursor::Start`; meant for
    /// `#[serde(default = "StreamCursor::start")]` on REST query
    /// structs.
    pub fn start() -> Self {
        Self::Start
    }

    /// Serde default helper returning `StreamCursor::End`.
    pub fn end() -> Self {
        Self::End
    }

    /// Serde default helper returning the same value as
    /// [`Self::from_beginning`]; the default for
    /// `TailStreamParams::after`.
    pub fn beginning() -> Self {
        Self::At(String::from("0-0"))
    }

    /// Internal-only: lowers the cursor to the marker string Valkey's
    /// XRANGE/XREAD expect — `Start → "-"`, `End → "+"`, `At(s) → s`.
    ///
    /// Called at the ff-script adapter edge, immediately before building
    /// `ReadFramesArgs` or invoking `xread_block`, to translate the
    /// opaque wire grammar into the Lua-ABI form. These raw characters
    /// are NOT part of the public wire and must not be emitted to
    /// consumers; the method is hidden from generated docs to discourage
    /// external use.
    #[doc(hidden)]
    pub fn to_wire(&self) -> &str {
        match self {
            Self::At(id) => id.as_str(),
            Self::Start => "-",
            Self::End => "+",
        }
    }

    /// Internal-only owned counterpart of [`Self::to_wire`]: the
    /// `String` inside `At(s)` is moved out rather than cloned. Intended
    /// for adapter edges that need an owned wire string (e.g.
    /// `ReadFramesArgs.from_id`) from a cursor that is about to be
    /// dropped.
    #[doc(hidden)]
    pub fn into_wire_string(self) -> String {
        match self {
            Self::At(id) => id,
            marker => marker.to_wire().to_owned(),
        }
    }

    /// Returns `true` only for a concrete entry id (`"<ms>"` /
    /// `"<ms>-<seq>"`), and `false` for the open markers `Start` / `End`.
    ///
    /// Boundaries that cannot take open markers, such as XREAD
    /// (tailing), reject a cursor exactly when `!cursor.is_concrete()`.
    /// Keeping the test here holds the SDK and REST guards in lock-step.
    pub fn is_concrete(&self) -> bool {
        match self {
            Self::At(_) => true,
            Self::Start | Self::End => false,
        }
    }
}

/// Renders the public wire form: `start`, `end`, or the entry id itself.
impl std::fmt::Display for StreamCursor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::Start => "start",
            Self::End => "end",
            Self::At(id) => id.as_str(),
        };
        f.write_str(text)
    }
}
1662
/// Failure modes of parsing a [`StreamCursor`] from a string.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StreamCursorParseError {
    /// The input was empty.
    Empty,
    /// The input was one of the rejected bare-marker aliases (`"-"`,
    /// `"+"`). The public wire spells them `"start"` / `"end"`; the raw
    /// Valkey markers are internal-only. Carries the offending input.
    BareMarkerRejected(String),
    /// The input was neither a known keyword nor a well-formed Stream
    /// entry id (ids must match `^\d+(?:-\d+)?$`). Carries the offending
    /// input.
    Malformed(String),
}

/// Human-readable message for each parse failure.
impl std::fmt::Display for StreamCursorParseError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Malformed(input) => write!(
                f,
                "invalid stream cursor '{input}' (expected 'start', 'end', '<ms>', or '<ms>-<seq>')"
            ),
            Self::BareMarkerRejected(marker) => write!(
                f,
                "bare marker '{marker}' is not a valid stream cursor; use 'start' or 'end'"
            ),
            Self::Empty => f.write_str("stream cursor must not be empty"),
        }
    }
}

impl std::error::Error for StreamCursorParseError {}
1694
/// Shared grammar check — classifies `s` as `Start` / `End` / a
/// concrete-id shape / bare marker / malformed / empty, WITHOUT
/// allocating. The owned and borrowed entry points
/// ([`StreamCursor::from_str`], [`StreamCursor::try_from`]) consume this
/// classification and move the owned `String` into `At` when applicable,
/// avoiding a round-trip `String → &str → String::to_owned` for the
/// common REST-query path.
enum StreamCursorClass {
    /// The keyword `"start"`.
    Start,
    /// The keyword `"end"`.
    End,
    /// A well-formed entry id: `<ms>` or `<ms>-<seq>`.
    Concrete,
    /// One of the raw markers `"-"` / `"+"`, which the public wire rejects.
    BareMarker,
    /// The empty string.
    Empty,
    /// Anything else.
    Malformed,
}

/// Classifies `s` against the public stream-cursor grammar.
///
/// Keywords and bare markers are matched exactly (case-sensitive). Any
/// other input is treated as a candidate entry id: it is split at the
/// first `-` into `<ms>` and an optional `<seq>`, and each component must
/// pass [`is_stream_id_component`]. Inputs such as `"12-"`, `"-12"`,
/// `"1-2-3"`, or ids whose components overflow `u64` are `Malformed`.
fn classify_stream_cursor(s: &str) -> StreamCursorClass {
    match s {
        "" => return StreamCursorClass::Empty,
        "-" | "+" => return StreamCursorClass::BareMarker,
        "start" => return StreamCursorClass::Start,
        "end" => return StreamCursorClass::End,
        _ => {}
    }

    let (ms_part, seq_part) = match s.split_once('-') {
        Some((ms, seq)) => (ms, Some(seq)),
        None => (s, None),
    };

    // A missing `<seq>` is fine; a present one must be valid on its own.
    let well_formed =
        is_stream_id_component(ms_part) && seq_part.map_or(true, is_stream_id_component);
    if well_formed {
        StreamCursorClass::Concrete
    } else {
        StreamCursorClass::Malformed
    }
}

/// Returns `true` when `part` is a valid half of a stream entry id: a
/// non-empty run of ASCII digits whose value fits in a `u64`.
///
/// The explicit digit test is required in addition to the parse because
/// `u64::from_str` also accepts a leading `+`. The range test keeps
/// overlong digit runs from being handed to Valkey, whose entry ids are a
/// pair of unsigned 64-bit integers. Non-ASCII input fails the per-byte
/// digit test, so no separate `is_ascii` check is needed. Nothing here
/// allocates.
fn is_stream_id_component(part: &str) -> bool {
    !part.is_empty()
        && part.bytes().all(|b| b.is_ascii_digit())
        && part.parse::<u64>().is_ok()
}
1741
1742impl std::str::FromStr for StreamCursor {
1743 type Err = StreamCursorParseError;
1744
1745 fn from_str(s: &str) -> Result<Self, Self::Err> {
1746 match classify_stream_cursor(s) {
1747 StreamCursorClass::Start => Ok(Self::Start),
1748 StreamCursorClass::End => Ok(Self::End),
1749 StreamCursorClass::Concrete => Ok(Self::At(s.to_owned())),
1750 StreamCursorClass::BareMarker => {
1751 Err(StreamCursorParseError::BareMarkerRejected(s.to_owned()))
1752 }
1753 StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
1754 StreamCursorClass::Malformed => Err(StreamCursorParseError::Malformed(s.to_owned())),
1755 }
1756 }
1757}
1758
1759impl TryFrom<String> for StreamCursor {
1760 type Error = StreamCursorParseError;
1761
1762 fn try_from(s: String) -> Result<Self, Self::Error> {
1763 // Owned parsing path — the `At` variant moves `s` in directly,
1764 // avoiding the `&str → String::to_owned` re-allocation that a
1765 // blind forward to `FromStr::from_str(&s)` would force. Error
1766 // paths still pay one allocation to describe the offending
1767 // input.
1768 match classify_stream_cursor(&s) {
1769 StreamCursorClass::Start => Ok(Self::Start),
1770 StreamCursorClass::End => Ok(Self::End),
1771 StreamCursorClass::Concrete => Ok(Self::At(s)),
1772 StreamCursorClass::BareMarker => Err(StreamCursorParseError::BareMarkerRejected(s)),
1773 StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
1774 StreamCursorClass::Malformed => Err(StreamCursorParseError::Malformed(s)),
1775 }
1776 }
1777}
1778
1779impl From<StreamCursor> for String {
1780 fn from(c: StreamCursor) -> Self {
1781 c.to_string()
1782 }
1783}
1784
1785impl Serialize for StreamCursor {
1786 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1787 serializer.collect_str(self)
1788 }
1789}
1790
1791impl<'de> Deserialize<'de> for StreamCursor {
1792 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
1793 let s = String::deserialize(deserializer)?;
1794 Self::try_from(s).map_err(serde::de::Error::custom)
1795 }
1796}
1797
1798// ─── read_attempt_stream / tail_attempt_stream ───
1799
/// Hard cap on the number of frames returned by a single read/tail call.
///
/// Single source of truth across the Rust layer (ff-script, ff-server,
/// ff-sdk). The Lua side in `lua/stream.lua` keeps a matching literal with
/// an inline reference back here; bump both together if you ever need to
/// lift the cap.
///
/// The `count_limit` field of `ReadFramesArgs` documents this constant as
/// its upper bound.
pub const STREAM_READ_HARD_CAP: u64 = 10_000;
1807
/// A single frame read from an attempt-scoped stream.
///
/// Field set mirrors what `ff_append_frame` writes: `frame_type`, `ts`,
/// `payload`, `encoding`, `source`, and optionally `correlation_id`. Stored
/// as an ordered map so field order is deterministic across read calls.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamFrame {
    /// Valkey Stream entry ID, e.g. "1713100800150-0".
    pub id: String,
    /// Frame fields in sorted order. A `BTreeMap` iterates by ascending
    /// key, which is what provides the deterministic ordering. Both keys
    /// and values are kept as plain strings.
    pub fields: std::collections::BTreeMap<String, String>,
}
1820
/// Inputs to `ff_read_attempt_stream` (XRANGE wrapper).
///
/// The stream is addressed by `execution_id` plus `attempt_index`; the
/// remaining fields are the XRANGE range bounds and COUNT.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReadFramesArgs {
    /// Execution that owns the attempt stream.
    pub execution_id: ExecutionId,
    /// Which attempt's stream to read.
    pub attempt_index: AttemptIndex,
    /// XRANGE start ID. Use "-" for earliest.
    pub from_id: String,
    /// XRANGE end ID. Use "+" for latest.
    pub to_id: String,
    /// XRANGE COUNT limit. MUST be `>= 1`. The REST and SDK layers reject
    /// `0` at the boundary; the Lua side rejects it too. `STREAM_READ_HARD_CAP`
    /// is the upper bound.
    pub count_limit: u64,
}
1835
/// Result of reading frames from an attempt stream — frames plus terminal
/// signal so consumers can stop polling without a timeout fallback.
///
/// Both `closed_*` fields are omitted from serialized output when `None`
/// (`skip_serializing_if`) and fall back to `None` when missing from the
/// input (`serde(default)`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamFrames {
    /// Entries in the requested range (possibly empty).
    pub frames: Vec<StreamFrame>,
    /// Timestamp when the upstream writer closed the stream. `None` if the
    /// stream is still open (or has never been written).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub closed_at: Option<TimestampMs>,
    /// Reason from the closing writer. Current values:
    /// `attempt_success`, `attempt_failure`, `attempt_cancelled`,
    /// `attempt_interrupted`. `None` iff the stream is still open.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub closed_reason: Option<String>,
}
1852
1853impl StreamFrames {
1854 /// Construct an empty open-stream result (no frames, no terminal
1855 /// markers). Useful for fast-path peek helpers.
1856 pub fn empty_open() -> Self {
1857 Self {
1858 frames: Vec::new(),
1859 closed_at: None,
1860 closed_reason: None,
1861 }
1862 }
1863
1864 /// True iff the producer has closed this stream. Consumers should stop
1865 /// polling and drain once this returns true.
1866 pub fn is_closed(&self) -> bool {
1867 self.closed_at.is_some()
1868 }
1869}
1870
/// Outcome of a frame read. This definition has a single success variant
/// wrapping the `StreamFrames` payload.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReadFramesResult {
    /// Frames returned (possibly empty) plus optional closed markers.
    Frames(StreamFrames),
}
1876
1877// ═══════════════════════════════════════════════════════════════════════
1878// Phase 5 contracts: budget, quota, block/unblock
1879// ═══════════════════════════════════════════════════════════════════════
1880
1881// ─── create_budget ───
1882
/// Typed inputs for the `create_budget` contract.
///
/// `dimensions`, `hard_limits`, and `soft_limits` are parallel vectors:
/// the limit at index `i` belongs to the dimension name at index `i`.
// NOTE(review): nothing in this struct enforces that the three vectors have
// equal length — confirm where that is validated.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateBudgetArgs {
    /// Identifier of the budget to create.
    pub budget_id: crate::types::BudgetId,
    // NOTE(review): the accepted values of the five string fields below are
    // not visible in this file — confirm against the function implementation.
    pub scope_type: String,
    pub scope_id: String,
    pub enforcement_mode: String,
    pub on_hard_limit: String,
    pub on_soft_limit: String,
    /// Reset interval; milliseconds, going by the `_ms` suffix.
    pub reset_interval_ms: u64,
    /// Dimension names.
    pub dimensions: Vec<String>,
    /// Hard limits per dimension (parallel with dimensions).
    pub hard_limits: Vec<u64>,
    /// Soft limits per dimension (parallel with dimensions).
    pub soft_limits: Vec<u64>,
    /// Caller-supplied timestamp (presumably the current time — confirm).
    pub now: TimestampMs,
}
1900
/// Outcomes of `create_budget`. Both variants echo the budget id.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateBudgetResult {
    /// Budget created.
    Created { budget_id: crate::types::BudgetId },
    /// Already exists (idempotent).
    AlreadySatisfied { budget_id: crate::types::BudgetId },
}
1908
1909// ─── create_quota_policy ───
1910
/// Typed inputs for the `create_quota_policy` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateQuotaPolicyArgs {
    /// Identifier of the quota policy to create.
    pub quota_policy_id: crate::types::QuotaPolicyId,
    /// Window length; seconds, going by the field name.
    pub window_seconds: u64,
    /// Request ceiling per window, going by the field name.
    pub max_requests_per_window: u64,
    /// Concurrency ceiling, going by the field name.
    // NOTE(review): whether `0` has a special meaning (e.g. "unlimited") is
    // not visible here — confirm.
    pub max_concurrent: u64,
    /// Caller-supplied timestamp (presumably the current time — confirm).
    pub now: TimestampMs,
}
1919
/// Outcomes of `create_quota_policy`. Both variants echo the policy id.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateQuotaPolicyResult {
    /// Quota policy created.
    Created {
        quota_policy_id: crate::types::QuotaPolicyId,
    },
    /// Already exists (idempotent).
    AlreadySatisfied {
        quota_policy_id: crate::types::QuotaPolicyId,
    },
}
1931
1932// ─── budget_status (read-only) ───
1933
/// Operator-facing budget status snapshot (not an FCALL — direct HGETALL reads).
///
/// Identifiers and timestamps are carried as plain strings here rather
/// than the typed ids / `TimestampMs` used by the FCALL contracts.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BudgetStatus {
    pub budget_id: String,
    pub scope_type: String,
    pub scope_id: String,
    pub enforcement_mode: String,
    /// Current usage per dimension: {dimension_name: current_value}.
    pub usage: HashMap<String, u64>,
    /// Hard limits per dimension: {dimension_name: limit}.
    pub hard_limits: HashMap<String, u64>,
    /// Soft limits per dimension: {dimension_name: limit}.
    pub soft_limits: HashMap<String, u64>,
    /// Hard-breach counter, presumably (there is a separate
    /// `soft_breach_count`) — confirm against the writer.
    pub breach_count: u64,
    pub soft_breach_count: u64,
    // NOTE(review): the four optional fields below are strings; their exact
    // format (e.g. stringified milliseconds) is not visible here — confirm.
    pub last_breach_at: Option<String>,
    pub last_breach_dim: Option<String>,
    pub next_reset_at: Option<String>,
    pub created_at: Option<String>,
}
1954
1955// ─── report_usage_and_check ───
1956
/// Typed inputs for the `report_usage_and_check` contract.
///
/// `dimensions` and `deltas` are parallel vectors: `deltas[i]` is the
/// increment for `dimensions[i]`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReportUsageArgs {
    /// Dimension names to increment.
    pub dimensions: Vec<String>,
    /// Increment values (parallel with dimensions).
    pub deltas: Vec<u64>,
    /// Caller-supplied timestamp (presumably the current time — confirm).
    pub now: TimestampMs,
    /// Optional idempotency key to prevent double-counting on retries.
    /// Pass the raw dedup id (e.g. `"retry-42"`); the typed FCALL wrapper
    /// wraps it into `ff:usagededup:{b:M}:<id>` using the budget
    /// partition's hash tag so it co-locates with the other budget keys
    /// (#108).
    ///
    /// Defaults to `None` when absent from the serialized input.
    #[serde(default)]
    pub dedup_key: Option<String>,
}
1972
/// Outcomes of `report_usage_and_check`.
///
/// `#[non_exhaustive]`: code in other crates must include a wildcard arm
/// when matching, so new variants can be added without a breaking change.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum ReportUsageResult {
    /// All increments applied, no breach.
    Ok,
    /// Soft limit breached on a dimension (advisory, increments applied).
    SoftBreach {
        dimension: String,
        current_usage: u64,
        soft_limit: u64,
    },
    /// Hard limit breached (increments NOT applied).
    HardBreach {
        dimension: String,
        current_usage: u64,
        hard_limit: u64,
    },
    /// Dedup key matched — usage already applied in a prior call.
    AlreadyApplied,
}
1993
1994// ─── reset_budget ───
1995
/// Typed inputs for the `reset_budget` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResetBudgetArgs {
    /// Budget to reset.
    pub budget_id: crate::types::BudgetId,
    /// Caller-supplied timestamp (presumably the current time — confirm).
    pub now: TimestampMs,
}
2001
/// Outcome of `reset_budget`. This definition has a single success variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResetBudgetResult {
    /// Budget reset successfully. Carries the timestamp of the next reset.
    Reset { next_reset_at: TimestampMs },
}
2007
2008// ─── check_admission_and_record ───
2009
/// Typed inputs for the `check_admission_and_record` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CheckAdmissionArgs {
    /// Execution requesting admission.
    pub execution_id: ExecutionId,
    /// Caller-supplied timestamp (presumably the current time — confirm).
    pub now: TimestampMs,
    /// Window length; seconds, going by the field name.
    pub window_seconds: u64,
    // NOTE(review): presumably the per-window request ceiling and the
    // concurrent-execution ceiling matching the `RateExceeded` /
    // `ConcurrencyExceeded` outcomes — confirm against the implementation.
    pub rate_limit: u64,
    pub concurrency_cap: u64,
    /// Optional jitter; milliseconds, going by the `_ms` suffix. Defaults
    /// to `None` when absent from the serialized input.
    #[serde(default)]
    pub jitter_ms: Option<u64>,
}
2020
/// Outcomes of `check_admission_and_record`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CheckAdmissionResult {
    /// Admitted — execution may proceed.
    Admitted,
    /// Already admitted in this window (idempotent).
    AlreadyAdmitted,
    /// Rate limit exceeded. `retry_after_ms` is a retry hint; milliseconds,
    /// going by the `_ms` suffix.
    RateExceeded { retry_after_ms: u64 },
    /// Concurrency cap hit.
    ConcurrencyExceeded,
}
2032
2033// ─── release_admission ───
2034
/// Typed inputs for the `release_admission` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReleaseAdmissionArgs {
    /// Execution whose admission is released.
    pub execution_id: ExecutionId,
}
2039
/// Outcome of `release_admission`. This definition has a single success
/// variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReleaseAdmissionResult {
    /// Admission released.
    Released,
}
2044
2045// ─── block_execution_for_admission ───
2046
/// Typed inputs for the `block_execution_for_admission` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BlockExecutionArgs {
    /// Execution to block.
    pub execution_id: ExecutionId,
    /// Reason recorded for the block.
    // NOTE(review): the set of accepted reason strings is not visible here.
    pub blocking_reason: String,
    /// Optional free-form detail. Defaults to `None` when absent from the
    /// serialized input.
    #[serde(default)]
    pub blocking_detail: Option<String>,
    /// Caller-supplied timestamp (presumably the current time — confirm).
    pub now: TimestampMs,
}
2055
/// Outcome of `block_execution_for_admission`. This definition has a
/// single success variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlockExecutionResult {
    /// Execution blocked.
    Blocked,
}
2061
2062// ─── unblock_execution ───
2063
/// Typed inputs for the `unblock_execution` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UnblockExecutionArgs {
    /// Execution to unblock.
    pub execution_id: ExecutionId,
    /// Caller-supplied timestamp (presumably the current time — confirm).
    pub now: TimestampMs,
    /// Expected blocking reason (prevents stale unblock). Defaults to
    /// `None` when absent from the serialized input.
    #[serde(default)]
    pub expected_blocking_reason: Option<String>,
}
2072
/// Outcome of `unblock_execution`. This definition has a single success
/// variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum UnblockExecutionResult {
    /// Execution unblocked and moved to eligible.
    Unblocked,
}
2078
2079// ═══════════════════════════════════════════════════════════════════════
2080// Phase 6 contracts: flow coordination and dependencies
2081// ═══════════════════════════════════════════════════════════════════════
2082
2083// ─── create_flow ───
2084
/// Typed inputs for the `create_flow` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateFlowArgs {
    /// Identifier of the flow to create.
    pub flow_id: crate::types::FlowId,
    /// Caller-chosen flow kind label, passed as a plain string.
    pub flow_kind: String,
    /// Namespace the flow belongs to.
    pub namespace: Namespace,
    /// Caller-supplied timestamp (presumably the current time — confirm).
    pub now: TimestampMs,
}
2092
/// Outcomes of `create_flow`. Both variants echo the flow id.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateFlowResult {
    /// Flow created successfully.
    Created { flow_id: crate::types::FlowId },
    /// Flow already exists (idempotent).
    AlreadySatisfied { flow_id: crate::types::FlowId },
}
2100
2101// ─── add_execution_to_flow ───
2102
/// Typed inputs for the `add_execution_to_flow` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AddExecutionToFlowArgs {
    /// Flow that receives the new member.
    pub flow_id: crate::types::FlowId,
    /// Execution to add as a member.
    pub execution_id: ExecutionId,
    /// Caller-supplied timestamp (presumably the current time — confirm).
    pub now: TimestampMs,
}
2109
/// Outcomes of `add_execution_to_flow`. Both variants echo the execution
/// id together with a node count.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AddExecutionToFlowResult {
    /// Execution added to flow.
    Added {
        execution_id: ExecutionId,
        new_node_count: u32,
    },
    /// Already a member (idempotent).
    AlreadyMember {
        execution_id: ExecutionId,
        node_count: u32,
    },
}
2123
2124// ─── cancel_flow ───
2125
/// Typed inputs for the `cancel_flow` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CancelFlowArgs {
    /// Flow to cancel.
    pub flow_id: crate::types::FlowId,
    /// Caller-supplied cancellation reason.
    pub reason: String,
    /// Cancellation policy as a plain string. `"cancel_all"` is the one
    /// literal named by the `CancelFlowResult` docs in this file.
    // NOTE(review): the full set of accepted policies is not visible here.
    pub cancellation_policy: String,
    /// Caller-supplied timestamp (presumably the current time — confirm).
    pub now: TimestampMs,
}
2133
/// Wire-level outcomes of `cancel_flow`.
///
/// Every variant echoes the `cancellation_policy` and carries the member
/// execution ids as plain strings.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CancelFlowResult {
    /// Flow cancelled and all member cancellations (if any) have completed
    /// synchronously. Used when `cancellation_policy != "cancel_all"`, when
    /// the flow has no members, when the caller opted into synchronous
    /// dispatch (e.g. `?wait=true`), or when the flow was already in a
    /// terminal state (idempotent retry).
    ///
    /// On the idempotent-retry path `member_execution_ids` may be *capped*
    /// at the server (default 1000 entries) to bound response bandwidth on
    /// flows with very large membership. The first (non-idempotent) call
    /// always returns the full list, so clients that need every member id
    /// should persist the initial response.
    Cancelled {
        cancellation_policy: String,
        member_execution_ids: Vec<String>,
    },
    /// Flow state was flipped to cancelled atomically, but member
    /// cancellations are dispatched asynchronously in the background.
    /// Clients may poll `GET /v1/executions/{id}/state` for each member
    /// execution id to track terminal state.
    CancellationScheduled {
        cancellation_policy: String,
        member_count: u32,
        member_execution_ids: Vec<String>,
    },
    /// `?wait=true` dispatch completed but one or more member cancellations
    /// failed mid-loop (e.g. ghost member, Lua error, transport fault after
    /// retries exhausted). The flow itself is still flipped to cancelled
    /// (atomic Lua already ran); callers SHOULD inspect
    /// `failed_member_execution_ids` and either retry those ids directly
    /// via `cancel_execution` or wait for the cancel-backlog reconciler
    /// to retry them in the background.
    ///
    /// Only emitted by the synchronous wait path
    /// ([`crate::CancelFlowArgs`] via `?wait=true`). The async path returns
    /// [`CancelFlowResult::CancellationScheduled`] and delegates retries
    /// to the reconciler — there is no visible "partial" state on the
    /// async path because the dispatch result is not observed inline.
    PartiallyCancelled {
        cancellation_policy: String,
        /// All member execution ids that the cancel_flow FCALL returned
        /// (i.e. the full membership at the moment of cancellation).
        member_execution_ids: Vec<String>,
        /// Strict subset of `member_execution_ids` whose per-member cancel
        /// FCALL returned an error. Order is deterministic (matches the
        /// iteration order over `member_execution_ids`).
        failed_member_execution_ids: Vec<String>,
    },
}
2184
/// RFC-017 Stage E2: result of the "header" portion of a cancel_flow
/// operation — the atomic flow-state flip + member enumeration.
///
/// The Server composes this with its own wait/async member-dispatch
/// machinery to build the wire-level [`CancelFlowResult`]. Backends
/// implement [`crate::engine_backend::EngineBackend::cancel_flow_header`]
/// (default: `Unavailable`) so the Valkey-native `ff_cancel_flow`
/// FCALL (with its `flow_already_terminal` idempotency branch) can be
/// driven through the trait without re-shaping the existing public
/// `cancel_flow(id, policy, wait)` signature.
///
/// Unlike the wire-level result types in this file, this enum derives no
/// `Serialize`/`Deserialize`. It is `#[non_exhaustive]`, so matches in
/// other crates need a wildcard arm.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum CancelFlowHeader {
    /// Flow-state flipped this call. `member_execution_ids` is the
    /// full (uncapped) membership at flip time.
    Cancelled {
        cancellation_policy: String,
        member_execution_ids: Vec<String>,
    },
    /// Flow was already in a terminal state on entry. The backend has
    /// surfaced the *stored* `cancellation_policy`, `cancel_reason`,
    /// and full membership so the Server can return an idempotent
    /// [`CancelFlowResult::Cancelled`] without re-doing the flip.
    AlreadyTerminal {
        /// `None` only for flows cancelled by pre-E2 Lua that never
        /// persisted the policy field.
        stored_cancellation_policy: Option<String>,
        /// `None` when the flow was never cancel-reason-stamped.
        stored_cancel_reason: Option<String>,
        /// Full membership. Server caps to
        /// `ALREADY_TERMINAL_MEMBER_CAP` before wiring.
        member_execution_ids: Vec<String>,
    },
}
2219
2220// ─── stage_dependency_edge ───
2221
/// Typed inputs for the `stage_dependency_edge` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StageDependencyEdgeArgs {
    /// Flow that owns the edge.
    pub flow_id: crate::types::FlowId,
    /// Identifier of the edge being staged.
    pub edge_id: crate::types::EdgeId,
    /// Execution on the upstream end of the edge.
    pub upstream_execution_id: ExecutionId,
    /// Execution on the downstream end of the edge.
    pub downstream_execution_id: ExecutionId,
    /// Dependency kind as a plain string. When absent from the serialized
    /// input it is filled by `default_dependency_kind`.
    #[serde(default = "default_dependency_kind")]
    pub dependency_kind: String,
    /// Optional data-passing reference. Defaults to `None` when absent
    /// from the serialized input.
    #[serde(default)]
    pub data_passing_ref: Option<String>,
    /// Graph revision the caller expects the flow to be at.
    // NOTE(review): presumably an optimistic-concurrency guard — confirm
    // how a mismatch is reported.
    pub expected_graph_revision: u64,
    /// Caller-supplied timestamp (presumably the current time — confirm).
    pub now: TimestampMs,
}
2235
/// Serde default for `dependency_kind` fields: the literal `"success_only"`.
fn default_dependency_kind() -> String {
    String::from("success_only")
}
2239
/// Outcome of `stage_dependency_edge`. This definition has a single
/// success variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum StageDependencyEdgeResult {
    /// Edge staged, new graph revision.
    Staged {
        edge_id: crate::types::EdgeId,
        new_graph_revision: u64,
    },
}
2248
2249// ─── apply_dependency_to_child ───
2250
/// Typed inputs for the `apply_dependency_to_child` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ApplyDependencyToChildArgs {
    /// Flow that owns the edge.
    pub flow_id: crate::types::FlowId,
    /// Edge being applied.
    pub edge_id: crate::types::EdgeId,
    /// The child execution that receives the dependency.
    pub downstream_execution_id: ExecutionId,
    /// Execution on the upstream end of the edge.
    pub upstream_execution_id: ExecutionId,
    /// Graph revision associated with this application.
    // NOTE(review): how this value is validated is not visible here.
    pub graph_revision: u64,
    /// Dependency kind as a plain string. When absent from the serialized
    /// input it is filled by `default_dependency_kind`.
    #[serde(default = "default_dependency_kind")]
    pub dependency_kind: String,
    /// Optional data-passing reference. Defaults to `None` when absent
    /// from the serialized input.
    #[serde(default)]
    pub data_passing_ref: Option<String>,
    /// Caller-supplied timestamp (presumably the current time — confirm).
    pub now: TimestampMs,
}
2265
/// Outcomes of `apply_dependency_to_child`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ApplyDependencyToChildResult {
    /// Dependency applied, N unsatisfied deps remaining.
    Applied { unsatisfied_count: u32 },
    /// Already applied (idempotent).
    AlreadyApplied,
}
2273
2274// ─── resolve_dependency ───
2275
/// Typed inputs for the `resolve_dependency` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResolveDependencyArgs {
    /// Edge to resolve.
    pub edge_id: crate::types::EdgeId,
    /// "success", "failed", "cancelled", "expired", "skipped"
    pub upstream_outcome: String,
    /// Caller-supplied timestamp (presumably the current time — confirm).
    pub now: TimestampMs,
}
2283
/// Outcomes of `resolve_dependency`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResolveDependencyResult {
    /// Edge satisfied — downstream may become eligible.
    Satisfied,
    /// Edge made impossible — downstream becomes skipped.
    Impossible,
    /// Already resolved (idempotent).
    AlreadyResolved,
}
2293
2294// ─── promote_blocked_to_eligible ───
2295
/// Typed inputs for the `promote_blocked_to_eligible` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PromoteBlockedToEligibleArgs {
    /// Execution to promote.
    pub execution_id: ExecutionId,
    /// Caller-supplied timestamp (presumably the current time — confirm).
    pub now: TimestampMs,
}
2301
/// Outcome of `promote_blocked_to_eligible`. This definition has a single
/// success variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromoteBlockedToEligibleResult {
    /// Execution promoted.
    Promoted,
}
2306
2307// ─── evaluate_flow_eligibility ───
2308
/// Typed inputs for the `evaluate_flow_eligibility` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EvaluateFlowEligibilityArgs {
    /// Execution whose eligibility is evaluated.
    pub execution_id: ExecutionId,
}
2313
/// Outcome of `evaluate_flow_eligibility`. This definition has a single
/// variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum EvaluateFlowEligibilityResult {
    /// Execution eligibility status.
    // NOTE(review): `status` is an untyped string; its possible values are
    // not visible in this file — confirm against the implementation.
    Status { status: String },
}
2319
2320// ─── replay_execution ───
2321
/// Typed inputs for the `replay_execution` contract.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReplayExecutionArgs {
    /// Execution to replay.
    pub execution_id: ExecutionId,
    /// Caller-supplied timestamp (presumably the current time — confirm).
    pub now: TimestampMs,
}
2327
/// Outcome of `replay_execution`. This definition has a single success
/// variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplayExecutionResult {
    /// Replayed to runnable. Carries the resulting public state.
    Replayed { public_state: PublicState },
}
2333
2334// ─── get_execution (full read) ───
2335
/// Full execution info returned by `Server::get_execution`.
///
/// Most fields are carried as plain strings (`namespace`, `lane_id`,
/// `created_at`, `flow_id`, …) rather than the typed ids used by the
/// FCALL contracts.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionInfo {
    pub execution_id: ExecutionId,
    pub namespace: String,
    pub lane_id: String,
    pub priority: i32,
    pub execution_kind: String,
    pub state_vector: StateVector,
    pub public_state: PublicState,
    pub created_at: String,
    /// TimestampMs (ms since epoch) when the execution's first attempt
    /// was started by a worker claim. Empty string until the first
    /// claim lands. Serialised as `Option<String>` so pre-claim reads
    /// deserialise cleanly even if the field is absent from the wire.
    ///
    /// Omitted from serialized output when `None`; `None` when absent on
    /// input.
    // NOTE(review): the text above mentions an empty string while the type
    // is `Option<String>`; whether an empty stored value is surfaced as
    // `None` or `Some("")` is not visible here — confirm with the producer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at: Option<String>,
    /// TimestampMs when the execution reached a terminal
    /// `completed`/`failed`/`cancelled`/`expired` state. Empty /
    /// absent while still in flight.
    ///
    /// Omitted from serialized output when `None`; `None` when absent on
    /// input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub completed_at: Option<String>,
    pub current_attempt_index: u32,
    pub flow_id: Option<String>,
    pub blocking_detail: String,
}
2362
2363// ─── set_execution_tags / set_flow_tags (issue #58.4) ───
2364
/// Args for `ff_set_execution_tags`. Tag keys MUST match
/// `^[a-z][a-z0-9_]*\.` — the caller-namespace rule — or the FCALL
/// returns `invalid_tag_key`. Values are arbitrary strings. The map is
/// ordered (`BTreeMap`) so two callers submitting the same logical set
/// of tags produce identical ARGV.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SetExecutionTagsArgs {
    /// Execution whose tags are written.
    pub execution_id: ExecutionId,
    /// Tag key → value pairs to apply.
    pub tags: BTreeMap<String, String>,
}
2375
/// Result of `ff_set_execution_tags`. This definition has a single
/// success variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SetExecutionTagsResult {
    /// Tags written. `count` is the number of key-value pairs applied.
    Ok { count: u32 },
}
2382
/// Args for `ff_set_flow_tags`. Same namespace rule as
/// [`SetExecutionTagsArgs`]. The Lua function also lazy-migrates any
/// pre-58.4 reserved-namespace fields stashed inline on `flow_core` into
/// the new tags key.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SetFlowTagsArgs {
    /// Flow whose tags are written.
    pub flow_id: FlowId,
    /// Tag key → value pairs to apply, in ascending key order (`BTreeMap`).
    pub tags: BTreeMap<String, String>,
}
2392
/// Result of `ff_set_flow_tags`. This definition has a single success
/// variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SetFlowTagsResult {
    /// Tags written. `count` is the number of key-value pairs applied.
    Ok { count: u32 },
}
2399
2400// ─── describe_execution (issue #58.1) ───
2401
/// Engine-decoupled read-model for one execution.
///
/// Returned by `ff_sdk::FlowFabricWorker::describe_execution`. Consumers
/// consult this struct instead of reaching into Valkey's exec_core hash
/// directly — the engine is free to rename fields or restructure storage
/// under this surface.
///
/// `#[non_exhaustive]` — FF may add fields in minor releases without a
/// semver break. Match with `..` or use field-by-field construction.
///
/// Derives no `Serialize`/`Deserialize`; this is an in-process type.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ExecutionSnapshot {
    /// Identity of the execution this snapshot describes.
    pub execution_id: ExecutionId,
    /// Flow id associated with the execution, if any.
    pub flow_id: Option<FlowId>,
    pub lane_id: LaneId,
    pub namespace: Namespace,
    pub public_state: PublicState,
    /// Blocking reason string (e.g. `"waiting_for_worker"`,
    /// `"waiting_for_delay"`, `"waiting_for_dependencies"`). `None` when
    /// the exec_core field is empty.
    pub blocking_reason: Option<String>,
    /// Free-form operator-readable detail explaining `blocking_reason`.
    /// `None` when the exec_core field is empty.
    pub blocking_detail: Option<String>,
    /// Summary of the execution's currently-active attempt. `None` when
    /// no attempt has been started (pre-claim) or when the exec_core
    /// attempt fields are all empty.
    pub current_attempt: Option<AttemptSummary>,
    /// Summary of the execution's currently-held lease. `None` when the
    /// execution is not held by a worker.
    pub current_lease: Option<LeaseSummary>,
    /// The waitpoint this execution is currently suspended on, if any.
    pub current_waitpoint: Option<WaitpointId>,
    /// Creation timestamp of the execution.
    pub created_at: TimestampMs,
    /// Timestamp of the last write that mutated exec_core. Engine-maintained.
    pub last_mutation_at: TimestampMs,
    pub total_attempt_count: u32,
    /// Caller-owned labels. The prefix `^[a-z][a-z0-9_]*\.` is reserved for
    /// consumer metadata (e.g. `cairn.task_id`); FF guarantees it will not
    /// write keys matching that shape. FF's own fields stay in snake_case
    /// without dots. Empty when no tags are set.
    pub tags: BTreeMap<String, String>,
}
2445
2446impl ExecutionSnapshot {
2447 /// Construct an [`ExecutionSnapshot`]. Present so downstream crates
2448 /// (ff-sdk's `describe_execution`) can assemble the struct despite
2449 /// the `#[non_exhaustive]` marker. Prefer adding builder-style
2450 /// helpers here over loosening `non_exhaustive`.
2451 #[allow(clippy::too_many_arguments)]
2452 pub fn new(
2453 execution_id: ExecutionId,
2454 flow_id: Option<FlowId>,
2455 lane_id: LaneId,
2456 namespace: Namespace,
2457 public_state: PublicState,
2458 blocking_reason: Option<String>,
2459 blocking_detail: Option<String>,
2460 current_attempt: Option<AttemptSummary>,
2461 current_lease: Option<LeaseSummary>,
2462 current_waitpoint: Option<WaitpointId>,
2463 created_at: TimestampMs,
2464 last_mutation_at: TimestampMs,
2465 total_attempt_count: u32,
2466 tags: BTreeMap<String, String>,
2467 ) -> Self {
2468 Self {
2469 execution_id,
2470 flow_id,
2471 lane_id,
2472 namespace,
2473 public_state,
2474 blocking_reason,
2475 blocking_detail,
2476 current_attempt,
2477 current_lease,
2478 current_waitpoint,
2479 created_at,
2480 last_mutation_at,
2481 total_attempt_count,
2482 tags,
2483 }
2484 }
2485}
2486
/// Currently-active attempt summary inside an [`ExecutionSnapshot`].
///
/// `#[non_exhaustive]`: code in other crates cannot build this with a
/// struct literal and must destructure it with `..`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct AttemptSummary {
    /// Identifier of the attempt.
    pub attempt_id: AttemptId,
    /// Index of the attempt.
    pub attempt_index: AttemptIndex,
}
2496
2497impl AttemptSummary {
2498 /// Construct an [`AttemptSummary`]. See [`ExecutionSnapshot::new`]
2499 /// for the rationale — `#[non_exhaustive]` blocks cross-crate
2500 /// struct-literal construction.
2501 pub fn new(attempt_id: AttemptId, attempt_index: AttemptIndex) -> Self {
2502 Self {
2503 attempt_id,
2504 attempt_index,
2505 }
2506 }
2507}
2508
/// Currently-held lease summary inside an [`ExecutionSnapshot`].
///
/// `#[non_exhaustive]`. New fields may be added in minor releases — use
/// [`LeaseSummary::new`] plus the fluent `with_*` setters to construct
/// one, and match with `..` in destructuring.
///
/// # Field provenance (FF#278)
///
/// * `lease_id` — minted at claim time (`ff_claim_execution` /
///   `ff_claim_resumed_execution`), cleared atomically with the other
///   lease fields on revoke/expire/complete. Stable for the lifetime
///   of a single lease; a fresh one is minted per re-claim. Defaults
///   to the nil UUID when a backend does not surface per-lease ids
///   (treat as "field not populated").
/// * `attempt_index` — the 1-based attempt counter (`current_attempt_index`
///   on `exec_core`). Set atomically with `current_attempt_id` at claim
///   time; always populated while a Valkey-backed lease is held.
/// * `last_heartbeat_at` — the most recent `ff_renew_lease` timestamp
///   (`lease_last_renewed_at` on `exec_core`). `None` when the field
///   is empty (e.g. legacy data pre-0.9, or a backend that does not
///   surface per-renewal heartbeats).
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct LeaseSummary {
    /// Epoch of the lease.
    pub lease_epoch: LeaseEpoch,
    /// Worker instance associated with the lease.
    pub worker_instance_id: WorkerInstanceId,
    /// Timestamp at which the lease expires, going by the field name.
    pub expires_at: TimestampMs,
    /// Per-lease unique identity. Correlates audit-log entries,
    /// reclaim events, and recovery traces.
    pub lease_id: LeaseId,
    /// 1-based attempt counter; `.0` mirrors `current_attempt_index`
    /// on `exec_core`.
    pub attempt_index: AttemptIndex,
    /// Most recent heartbeat (lease-renewal) timestamp. `None` when the
    /// backend does not surface per-renewal ticks on this lease.
    pub last_heartbeat_at: Option<TimestampMs>,
}
2546
2547impl LeaseSummary {
2548 /// Construct a [`LeaseSummary`] with the three always-present
2549 /// fields. Use the `with_*` setters to populate the FF#278
2550 /// additions (`lease_id`, `attempt_index`, `last_heartbeat_at`);
2551 /// otherwise they default to the nil / zero / empty forms, which
2552 /// callers should treat as "field not surfaced by this backend".
2553 ///
2554 /// See [`ExecutionSnapshot::new`] for the broader `#[non_exhaustive]`
2555 /// construction rationale.
2556 pub fn new(
2557 lease_epoch: LeaseEpoch,
2558 worker_instance_id: WorkerInstanceId,
2559 expires_at: TimestampMs,
2560 ) -> Self {
2561 Self {
2562 lease_epoch,
2563 worker_instance_id,
2564 expires_at,
2565 lease_id: LeaseId::from_uuid(uuid::Uuid::nil()),
2566 attempt_index: AttemptIndex::new(0),
2567 last_heartbeat_at: None,
2568 }
2569 }
2570
2571 /// Set the lease's unique identity (FF#278).
2572 #[must_use]
2573 pub fn with_lease_id(mut self, lease_id: LeaseId) -> Self {
2574 self.lease_id = lease_id;
2575 self
2576 }
2577
2578 /// Set the 1-based attempt counter (FF#278).
2579 #[must_use]
2580 pub fn with_attempt_index(mut self, attempt_index: AttemptIndex) -> Self {
2581 self.attempt_index = attempt_index;
2582 self
2583 }
2584
2585 /// Set the most recent heartbeat timestamp (FF#278).
2586 #[must_use]
2587 pub fn with_last_heartbeat_at(mut self, ts: TimestampMs) -> Self {
2588 self.last_heartbeat_at = Some(ts);
2589 self
2590 }
2591}
2592
2593// ─── read_execution_context (v0.12 agnostic-SDK prep, PR-1) ───
2594
/// Point-read bundle of the three execution-scoped fields the SDK
/// worker needs to construct a `ClaimedTask` (see `ff_sdk::ClaimedTask`):
/// `input_payload`, `execution_kind`, and `tags`.
///
/// Returned by
/// [`EngineBackend::read_execution_context`](crate::engine_backend::EngineBackend::read_execution_context).
/// All three fields are execution-scoped (not per-attempt) across
/// Valkey, Postgres, and SQLite — there is no per-attempt variant in
/// the data model.
///
/// `#[non_exhaustive]` — FF may add fields in minor releases without a
/// semver break. Construct via [`ExecutionContext::new`]; match with
/// `..` when destructuring.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ExecutionContext {
    /// Opaque payload handed to the execution body. Empty when the
    /// execution was created with no payload.
    pub input_payload: Vec<u8>,
    /// Caller-supplied `execution_kind` label — free-form string
    /// identifying which handler the worker should dispatch to.
    pub execution_kind: String,
    /// Caller-owned tag map. Tag key conventions mirror
    /// [`ExecutionSnapshot::tags`]; empty when no tags are set.
    ///
    /// Note the container differs from `ExecutionSnapshot::tags`: this is
    /// a `HashMap`, so iteration order is unspecified.
    pub tags: HashMap<String, String>,
}
2621
2622impl ExecutionContext {
2623 /// Construct an [`ExecutionContext`]. Present so downstream crates
2624 /// (concrete `EngineBackend` impls) can assemble the struct despite
2625 /// the `#[non_exhaustive]` marker. Prefer adding builder-style
2626 /// helpers here over loosening `non_exhaustive`.
2627 pub fn new(
2628 input_payload: Vec<u8>,
2629 execution_kind: String,
2630 tags: HashMap<String, String>,
2631 ) -> Self {
2632 Self {
2633 input_payload,
2634 execution_kind,
2635 tags,
2636 }
2637 }
2638}
2639
2640// ─── Common sub-types ───
2641
2642// ─── describe_flow (issue #58.2) ───
2643
/// Engine-decoupled read-model for one flow.
///
/// Returned by `ff_sdk::FlowFabricWorker::describe_flow`. Consumers
/// consult this struct instead of reaching into Valkey's flow_core hash
/// directly — the engine is free to rename fields or restructure storage
/// under this surface.
///
/// `#[non_exhaustive]` — FF may add fields in minor releases without a
/// semver break. Match with `..` or use [`FlowSnapshot::new`].
///
/// # `public_flow_state`
///
/// Stored as an engine-written string literal on `flow_core`. Known
/// values today: `open`, `running`, `blocked`, `cancelled`, `completed`,
/// `failed`. Surfaced as `String` (not a typed enum) because FF does
/// not yet expose a `PublicFlowState` type — callers that need to act
/// on specific values should match on the literal. The flow_projector
/// writes a parallel `public_flow_state` into the flow's summary hash;
/// this field reflects the authoritative value on `flow_core`, which
/// is what mutation guards (cancel/add-member) consult.
///
/// # `tags`
///
/// Unlike [`ExecutionSnapshot::tags`] (which has a dedicated tags
/// hash), flow tags live inline on `flow_core`. FF's own fields are
/// snake_case without a `.`; any field whose name starts with
/// `<lowercase>.` (e.g. `cairn.task_id`) is treated as consumer-owned
/// metadata and routed here. An empty map means no namespaced tags
/// were written. The prefix convention mirrors
/// [`ExecutionSnapshot::tags`] — consumers should keep tag keys
/// namespaced (`cairn.*`, `operator.*`, etc.) so future FF field
/// additions don't collide.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct FlowSnapshot {
    /// Identifier of the flow this snapshot describes.
    pub flow_id: FlowId,
    /// The `flow_kind` literal passed to `create_flow` (e.g. `dag`,
    /// `pipeline`). Preserved as-is; FF does not interpret it.
    pub flow_kind: String,
    /// Namespace associated with the flow.
    // NOTE(review): presumably the namespace supplied when the flow was
    // created — confirm against the backend's `describe_flow` impl.
    pub namespace: Namespace,
    /// Authoritative flow state on `flow_core`. See the struct-level
    /// docs for the set of known values.
    pub public_flow_state: String,
    /// Monotonically increasing revision bumped on every structural
    /// mutation (add-member, stage-edge). Used by optimistic-concurrency
    /// writers via `expected_graph_revision`.
    pub graph_revision: u64,
    /// Number of member executions added so far. Never decremented.
    pub node_count: u32,
    /// Number of dependency edges staged so far. Never decremented.
    pub edge_count: u32,
    /// Timestamp stamped on `flow_core.created_at`; the same value
    /// [`FlowSummary::created_at`] mirrors on the list surface.
    pub created_at: TimestampMs,
    /// Timestamp of the last write that mutated `flow_core`.
    /// Engine-maintained.
    pub last_mutation_at: TimestampMs,
    /// When the flow reached a terminal state via `cancel_flow`. `None`
    /// while the flow is live. Only written by the cancel path today;
    /// `completed`/`failed` terminal states do not populate this field
    /// (the flow_projector derives them from membership).
    pub cancelled_at: Option<TimestampMs>,
    /// Operator-supplied reason from the `cancel_flow` call. `None`
    /// when the flow has not been cancelled.
    pub cancel_reason: Option<String>,
    /// The `cancellation_policy` value persisted by `cancel_flow`
    /// (e.g. `cancel_all`, `cancel_flow_only`). `None` for flows
    /// cancelled before this field was persisted, or not yet cancelled.
    pub cancellation_policy: Option<String>,
    /// Consumer-owned namespaced metadata (e.g. `cairn.task_id`). See
    /// the struct-level docs for the routing rule.
    pub tags: BTreeMap<String, String>,
    /// RFC-016 Stage A: inbound edge groups known on this flow.
    ///
    /// One entry per downstream execution that has at least one staged
    /// inbound dependency edge. Populated from the
    /// `ff:flow:{fp:N}:<flow_id>:edgegroup:<downstream_eid>` hash —
    /// when that hash is absent (existing flows created before Stage A),
    /// the backend falls back to reading the legacy
    /// `deps_meta.unsatisfied_required_count` counter on the
    /// downstream's exec partition and reports the group as
    /// [`EdgeDependencyPolicy::AllOf`] with the derived counters
    /// (backward-compat shim — see RFC-016 §11 Stage A).
    ///
    /// Every entry in Stage A reports `policy = AllOf`; Stage B/C/D/E
    /// extend the variants and wire the quorum counters.
    pub edge_groups: Vec<EdgeGroupSnapshot>,
}
2730
2731impl FlowSnapshot {
2732 /// Construct a [`FlowSnapshot`]. Present so downstream crates
2733 /// (ff-sdk's `describe_flow`) can assemble the struct despite the
2734 /// `#[non_exhaustive]` marker.
2735 #[allow(clippy::too_many_arguments)]
2736 pub fn new(
2737 flow_id: FlowId,
2738 flow_kind: String,
2739 namespace: Namespace,
2740 public_flow_state: String,
2741 graph_revision: u64,
2742 node_count: u32,
2743 edge_count: u32,
2744 created_at: TimestampMs,
2745 last_mutation_at: TimestampMs,
2746 cancelled_at: Option<TimestampMs>,
2747 cancel_reason: Option<String>,
2748 cancellation_policy: Option<String>,
2749 tags: BTreeMap<String, String>,
2750 edge_groups: Vec<EdgeGroupSnapshot>,
2751 ) -> Self {
2752 Self {
2753 flow_id,
2754 flow_kind,
2755 namespace,
2756 public_flow_state,
2757 graph_revision,
2758 node_count,
2759 edge_count,
2760 created_at,
2761 last_mutation_at,
2762 cancelled_at,
2763 cancel_reason,
2764 cancellation_policy,
2765 tags,
2766 edge_groups,
2767 }
2768 }
2769}
2770
2771// ─── describe_edge / list_*_edges (issue #58.3) ───
2772
/// Engine-decoupled read-model for one dependency edge.
///
/// Returned by `ff_sdk::FlowFabricWorker::describe_edge`,
/// `list_incoming_edges`, and `list_outgoing_edges`. Consumers consult
/// this struct instead of reaching into Valkey's per-flow `edge:` hash
/// directly — the engine is free to rename hash fields or restructure
/// key layout under this surface.
///
/// `#[non_exhaustive]` — FF may add fields in minor releases without a
/// semver break. Match with `..` or use [`EdgeSnapshot::new`].
///
/// # Fields
///
/// The struct mirrors the immutable edge record written by
/// `ff_stage_dependency_edge` (see `lua/flow.lua`). The flow-scoped
/// edge hash is only ever written once, at staging time; per-execution
/// resolution state lives on a separate `dep:<edge_id>` hash and is not
/// surfaced here. The `edge_state` field therefore reflects the
/// staging-time literal (currently `pending`), not the downstream
/// execution's dep-edge state.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct EdgeSnapshot {
    /// Identifier of this edge.
    pub edge_id: EdgeId,
    /// Flow whose edge hash holds this record.
    pub flow_id: FlowId,
    /// Execution the edge leaves (the edge shows up in this
    /// execution's outgoing listing — see [`EdgeDirection::Outgoing`]).
    pub upstream_execution_id: ExecutionId,
    /// Execution the edge lands on (the edge shows up in this
    /// execution's incoming listing — see [`EdgeDirection::Incoming`]).
    pub downstream_execution_id: ExecutionId,
    /// The `dependency_kind` literal (e.g. `success_only`) from
    /// `stage_dependency_edge`. Preserved as-is; FF does not interpret
    /// it on reads.
    pub dependency_kind: String,
    /// The satisfaction-condition literal stamped at staging time
    /// (e.g. `all_required`).
    pub satisfaction_condition: String,
    /// Optional opaque handle to a data-passing artifact. `None` when
    /// the stored field is empty (the most common case).
    pub data_passing_ref: Option<String>,
    /// Edge-state literal on the flow-scoped edge hash. Written once
    /// at staging as `pending`; this hash is immutable on the flow
    /// side. Per-execution resolution state is tracked separately on
    /// the child's `dep:<edge_id>` hash.
    pub edge_state: String,
    /// Creation timestamp stored on the edge record.
    // NOTE(review): presumably the staging time, since the record is
    // written once at staging — confirm against `lua/flow.lua`.
    pub created_at: TimestampMs,
    /// Origin of the edge (e.g. `engine`). Preserved as-is.
    pub created_by: String,
}
2819
/// Direction marker for [`crate::engine_backend::EngineBackend::list_edges`].
///
/// Carries the subject execution whose adjacency side the caller wants
/// to list — mirrors the internal `AdjacencySide + subject_eid` pair
/// the ff-sdk free-fn `list_edges_from_set` already uses. Keeping
/// direction + subject fused in one enum means the trait method has a
/// single `direction` parameter rather than a `(side, eid)` pair, and
/// the backend impl can't forget one of the two.
///
/// * `Outgoing { from_node }` — the caller wants every edge whose
///   `upstream_execution_id == from_node`. Corresponds to the
///   `out:<execution_id>` adjacency SET under the execution's flow
///   partition.
/// * `Incoming { to_node }` — the caller wants every edge whose
///   `downstream_execution_id == to_node`. Corresponds to the
///   `in:<execution_id>` adjacency SET under the execution's flow
///   partition.
///
/// Use [`EdgeDirection::subject`] to read the carried execution id
/// without matching on the variant.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EdgeDirection {
    /// Edges leaving `from_node` — `out:` adjacency SET.
    Outgoing {
        /// The subject execution whose outgoing edges to list.
        from_node: ExecutionId,
    },
    /// Edges landing on `to_node` — `in:` adjacency SET.
    Incoming {
        /// The subject execution whose incoming edges to list.
        to_node: ExecutionId,
    },
}
2850
2851impl EdgeDirection {
2852 /// Return the subject execution id regardless of direction. Shared
2853 /// helper for backend impls that need the execution id for the
2854 /// initial `HGET exec_core.flow_id` lookup (flow routing) before
2855 /// they know which adjacency SET to read.
2856 pub fn subject(&self) -> &ExecutionId {
2857 match self {
2858 Self::Outgoing { from_node } => from_node,
2859 Self::Incoming { to_node } => to_node,
2860 }
2861 }
2862}
2863
2864impl EdgeSnapshot {
2865 /// Construct an [`EdgeSnapshot`]. Present so downstream crates
2866 /// (ff-sdk's `describe_edge` / `list_*_edges`) can assemble the
2867 /// struct despite the `#[non_exhaustive]` marker.
2868 #[allow(clippy::too_many_arguments)]
2869 pub fn new(
2870 edge_id: EdgeId,
2871 flow_id: FlowId,
2872 upstream_execution_id: ExecutionId,
2873 downstream_execution_id: ExecutionId,
2874 dependency_kind: String,
2875 satisfaction_condition: String,
2876 data_passing_ref: Option<String>,
2877 edge_state: String,
2878 created_at: TimestampMs,
2879 created_by: String,
2880 ) -> Self {
2881 Self {
2882 edge_id,
2883 flow_id,
2884 upstream_execution_id,
2885 downstream_execution_id,
2886 dependency_kind,
2887 satisfaction_condition,
2888 data_passing_ref,
2889 edge_state,
2890 created_at,
2891 created_by,
2892 }
2893 }
2894}
2895
2896// ─── RFC-016 edge-group policy (Stage A) ───
2897
/// Policy controlling how an inbound edge group's satisfaction is
/// decided.
///
/// Stage A honours only [`EdgeDependencyPolicy::AllOf`] — the two
/// quorum variants exist so the wire/snapshot surface is stable for
/// Stage B/C/D's resolver extensions, but
/// [`crate::engine_backend::EngineBackend::set_edge_group_policy`]
/// rejects them with [`crate::engine_error::EngineError::Validation`]
/// until Stage B lands.
///
/// `#[non_exhaustive]` — future stages may add variants (e.g.
/// `Threshold` — see RFC-016 §10.3) without a semver break. Construct
/// via the [`EdgeDependencyPolicy::all_of`], [`EdgeDependencyPolicy::any_of`],
/// and [`EdgeDependencyPolicy::quorum`] helpers.
///
/// # Wire shape
///
/// Serialized internally tagged: the snake_case variant name is stored
/// under the `kind` key (`all_of`, `any_of`, `quorum`), next to the
/// variant's own fields. These are the same labels
/// [`EdgeDependencyPolicy::variant_str`] returns.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum EdgeDependencyPolicy {
    /// Today's behavior: every edge in the inbound group must be
    /// satisfied (RFC-007 `all_required` + `success_only`).
    AllOf,
    /// k-of-n where k==1 — satisfied on the first upstream success.
    /// Stage A: rejected on
    /// [`crate::engine_backend::EngineBackend::set_edge_group_policy`];
    /// resolver emits nothing for this variant yet.
    AnyOf {
        /// What happens to unfinished sibling upstreams once the group
        /// is satisfied; see [`OnSatisfied`]. The explicit rename pins
        /// the wire key to `on_satisfied`.
        #[serde(rename = "on_satisfied")]
        on_satisfied: OnSatisfied,
    },
    /// k-of-n quorum. Stage A: rejected on
    /// [`crate::engine_backend::EngineBackend::set_edge_group_policy`].
    Quorum {
        /// The `k` of the k-of-n quorum.
        // NOTE(review): nothing here rejects `k == 0` or `k` larger
        // than the group size — presumably validated by the backend;
        // confirm.
        k: u32,
        /// What happens to unfinished sibling upstreams once the
        /// quorum is met; see [`OnSatisfied`]. The explicit rename pins
        /// the wire key to `on_satisfied`.
        #[serde(rename = "on_satisfied")]
        on_satisfied: OnSatisfied,
    },
}
2935
2936impl EdgeDependencyPolicy {
2937 /// Construct the default all-of policy (RFC-007 behavior).
2938 pub fn all_of() -> Self {
2939 Self::AllOf
2940 }
2941
2942 /// Construct an any-of policy — reserved for Stage B.
2943 pub fn any_of(on_satisfied: OnSatisfied) -> Self {
2944 Self::AnyOf { on_satisfied }
2945 }
2946
2947 /// Construct a quorum policy — reserved for Stage B.
2948 pub fn quorum(k: u32, on_satisfied: OnSatisfied) -> Self {
2949 Self::Quorum { k, on_satisfied }
2950 }
2951
2952 /// Stable string label used for wire format + metric labels.
2953 /// `all_of` | `any_of` | `quorum`.
2954 pub fn variant_str(&self) -> &'static str {
2955 match self {
2956 Self::AllOf => "all_of",
2957 Self::AnyOf { .. } => "any_of",
2958 Self::Quorum { .. } => "quorum",
2959 }
2960 }
2961}
2962
/// Policy for unfinished sibling upstreams once the quorum is met.
///
/// `#[non_exhaustive]` — RFC-016 §10.5 rejects a third variant today
/// but keeps the door open. Construct via [`OnSatisfied::cancel_remaining`]
/// / [`OnSatisfied::let_run`].
///
/// Serialized as the snake_case variant name (`cancel_remaining`,
/// `let_run`) — the same labels [`OnSatisfied::variant_str`] returns.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
#[serde(rename_all = "snake_case")]
pub enum OnSatisfied {
    /// Default. Cancel any still-running siblings once quorum met.
    CancelRemaining,
    /// Let stragglers finish; their terminals update counters for
    /// observability only (one-shot downstream).
    LetRun,
}
2978
2979impl OnSatisfied {
2980 /// Construct the default `cancel_remaining` disposition.
2981 pub fn cancel_remaining() -> Self {
2982 Self::CancelRemaining
2983 }
2984
2985 /// Construct the `let_run` disposition.
2986 pub fn let_run() -> Self {
2987 Self::LetRun
2988 }
2989
2990 /// Stable string label for wire format.
2991 pub fn variant_str(&self) -> &'static str {
2992 match self {
2993 Self::CancelRemaining => "cancel_remaining",
2994 Self::LetRun => "let_run",
2995 }
2996 }
2997}
2998
/// Edge-group lifecycle state (Stage A exposes only `pending` +
/// `satisfied` + `impossible`; `cancelled` reserved for Stage C).
///
/// Serialized as the snake_case variant name. The same literals are
/// produced by [`EdgeGroupState::as_str`] and accepted by
/// [`EdgeGroupState::from_literal`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
#[serde(rename_all = "snake_case")]
pub enum EdgeGroupState {
    /// Literal `pending`. Also what [`EdgeGroupState::from_literal`]
    /// returns for any unrecognized input.
    Pending,
    /// Literal `satisfied`.
    Satisfied,
    /// Literal `impossible`.
    Impossible,
    /// Literal `cancelled`. Reserved for Stage C.
    Cancelled,
}
3010
3011impl EdgeGroupState {
3012 pub fn from_literal(s: &str) -> Self {
3013 match s {
3014 "satisfied" => Self::Satisfied,
3015 "impossible" => Self::Impossible,
3016 "cancelled" => Self::Cancelled,
3017 _ => Self::Pending,
3018 }
3019 }
3020
3021 pub fn as_str(&self) -> &'static str {
3022 match self {
3023 Self::Pending => "pending",
3024 Self::Satisfied => "satisfied",
3025 Self::Impossible => "impossible",
3026 Self::Cancelled => "cancelled",
3027 }
3028 }
3029}
3030
/// Snapshot of one inbound edge group (per downstream execution).
///
/// Exposed via [`FlowSnapshot::edge_groups`]. Stage A only populates
/// `AllOf` groups and their counters; Stage B/C add `failed` /
/// `skipped` / `satisfied_at` wiring for the quorum variants.
///
/// `#[non_exhaustive]` — future stages will add fields (`satisfied_at`,
/// `failed_count` write-path, `cancel_siblings_pending`). Match with
/// `..` or use [`EdgeGroupSnapshot::new`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct EdgeGroupSnapshot {
    /// Downstream execution this inbound group belongs to.
    pub downstream_execution_id: ExecutionId,
    /// Satisfaction policy of the group; see [`EdgeDependencyPolicy`].
    pub policy: EdgeDependencyPolicy,
    // NOTE(review): the counter semantics below are inferred from the
    // field names and the struct-level docs only — confirm against the
    // backend that populates this snapshot.
    /// Presumably the number of inbound dependency edges in the group.
    pub total_deps: u32,
    /// Presumably how many of those dependencies are satisfied.
    pub satisfied_count: u32,
    /// Presumably how many have failed. The struct-level docs list the
    /// `failed_count` write-path as future work.
    pub failed_count: u32,
    /// Presumably how many were skipped. The struct-level docs list
    /// `skipped` wiring as Stage B/C work.
    pub skipped_count: u32,
    /// Presumably how many upstreams are still running.
    pub running_count: u32,
    /// Lifecycle state of the group; see [`EdgeGroupState`].
    pub group_state: EdgeGroupState,
}
3052
3053impl EdgeGroupSnapshot {
3054 #[allow(clippy::too_many_arguments)]
3055 pub fn new(
3056 downstream_execution_id: ExecutionId,
3057 policy: EdgeDependencyPolicy,
3058 total_deps: u32,
3059 satisfied_count: u32,
3060 failed_count: u32,
3061 skipped_count: u32,
3062 running_count: u32,
3063 group_state: EdgeGroupState,
3064 ) -> Self {
3065 Self {
3066 downstream_execution_id,
3067 policy,
3068 total_deps,
3069 satisfied_count,
3070 failed_count,
3071 skipped_count,
3072 running_count,
3073 group_state,
3074 }
3075 }
3076}
3077
3078// ─── set_edge_group_policy (RFC-016 §6.1) ───
3079
/// Typed inputs for `set_edge_group_policy` (RFC-016 §6.1).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SetEdgeGroupPolicyArgs {
    /// Flow that contains the edge group.
    pub flow_id: FlowId,
    /// Downstream execution whose inbound edge group is targeted.
    pub downstream_execution_id: ExecutionId,
    /// Policy to store for the group.
    pub policy: EdgeDependencyPolicy,
    /// Timestamp supplied with the call.
    // NOTE(review): presumably the caller's current time, as with the
    // `now` field on other Args structs — confirm.
    pub now: TimestampMs,
}
3087
/// Outcome of `set_edge_group_policy`. Both variants are successes;
/// they differ only in whether the call changed what was stored.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SetEdgeGroupPolicyResult {
    /// Policy stored (fresh write).
    Set,
    /// Policy already stored with an identical value (idempotent).
    AlreadySet,
}
3095
3096// ─── list_flows (issue #185) ───
3097
/// Typed flow-lifecycle status carried on [`FlowSummary`].
///
/// A coarse projection of the free-form `public_flow_state` literal
/// that FF's flow lifecycle writes onto `flow_core` (known values:
/// `open`, `running`, `blocked`, `cancelled`, `completed`, `failed` —
/// see [`FlowSnapshot`]). The three live runtime states (`open`,
/// `running`, `blocked`) all become [`FlowStatus::Active`]; `failed`
/// becomes [`FlowStatus::Failed`]. Callers that need the exact runtime
/// sub-state should read [`FlowSnapshot::public_flow_state`] via
/// [`crate::engine_backend::EngineBackend::describe_flow`].
///
/// `#[non_exhaustive]` so that future lifecycle states (should FF
/// introduce any) can be added without a semver break.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum FlowStatus {
    /// The flow is still live on the engine (`open` / `running` /
    /// `blocked`).
    Active,
    /// Terminal success: every member reached a successful terminal
    /// state and the flow projector set `public_flow_state` to
    /// `completed`.
    Completed,
    /// Terminal failure: at least one member failed and the flow
    /// projector set `public_flow_state` to `failed`.
    Failed,
    /// An operator cancelled the flow via `cancel_flow`.
    Cancelled,
    /// A `public_flow_state` literal is stored but it is not one of the
    /// known values. The raw literal stays available on
    /// [`FlowSnapshot::public_flow_state`]; callers that must act on it
    /// should fall back to
    /// [`crate::engine_backend::EngineBackend::describe_flow`].
    Unknown,
}

impl FlowStatus {
    /// Project the raw `public_flow_state` literal from `flow_core`
    /// onto a typed [`FlowStatus`].
    ///
    /// Matching is exact and case-sensitive. Anything that is not a
    /// known literal becomes [`FlowStatus::Unknown`], which keeps the
    /// list surface forwards-compatible with states the engine may add
    /// later.
    pub fn from_public_flow_state(raw: &str) -> Self {
        // Every literal the engine is known to write, with its projection.
        const KNOWN: [(&str, FlowStatus); 6] = [
            ("open", FlowStatus::Active),
            ("running", FlowStatus::Active),
            ("blocked", FlowStatus::Active),
            ("completed", FlowStatus::Completed),
            ("failed", FlowStatus::Failed),
            ("cancelled", FlowStatus::Cancelled),
        ];

        KNOWN
            .iter()
            .find(|(literal, _)| *literal == raw)
            .map_or(FlowStatus::Unknown, |&(_, status)| status)
    }
}
3146
/// Lightweight per-flow projection returned by
/// [`crate::engine_backend::EngineBackend::list_flows`].
///
/// Deliberately narrower than [`FlowSnapshot`] — listing pages serve
/// dashboard-style enumerations where the caller does not want to pay
/// for the full `flow_core` hash on every row. Consumers that need
/// revision / node-count / tags / cancel metadata should fan out to
/// [`crate::engine_backend::EngineBackend::describe_flow`] for the
/// specific ids they care about.
///
/// `#[non_exhaustive]` — FF may add fields in minor releases without
/// a semver break. Match with `..` or use [`FlowSummary::new`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct FlowSummary {
    /// Identifier of the flow this row describes. Pass it to
    /// `describe_flow` to obtain the full [`FlowSnapshot`].
    pub flow_id: FlowId,
    /// Timestamp (ms since unix epoch) `flow_core.created_at` was
    /// stamped. Mirrors [`FlowSnapshot::created_at`]; kept typed so
    /// callers that want raw millis can read `.0`.
    pub created_at: TimestampMs,
    /// Typed projection of `flow_core.public_flow_state`. See
    /// [`FlowStatus`] for the mapping.
    pub status: FlowStatus,
}
3171
3172impl FlowSummary {
3173 /// Construct a [`FlowSummary`]. Present so downstream crates can
3174 /// assemble the struct despite the `#[non_exhaustive]` marker.
3175 pub fn new(flow_id: FlowId, created_at: TimestampMs, status: FlowStatus) -> Self {
3176 Self {
3177 flow_id,
3178 created_at,
3179 status,
3180 }
3181 }
3182}
3183
/// One page of [`FlowSummary`] rows returned by
/// [`crate::engine_backend::EngineBackend::list_flows`].
///
/// `next_cursor` is `Some(last_flow_id)` when at least one more row
/// may exist on the partition — callers forward it verbatim as the
/// next call's `cursor` argument to continue iteration. `None` means
/// the listing is exhausted. Cursor semantics match the Postgres
/// `WHERE flow_id > $cursor ORDER BY flow_id LIMIT $limit` pattern
/// (see the trait method's rustdoc).
///
/// `#[non_exhaustive]` — FF may add summary-level fields (total count,
/// partition hint) in future minor releases without a semver break.
/// Use [`ListFlowsPage::new`] for cross-crate construction.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ListFlowsPage {
    /// The rows on this page.
    pub flows: Vec<FlowSummary>,
    /// Cursor for the next call; `None` once the listing is exhausted.
    /// See the struct-level docs for the exact semantics.
    pub next_cursor: Option<FlowId>,
}
3202
3203impl ListFlowsPage {
3204 /// Construct a [`ListFlowsPage`]. Present so downstream crates can
3205 /// assemble the struct despite the `#[non_exhaustive]` marker.
3206 pub fn new(flows: Vec<FlowSummary>, next_cursor: Option<FlowId>) -> Self {
3207 Self { flows, next_cursor }
3208 }
3209}
3210
/// Summary of state after a mutation, returned by many functions.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StateSummary {
    /// The execution's state vector; see [`StateVector`].
    pub state_vector: StateVector,
    /// Index of the execution's current attempt.
    // NOTE(review): presumably the attempt that is current once the
    // mutation has been applied — confirm against the producers.
    pub current_attempt_index: AttemptIndex,
}
3217
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::FlowId;
    use std::str::FromStr;

    /// `CreateExecutionArgs` survives a JSON round trip (checked via
    /// the execution id).
    #[test]
    fn create_execution_args_serde() {
        let partitions = crate::partition::PartitionConfig::default();
        let original = CreateExecutionArgs {
            execution_id: ExecutionId::for_flow(&FlowId::new(), &partitions),
            namespace: Namespace::new("test"),
            lane_id: LaneId::new("default"),
            execution_kind: "llm_call".to_owned(),
            input_payload: b"hello".to_vec(),
            payload_encoding: Some("json".to_owned()),
            priority: 0,
            creator_identity: "test-user".to_owned(),
            idempotency_key: None,
            tags: HashMap::new(),
            policy: None,
            delay_until: None,
            execution_deadline_at: None,
            partition_id: 42,
            now: TimestampMs::now(),
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: CreateExecutionArgs = serde_json::from_str(&encoded).unwrap();

        assert_eq!(original.execution_id, decoded.execution_id);
    }

    /// A `Claimed` result survives a JSON round trip unchanged.
    #[test]
    fn claim_result_serde() {
        let partitions = crate::partition::PartitionConfig::default();
        let claimed = ClaimedExecution {
            execution_id: ExecutionId::for_flow(&FlowId::new(), &partitions),
            lease_id: LeaseId::new(),
            lease_epoch: LeaseEpoch::new(1),
            attempt_index: AttemptIndex::new(0),
            attempt_id: AttemptId::new(),
            attempt_type: AttemptType::Initial,
            lease_expires_at: TimestampMs::from_millis(1000),
            handle: crate::backend::stub_handle_fresh(),
        };
        let original = ClaimExecutionResult::Claimed(claimed);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ClaimExecutionResult = serde_json::from_str(&encoded).unwrap();

        assert_eq!(original, decoded);
    }

    // ── StreamCursor (issue #92) ──

    /// `Display` renders the public tokens (`start` / `end` / the id).
    #[test]
    fn stream_cursor_display_matches_wire_tokens() {
        let cases = [
            (StreamCursor::Start, "start"),
            (StreamCursor::End, "end"),
            (StreamCursor::At("123".into()), "123"),
            (StreamCursor::At("123-4".into()), "123-4"),
        ];
        for (cursor, rendered) in cases {
            assert_eq!(cursor.to_string(), rendered);
        }
    }

    /// `to_wire` renders the Valkey markers (`-` / `+` / the id).
    #[test]
    fn stream_cursor_to_wire_maps_to_valkey_markers() {
        let cases = [
            (StreamCursor::Start, "-"),
            (StreamCursor::End, "+"),
            (StreamCursor::At("0-0".into()), "0-0"),
            (StreamCursor::At("17-3".into()), "17-3"),
        ];
        for (cursor, marker) in cases {
            assert_eq!(cursor.to_wire(), marker);
        }
    }

    /// `from_str` accepts the public tokens and concrete ids.
    #[test]
    fn stream_cursor_from_str_accepts_wire_tokens() {
        assert_eq!(
            StreamCursor::from_str("start").unwrap(),
            StreamCursor::Start
        );
        assert_eq!(StreamCursor::from_str("end").unwrap(), StreamCursor::End);

        for id in ["123", "0-0", "1713100800150-0"] {
            assert_eq!(
                StreamCursor::from_str(id).unwrap(),
                StreamCursor::At(id.into())
            );
        }
    }

    /// The raw Valkey markers are rejected with `BareMarkerRejected`,
    /// which carries the offending marker.
    #[test]
    fn stream_cursor_from_str_rejects_bare_markers() {
        for marker in ["-", "+"] {
            assert!(matches!(
                StreamCursor::from_str(marker),
                Err(StreamCursorParseError::BareMarkerRejected(s)) if s == marker
            ));
        }
    }

    /// The empty string gets its own error variant.
    #[test]
    fn stream_cursor_from_str_rejects_empty() {
        let parsed = StreamCursor::from_str("");
        assert_eq!(parsed, Err(StreamCursorParseError::Empty));
    }

    /// Anything that is neither a token nor a well-formed id is
    /// `Malformed` (tokens are case-sensitive).
    #[test]
    fn stream_cursor_from_str_rejects_malformed() {
        let rejected = [
            "abc", "-1", "1-", "-1-2", "1-2-3", "1.2", "1 2", "Start", "END",
        ];
        for bad in rejected {
            assert!(
                matches!(
                    StreamCursor::from_str(bad),
                    Err(StreamCursorParseError::Malformed(_))
                ),
                "must reject {bad:?}",
            );
        }
    }

    /// A non-ASCII dash (U+2013) between the numbers is `Malformed`.
    #[test]
    fn stream_cursor_from_str_rejects_non_ascii() {
        let parsed = StreamCursor::from_str("1\u{2013}2");
        assert!(matches!(
            parsed,
            Err(StreamCursorParseError::Malformed(_))
        ));
    }

    /// Every variant survives a JSON round trip.
    #[test]
    fn stream_cursor_serde_round_trip() {
        let cursors = [
            StreamCursor::Start,
            StreamCursor::End,
            StreamCursor::At("0-0".into()),
            StreamCursor::At("1713100800150-0".into()),
        ];
        for cursor in cursors {
            let encoded = serde_json::to_string(&cursor).unwrap();
            let decoded: StreamCursor = serde_json::from_str(&encoded).unwrap();
            assert_eq!(decoded, cursor);
        }
    }

    /// The JSON form is a bare string, not a tagged object.
    #[test]
    fn stream_cursor_serializes_as_bare_string() {
        let cases = [
            (StreamCursor::Start, r#""start""#),
            (StreamCursor::End, r#""end""#),
            (StreamCursor::At("123-0".into()), r#""123-0""#),
        ];
        for (cursor, json) in cases {
            assert_eq!(serde_json::to_string(&cursor).unwrap(), json);
        }
    }

    /// Deserialization rejects the raw markers, like `from_str` does.
    #[test]
    fn stream_cursor_deserialize_rejects_bare_markers() {
        for json in [r#""-""#, r#""+""#] {
            assert!(serde_json::from_str::<StreamCursor>(json).is_err());
        }
    }

    /// `from_beginning` is the concrete id `0-0`.
    #[test]
    fn stream_cursor_from_beginning_is_zero_zero() {
        let expected = StreamCursor::At("0-0".into());
        assert_eq!(StreamCursor::from_beginning(), expected);
    }

    /// Only `At` cursors count as concrete.
    #[test]
    fn stream_cursor_is_concrete_classifies_variants() {
        for symbolic in [StreamCursor::Start, StreamCursor::End] {
            assert!(!symbolic.is_concrete());
        }

        let concrete_cursors = [
            StreamCursor::At("0-0".into()),
            StreamCursor::At("123-0".into()),
            StreamCursor::from_beginning(),
        ];
        for concrete in concrete_cursors {
            assert!(concrete.is_concrete());
        }
    }

    /// The consuming conversion yields the same markers as `to_wire`.
    #[test]
    fn stream_cursor_into_wire_string_moves_without_cloning() {
        let cases = [
            (StreamCursor::Start, "-"),
            (StreamCursor::End, "+"),
            (StreamCursor::At("17-3".into()), "17-3"),
        ];
        for (cursor, marker) in cases {
            assert_eq!(cursor.into_wire_string(), marker);
        }
    }
}
3413
3414// ─── list_executions ───
3415
/// Summary of an execution for list views.
///
/// Apart from `execution_id` and `priority`, every field is carried as
/// a plain `String`.
// NOTE(review): presumably these strings are the raw stored values
// (the file has typed `Namespace` / `LaneId` / `PublicState` /
// `TimestampMs` counterparts) — confirm against the code that builds
// this struct.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionSummary {
    /// Identifier of the execution.
    pub execution_id: ExecutionId,
    /// Namespace of the execution, as a string.
    pub namespace: String,
    /// Lane of the execution, as a string.
    pub lane_id: String,
    /// The execution's `execution_kind` label.
    pub execution_kind: String,
    /// Public state of the execution, as a string.
    pub public_state: String,
    /// Priority of the execution.
    pub priority: i32,
    /// Creation timestamp, as a string (format not specified here).
    pub created_at: String,
}
3427
/// Result of a list_executions query.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ListExecutionsResult {
    /// The execution summaries returned by the query.
    pub executions: Vec<ExecutionSummary>,
    /// Number of summaries returned.
    // NOTE(review): presumably always equal to `executions.len()` —
    // confirm against the producer.
    pub total_returned: usize,
}
3434
3435// ─── list_lanes (issue #184) ───
3436
/// One page of lane ids returned by
/// [`crate::engine_backend::EngineBackend::list_lanes`].
///
/// Lanes are global (not partition-scoped) — the backend enumerates
/// every registered lane, sorts by [`LaneId`] name, and returns a
/// `limit`-sized slice starting after `cursor` (exclusive).
///
/// `next_cursor` is `Some(last_lane_in_page)` when more pages remain
/// and `None` when the caller has read the final page. Callers that
/// want the full list loop until `next_cursor` is `None`, threading
/// the previous page's `next_cursor` into the next call's `cursor`
/// argument.
///
/// `#[non_exhaustive]` — FF may add fields (e.g. a `total` hint) in
/// minor releases without a semver break. Use [`ListLanesPage::new`]
/// for cross-crate construction.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ListLanesPage {
    /// The lanes in this page, sorted by [`LaneId`] name.
    pub lanes: Vec<LaneId>,
    /// Cursor for the next page (exclusive): the last lane of this
    /// page when more pages remain. `None` ⇒ final page.
    pub next_cursor: Option<LaneId>,
}
3460
3461impl ListLanesPage {
3462 /// Construct a [`ListLanesPage`]. Present so downstream crates
3463 /// (ff-backend-valkey's `list_lanes` impl) can assemble the
3464 /// struct despite the `#[non_exhaustive]` marker.
3465 pub fn new(lanes: Vec<LaneId>, next_cursor: Option<LaneId>) -> Self {
3466 Self { lanes, next_cursor }
3467 }
3468}
3469
3470// ─── list_suspended ───
3471
/// One entry in a [`ListSuspendedPage`] — a suspended execution and
/// the reason it is blocked, answering an operator's "what's this
/// waiting on?" without a follow-up round-trip.
///
/// `reason` carries the free-form `reason_code` recorded by the
/// suspending worker at `lua/suspension.lua` (HSET `suspension:current
/// reason_code`). It is a `String`, not a closed enum: the suspension
/// pipeline accepts arbitrary caller-supplied codes (typical values
/// are `"signal"`, `"timer"`, `"children"`, `"join"`, but consumers
/// embed bespoke codes). A future enum projection can classify
/// known codes once the set is frozen; until then, callers that want
/// structured routing MUST match on the string explicitly.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct SuspendedExecutionEntry {
    /// Execution currently in `lifecycle_phase=suspended`.
    pub execution_id: ExecutionId,
    /// Score stored on the per-lane suspended ZSET — the scheduled
    /// `timeout_at` in milliseconds, or the `9999999999999` sentinel
    /// when no timeout was set (see `lua/suspension.lua`).
    ///
    /// Despite the field name, this is therefore a timeout deadline
    /// (or the no-timeout sentinel), not the instant the suspension
    /// began. Compare against the sentinel before treating the value
    /// as a real timestamp.
    pub suspended_at_ms: i64,
    /// Free-form reason code from `suspension:current.reason_code`.
    /// Empty string when the suspension hash is absent or does not
    /// carry a `reason_code` field (older records). See the struct
    /// rustdoc for the deliberate-String rationale.
    pub reason: String,
}
3499
3500impl SuspendedExecutionEntry {
3501 /// Construct a new entry. Preferred over direct field init for
3502 /// `#[non_exhaustive]` forward-compat.
3503 pub fn new(execution_id: ExecutionId, suspended_at_ms: i64, reason: String) -> Self {
3504 Self {
3505 execution_id,
3506 suspended_at_ms,
3507 reason,
3508 }
3509 }
3510}
3511
/// One cursor-paginated page of suspended executions.
///
/// Pagination is cursor-based (not offset/limit) so a Valkey backend
/// can resume a partition scan from the last seen execution id and a
/// future Postgres backend can do keyset pagination on
/// `executions WHERE state='suspended'`. The cursor is opaque to
/// callers: pass `next_cursor` back as the `cursor` argument to the
/// next [`EngineBackend::list_suspended`] call to fetch the next
/// page. `None` means the stream is exhausted.
///
/// `#[non_exhaustive]` — use [`ListSuspendedPage::new`] for
/// cross-crate construction.
///
/// [`EngineBackend::list_suspended`]: crate::engine_backend::EngineBackend::list_suspended
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct ListSuspendedPage {
    /// Entries on this page, ordered by ascending `suspended_at_ms`
    /// (timeout order) with `execution_id` as a lex tiebreak.
    pub entries: Vec<SuspendedExecutionEntry>,
    /// Resume-point for the next page. `None` when no further
    /// entries remain in the partition.
    pub next_cursor: Option<ExecutionId>,
}
3533
3534impl ListSuspendedPage {
3535 /// Construct a new page. Preferred over direct field init for
3536 /// `#[non_exhaustive]` forward-compat.
3537 pub fn new(entries: Vec<SuspendedExecutionEntry>, next_cursor: Option<ExecutionId>) -> Self {
3538 Self {
3539 entries,
3540 next_cursor,
3541 }
3542 }
3543}
3544
3545// ─── list_executions ───
3546
3547/// One page of partition-scoped execution ids returned by
3548/// [`EngineBackend::list_executions`](crate::engine_backend::EngineBackend::list_executions).
3549///
3550/// Pagination is forward-only and cursor-based. `next_cursor` carries
3551/// the last `ExecutionId` emitted in `executions` iff another page is
3552/// available; callers pass that id back as the next call's `cursor`
3553/// (exclusive start). `next_cursor = None` signals end-of-stream.
3554///
3555/// `#[non_exhaustive]` — FF may add fields (e.g. `approximate_total`)
3556/// in minor releases without a semver break. Use
3557/// [`ListExecutionsPage::new`] for cross-crate construction.
3558#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3559#[non_exhaustive]
3560pub struct ListExecutionsPage {
3561 /// Execution ids on this page, in ascending lexicographic order.
3562 pub executions: Vec<ExecutionId>,
3563 /// Exclusive cursor to request the next page. `None` ⇒ no more
3564 /// results.
3565 pub next_cursor: Option<ExecutionId>,
3566}
3567
3568impl ListExecutionsPage {
3569 /// Construct a [`ListExecutionsPage`]. Present so downstream
3570 /// crates can assemble the struct despite the `#[non_exhaustive]`
3571 /// marker.
3572 pub fn new(executions: Vec<ExecutionId>, next_cursor: Option<ExecutionId>) -> Self {
3573 Self { executions, next_cursor }
3574 }
3575}
3576
3577// ─── rotate_waitpoint_hmac_secret ───
3578
/// Args for `ff_rotate_waitpoint_hmac_secret`. Rotates the HMAC signing
/// kid on ONE partition. Callers fan out across every partition themselves
/// (ff-server does the parallel fan-out in `rotate_waitpoint_secret`;
/// direct-Valkey consumers mirror the pattern).
///
/// "now" is derived server-side from `redis.call("TIME")` inside the FCALL
/// (consistency with `validate_waitpoint_token` and flow scanners).
/// `grace_ms` is a duration, not a clock value, so carrying it from the
/// caller is safe.
///
/// `Debug` is implemented by hand so that `new_secret_hex` is never
/// rendered: this struct carries signing-key material and may end up
/// in logs or panic messages via `{:?}`.
#[derive(Clone)]
pub struct RotateWaitpointHmacSecretArgs {
    /// Identifier of the key being installed.
    pub new_kid: String,
    /// Hex-encoded secret for `new_kid`. Redacted in `Debug` output.
    pub new_secret_hex: String,
    /// Grace window in ms. Unsigned, so a negative window cannot be
    /// expressed. Tokens signed by the outgoing kid remain valid for
    /// `grace_ms` after this rotation.
    pub grace_ms: u64,
}

impl std::fmt::Debug for RotateWaitpointHmacSecretArgs {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RotateWaitpointHmacSecretArgs")
            .field("new_kid", &self.new_kid)
            // Never print key material; show a fixed placeholder instead.
            .field("new_secret_hex", &"<redacted>")
            .field("grace_ms", &self.grace_ms)
            .finish()
    }
}
3596
/// What happened when one partition was asked to rotate its kid.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RotateWaitpointHmacSecretOutcome {
    /// The new kid was installed.
    ///
    /// * `previous_kid` — the kid that was current before; `None` on
    ///   bootstrap, when no `current_kid` existed yet.
    /// * `gc_count` — number of expired kids reaped as part of this
    ///   rotation.
    Rotated {
        previous_kid: Option<String>,
        new_kid: String,
        gc_count: u32,
    },
    /// An exact replay: the same kid with the same secret was already
    /// in place. Nothing changed, so an operator can retry safely.
    Noop { kid: String },
}
3612
3613// ─── rotate_waitpoint_hmac_secret_all ───
3614
/// Args for [`EngineBackend::rotate_waitpoint_hmac_secret_all`] — the
/// cluster-wide / backend-native rotation of the waitpoint HMAC
/// signing kid.
///
/// **v0.7 migration-master Q4:** a single additive trait method
/// replaces the per-partition fan-out that direct-Valkey consumers
/// hand-rolled via
/// [`ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions`].
/// On Valkey it fans out N FCALLs (one per execution partition);
/// on Postgres (Wave 4) it resolves to a single INSERT against the
/// global `ff_waitpoint_hmac(kid, secret, rotated_at)` table (no
/// partition_id column). Consumers prefer this method for clarity —
/// the pre-existing free-fn + per-partition surface stays available
/// for backwards compat.
///
/// `#[non_exhaustive]` with a [`Self::new`] constructor per the
/// project memory-rule (unbuildable non_exhaustive types are a dead
/// API).
///
/// `Debug` is implemented by hand so that `new_secret_hex` is never
/// rendered: this struct carries signing-key material and may end up
/// in logs or panic messages via `{:?}`.
#[derive(Clone)]
#[non_exhaustive]
pub struct RotateWaitpointHmacSecretAllArgs {
    /// Identifier of the key being installed.
    pub new_kid: String,
    /// Hex-encoded secret for `new_kid`. Redacted in `Debug` output.
    pub new_secret_hex: String,
    /// Grace window in ms for tokens signed by the outgoing kid.
    /// Duration (not a clock value), identical to
    /// [`RotateWaitpointHmacSecretArgs::grace_ms`].
    pub grace_ms: u64,
}

impl std::fmt::Debug for RotateWaitpointHmacSecretAllArgs {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RotateWaitpointHmacSecretAllArgs")
            .field("new_kid", &self.new_kid)
            // Never print key material; show a fixed placeholder instead.
            .field("new_secret_hex", &"<redacted>")
            .field("grace_ms", &self.grace_ms)
            .finish()
    }
}

impl RotateWaitpointHmacSecretAllArgs {
    /// Build the args. Keeping the constructor so consumers don't
    /// struct-literal past the `#[non_exhaustive]` marker.
    ///
    /// `new_kid` and `new_secret_hex` accept anything convertible
    /// into a `String`; `grace_ms` is stored as given.
    pub fn new(
        new_kid: impl Into<String>,
        new_secret_hex: impl Into<String>,
        grace_ms: u64,
    ) -> Self {
        Self {
            new_kid: new_kid.into(),
            new_secret_hex: new_secret_hex.into(),
            grace_ms,
        }
    }
}
3659
3660/// Per-partition entry of [`RotateWaitpointHmacSecretAllResult`].
3661/// Mirrors [`ff_sdk::admin::PartitionRotationOutcome`] but typed at
3662/// the `ff-core` layer so both Valkey and Postgres backends return
3663/// the same shape without a Postgres→ferriskey dep.
3664///
3665/// On backends with no partition concept (Postgres) the entry list
3666/// has length 1 with `partition = 0` and the outcome of the global
3667/// row write.
3668#[derive(Debug)]
3669#[non_exhaustive]
3670pub struct RotateWaitpointHmacSecretAllEntry {
3671 pub partition: u16,
3672 /// The per-partition (or global) rotation outcome. Per-partition
3673 /// failures are surfaced as inner `Err` so the fan-out can report
3674 /// partial success — matching the existing SDK free-fn contract.
3675 pub result: Result<RotateWaitpointHmacSecretOutcome, crate::engine_error::EngineError>,
3676}
3677
3678impl RotateWaitpointHmacSecretAllEntry {
3679 pub fn new(
3680 partition: u16,
3681 result: Result<RotateWaitpointHmacSecretOutcome, crate::engine_error::EngineError>,
3682 ) -> Self {
3683 Self { partition, result }
3684 }
3685}
3686
3687/// Result of [`EngineBackend::rotate_waitpoint_hmac_secret_all`].
3688///
3689/// The Valkey backend returns one entry per execution partition. The
3690/// Postgres backend (Wave 4) will return a single-entry vec with
3691/// `partition = 0` since the Postgres schema stores one global row
3692/// per kid (Q4 §adjudication). Consumers that want a uniform "did
3693/// ALL rotations succeed?" view inspect each entry's `.result`.
3694#[derive(Debug)]
3695#[non_exhaustive]
3696pub struct RotateWaitpointHmacSecretAllResult {
3697 pub entries: Vec<RotateWaitpointHmacSecretAllEntry>,
3698}
3699
3700impl RotateWaitpointHmacSecretAllResult {
3701 pub fn new(entries: Vec<RotateWaitpointHmacSecretAllEntry>) -> Self {
3702 Self { entries }
3703 }
3704}
3705
3706// ─── seed_waitpoint_hmac_secret (issue #280) ───
3707
/// Args for [`EngineBackend::seed_waitpoint_hmac_secret`].
///
/// Two required fields and no optional knobs, so there is no fluent
/// builder — just `new(kid, secret_hex)`. `#[non_exhaustive]` is kept
/// (with the paired constructor, per the project memory rule) so
/// future additive knobs don't break callers.
///
/// Boot-time provisioning entry point for fresh deployments — see
/// issue #280 for why cairn needed this in addition to
/// [`RotateWaitpointHmacSecretAllArgs`]. Unlike rotate, seed is
/// idempotent: callers invoke it on every boot and the backend
/// decides whether to install.
///
/// `Debug` is implemented by hand so that `secret_hex` is never
/// rendered: this struct carries signing-key material and may end up
/// in logs or panic messages via `{:?}`.
#[derive(Clone)]
#[non_exhaustive]
pub struct SeedWaitpointHmacSecretArgs {
    /// Identifier of the key to seed.
    pub kid: String,
    /// Hex-encoded secret for `kid`. Redacted in `Debug` output.
    pub secret_hex: String,
}

impl std::fmt::Debug for SeedWaitpointHmacSecretArgs {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SeedWaitpointHmacSecretArgs")
            .field("kid", &self.kid)
            // Never print key material; show a fixed placeholder instead.
            .field("secret_hex", &"<redacted>")
            .finish()
    }
}

impl SeedWaitpointHmacSecretArgs {
    /// Build the args from anything convertible into `String`.
    pub fn new(kid: impl Into<String>, secret_hex: impl Into<String>) -> Self {
        Self {
            kid: kid.into(),
            secret_hex: secret_hex.into(),
        }
    }
}
3735
/// What [`EngineBackend::seed_waitpoint_hmac_secret`] reports back.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum SeedOutcome {
    /// The backend had no `current_kid` (or no row in the global
    /// keystore), so `kid` was installed as the active signing kid.
    Seeded { kid: String },
    /// A row for `kid` was already installed. `same_secret` says
    /// whether the stored secret bytes equal the hex the caller
    /// supplied; when it is `false` the caller should choose a fresh
    /// kid and rotate instead of silently re-installing under the
    /// existing kid.
    AlreadySeeded { kid: String, same_secret: bool },
}
3751
3752// ─── list_waitpoint_hmac_kids ───
3753
/// Arguments for the `list_waitpoint_hmac_kids` operation. The
/// struct currently has no fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ListWaitpointHmacKidsArgs {}
3756
3757/// Snapshot of the waitpoint HMAC keystore on ONE partition.
3758#[derive(Clone, Debug, PartialEq, Eq)]
3759pub struct WaitpointHmacKids {
3760 /// The currently-signing kid. `None` if uninitialized.
3761 pub current_kid: Option<String>,
3762 /// Kids that still validate existing tokens but no longer sign
3763 /// new ones. Order is Lua HGETALL traversal order — callers that
3764 /// need a stable sort should sort by `expires_at_ms`.
3765 pub verifying: Vec<VerifyingKid>,
3766}
3767
/// One element of [`WaitpointHmacKids::verifying`]: a kid together
/// with its `expires_at_ms` value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VerifyingKid {
    /// Key identifier.
    pub kid: String,
    /// NOTE(review): presumably the epoch-millisecond instant after
    /// which this kid stops validating tokens — confirm against the
    /// Lua keystore.
    pub expires_at_ms: i64,
}
3773
3774// ═══════════════════════════════════════════════════════════════════════
3775// RFC-013 Stage 1d: EngineBackend::suspend typed args + outcome
3776// ═══════════════════════════════════════════════════════════════════════
3777//
3778// `SuspendExecutionArgs` / `SuspendExecutionResult` above remain the
3779// wire-level Lua-ARGV mirror used by the backend serializer. The types
3780// below are the public trait-surface shapes RFC-013 §2.2–§2.6 specifies.
3781//
3782// Every type in this block is `#[non_exhaustive]` per the RFC §2.2.1
3783// memory-rule compliance note; each gets a constructor so external-crate
3784// consumers can build them without struct-literal access.
3785
3786use crate::backend::WaitpointHmac;
3787
3788/// Partition-scoped idempotency key for retry-safe `EngineBackend::suspend`.
3789///
3790/// See RFC-013 §2.2 — when set on [`SuspendArgs::idempotency_key`], the
3791/// backend dedups the call on `(partition, execution_id, idempotency_key)`
3792/// and a second `suspend` with the same triple returns the first call's
3793/// [`SuspendOutcome`] verbatim. Absent a key, `suspend` is NOT retry-
3794/// idempotent; callers must describe-and-reconcile per §3.1.
3795///
3796/// Follows the `UsageDimensions::dedup_key` pattern — opaque to the
3797/// engine, byte-compared at the partition scope.
3798#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
3799#[serde(transparent)]
3800pub struct IdempotencyKey(String);
3801
3802impl IdempotencyKey {
3803 /// Construct from any stringy input. Empty strings are accepted;
3804 /// the backend treats an empty key as "no dedup" at the serialize
3805 /// step so `Some(IdempotencyKey::new(""))` is functionally the same
3806 /// as `None`.
3807 pub fn new(key: impl Into<String>) -> Self {
3808 Self(key.into())
3809 }
3810
3811 /// Borrow the underlying string.
3812 pub fn as_str(&self) -> &str {
3813 &self.0
3814 }
3815}
3816
3817impl std::fmt::Display for IdempotencyKey {
3818 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3819 f.write_str(&self.0)
3820 }
3821}
3822
3823/// v1 signal-match predicate inside [`ResumeCondition::Single`].
3824///
3825/// RFC-013 §2.4 — `ByName(String)` matches a single concrete signal
3826/// name; `Wildcard` matches any delivered signal. RFC-014 may extend
3827/// (payload predicates, pattern matching) — `#[non_exhaustive]`.
3828#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3829#[non_exhaustive]
3830pub enum SignalMatcher {
3831 /// Match by exact signal name.
3832 ByName(String),
3833 /// Match any signal delivered to the waitpoint.
3834 Wildcard,
3835}
3836
/// Maximum nesting depth allowed for a composite condition (RFC-014
/// §5.4 invariant 4; rationale in §5.5). This is a soft cap: raising
/// it means changing only this constant and the cap-rationale
/// paragraph in RFC-014 §5.5, with no wire-format change. Keep the two
/// in sync.
pub const MAX_COMPOSITE_DEPTH: usize = 4;
3842
3843/// RFC-013 reserves this enum slot; RFC-014 populates it with the
3844/// concrete composition vocabulary (`AllOf` + `Count`). The enum is
3845/// `#[non_exhaustive]` so RFC-016 or later RFCs may add variants
3846/// (`AnyOf` has been explicitly rejected per RFC-014 §2.3 in favour of
3847/// `Count { n: 1, .. }`; the guard exists for orthogonal future work).
3848#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3849#[serde(tag = "kind")]
3850#[non_exhaustive]
3851pub enum CompositeBody {
3852 /// All listed sub-conditions must be satisfied. Order-independent.
3853 /// Once satisfied, further signals to member waitpoints are observed
3854 /// but do not re-open satisfaction. RFC-014 §2.1.
3855 AllOf {
3856 members: Vec<ResumeCondition>,
3857 },
3858 /// At least `n` distinct satisfiers (by [`CountKind`]) must match.
3859 /// `matcher` optionally constrains participating signals; `None`
3860 /// lets any signal on any of `waitpoints` count. RFC-014 §2.1.
3861 Count {
3862 n: u32,
3863 count_kind: CountKind,
3864 #[serde(default, skip_serializing_if = "Option::is_none")]
3865 matcher: Option<SignalMatcher>,
3866 waitpoints: Vec<String>,
3867 },
3868}
3869
3870/// How `Count` nodes distinguish satisfiers. RFC-014 §2.1 + §3.2.
3871#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
3872#[non_exhaustive]
3873pub enum CountKind {
3874 /// n distinct `waitpoint_id`s in `waitpoints` must fire.
3875 DistinctWaitpoints,
3876 /// n distinct `signal_id`s across the waitpoint set.
3877 DistinctSignals,
3878 /// n distinct `source_type:source_identity` tuples.
3879 DistinctSources,
3880}
3881
3882impl CompositeBody {
3883 /// `AllOf { members }` constructor (RFC-014 §10.3 SDK surface).
3884 pub fn all_of(members: impl IntoIterator<Item = ResumeCondition>) -> Self {
3885 Self::AllOf {
3886 members: members.into_iter().collect(),
3887 }
3888 }
3889
3890 /// `Count` constructor with explicit kind + waitpoint set.
3891 pub fn count(
3892 n: u32,
3893 count_kind: CountKind,
3894 matcher: Option<SignalMatcher>,
3895 waitpoints: impl IntoIterator<Item = String>,
3896 ) -> Self {
3897 Self::Count {
3898 n,
3899 count_kind,
3900 matcher,
3901 waitpoints: waitpoints.into_iter().collect(),
3902 }
3903 }
3904}
3905
3906/// Declarative resume condition for [`SuspendArgs::resume_condition`].
3907///
3908/// RFC-013 §2.4 — typed replacement for the SDK's former
3909/// `ConditionMatcher` / `resume_condition_json` pair.
3910#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3911#[non_exhaustive]
3912pub enum ResumeCondition {
3913 /// Single waitpoint-key match with a predicate. `matcher` is
3914 /// evaluated against every signal delivered to `waitpoint_key`.
3915 Single {
3916 waitpoint_key: String,
3917 matcher: SignalMatcher,
3918 },
3919 /// Operator-only resume — no signal satisfies; only an explicit
3920 /// operator resume closes the waitpoint.
3921 OperatorOnly,
3922 /// Pure-timeout suspension. No signal satisfier; the waitpoint
3923 /// resolves only via `timeout_behavior` at `timeout_at`. Requires
3924 /// `SuspendArgs::timeout_at` to be `Some(_)` — otherwise the
3925 /// Rust-side validator rejects as `timeout_only_without_deadline`.
3926 TimeoutOnly,
3927 /// Multi-condition composition; RFC-014 defines the body.
3928 Composite(CompositeBody),
3929}
3930
/// Validation error in the shape RFC-014 §5.1 prescribes.
/// [`ResumeCondition::validate_composite`] produces it when a
/// composite breaks a structural or cardinality invariant at
/// suspend-time, before any Valkey call is made. `detail` is the
/// human-readable explanation required by §5.1.1.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompositeValidationError {
    /// Human-readable description of the violated invariant.
    pub detail: String,
}

impl CompositeValidationError {
    /// Private constructor: wraps any string-like `detail`.
    fn new(detail: impl Into<String>) -> Self {
        let detail: String = detail.into();
        CompositeValidationError { detail }
    }
}
3947
3948impl ResumeCondition {
3949 /// RFC-014 §10.3 builder — `AllOf` across N distinct waitpoints,
3950 /// each member a `Single { matcher: Wildcard }` leaf. Canonical
3951 /// Pattern 3 shape for heterogeneous-subsystem "all fired"
3952 /// semantics (e.g. `db-migration-complete` + `cache-warmed` +
3953 /// `feature-flag-set`).
3954 ///
3955 /// Callers that need per-waitpoint matchers should construct the
3956 /// tree directly via
3957 /// [`ResumeCondition::Composite(CompositeBody::all_of(..))`].
3958 pub fn all_of_waitpoints<I, S>(waitpoint_keys: I) -> Self
3959 where
3960 I: IntoIterator<Item = S>,
3961 S: Into<String>,
3962 {
3963 let members: Vec<ResumeCondition> = waitpoint_keys
3964 .into_iter()
3965 .map(|k| ResumeCondition::Single {
3966 waitpoint_key: k.into(),
3967 matcher: SignalMatcher::Wildcard,
3968 })
3969 .collect();
3970 ResumeCondition::Composite(CompositeBody::AllOf { members })
3971 }
3972
3973 /// Collect every distinct `waitpoint_key` the condition targets.
3974 /// Used at suspend-time to validate the condition's wp set against
3975 /// `SuspendArgs.waitpoints` (RFC-014 §5.1 multi-binding cross-
3976 /// check). Order follows tree DFS, de-duplicated preserving first
3977 /// occurrence.
3978 pub fn referenced_waitpoint_keys(&self) -> Vec<String> {
3979 let mut out: Vec<String> = Vec::new();
3980 let mut push = |k: &str| {
3981 if !out.iter().any(|e| e == k) {
3982 out.push(k.to_owned());
3983 }
3984 };
3985 fn walk(cond: &ResumeCondition, push: &mut dyn FnMut(&str)) {
3986 match cond {
3987 ResumeCondition::Single { waitpoint_key, .. } => push(waitpoint_key),
3988 ResumeCondition::Composite(body) => walk_body(body, push),
3989 _ => {}
3990 }
3991 }
3992 fn walk_body(body: &CompositeBody, push: &mut dyn FnMut(&str)) {
3993 match body {
3994 CompositeBody::AllOf { members } => {
3995 for m in members {
3996 walk(m, push);
3997 }
3998 }
3999 CompositeBody::Count { waitpoints, .. } => {
4000 for w in waitpoints {
4001 push(w.as_str());
4002 }
4003 }
4004 }
4005 }
4006 walk(self, &mut push);
4007 out
4008 }
4009
4010 /// Validate RFC-014 structural invariants on a composite condition.
4011 /// Single / OperatorOnly / TimeoutOnly return Ok — they carry no
4012 /// composite body. Checks cover:
4013 /// * `AllOf { members: [] }` — §5.1 `allof_empty_members`
4014 /// * `Count { n: 0 }` — §5.1 `count_n_zero`
4015 /// * `Count { waitpoints: [] }` — §5.1 `count_waitpoints_empty`
4016 /// * `Count { n > waitpoints.len(), DistinctWaitpoints }` — §5.1
4017 /// `count_exceeds_waitpoint_set`
4018 /// * depth > [`MAX_COMPOSITE_DEPTH`] — §5.1 `condition_depth_exceeded`
4019 pub fn validate_composite(&self) -> Result<(), CompositeValidationError> {
4020 match self {
4021 ResumeCondition::Composite(body) => validate_body(body, 1, ""),
4022 _ => Ok(()),
4023 }
4024 }
4025}
4026
4027fn validate_body(
4028 body: &CompositeBody,
4029 depth: usize,
4030 path: &str,
4031) -> Result<(), CompositeValidationError> {
4032 if depth > MAX_COMPOSITE_DEPTH {
4033 return Err(CompositeValidationError::new(format!(
4034 "depth {} exceeds cap {} at path {}",
4035 depth,
4036 MAX_COMPOSITE_DEPTH,
4037 if path.is_empty() { "<root>" } else { path }
4038 )));
4039 }
4040 match body {
4041 CompositeBody::AllOf { members } => {
4042 if members.is_empty() {
4043 return Err(CompositeValidationError::new(format!(
4044 "allof_empty_members at path {}",
4045 if path.is_empty() { "<root>" } else { path }
4046 )));
4047 }
4048 for (i, m) in members.iter().enumerate() {
4049 let child_path = if path.is_empty() {
4050 format!("members[{i}]")
4051 } else {
4052 format!("{path}.members[{i}]")
4053 };
4054 if let ResumeCondition::Composite(inner) = m {
4055 validate_body(inner, depth + 1, &child_path)?;
4056 }
4057 // Leaf `Single` / operator / timeout needs no further
4058 // structural checks — RFC-013 already constrains them.
4059 }
4060 Ok(())
4061 }
4062 CompositeBody::Count {
4063 n,
4064 count_kind,
4065 waitpoints,
4066 ..
4067 } => {
4068 if *n == 0 {
4069 return Err(CompositeValidationError::new(format!(
4070 "count_n_zero at path {}",
4071 if path.is_empty() { "<root>" } else { path }
4072 )));
4073 }
4074 if waitpoints.is_empty() {
4075 return Err(CompositeValidationError::new(format!(
4076 "count_waitpoints_empty at path {}",
4077 if path.is_empty() { "<root>" } else { path }
4078 )));
4079 }
4080 if matches!(count_kind, CountKind::DistinctWaitpoints)
4081 && (*n as usize) > waitpoints.len()
4082 {
4083 return Err(CompositeValidationError::new(format!(
4084 "count_exceeds_waitpoint_set: n={} > waitpoints.len()={} at path {}",
4085 n,
4086 waitpoints.len(),
4087 if path.is_empty() { "<root>" } else { path }
4088 )));
4089 }
4090 Ok(())
4091 }
4092 }
4093}
4094
4095/// Where a satisfied suspension routes back to.
4096///
4097/// v1 ships only [`ResumeTarget::Runnable`] — execution returns to
4098/// `runnable` and goes through normal scheduling.
4099#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
4100#[non_exhaustive]
4101pub enum ResumeTarget {
4102 Runnable,
4103}
4104
4105/// Resume-side policy carried alongside [`ResumeCondition`].
4106///
4107/// RFC-013 §2.5 — what happens when the condition is satisfied. Fields
4108/// mirror the `resume_policy_json` the backend serializer writes to Lua
4109/// (RFC-004 §Resume policy fields).
4110#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
4111#[non_exhaustive]
4112pub struct ResumePolicy {
4113 pub resume_target: ResumeTarget,
4114 pub consume_matched_signals: bool,
4115 pub retain_signal_buffer_until_closed: bool,
4116 #[serde(default, skip_serializing_if = "Option::is_none")]
4117 pub resume_delay_ms: Option<u64>,
4118 pub close_waitpoint_on_resume: bool,
4119}
4120
4121impl Default for ResumePolicy {
4122 fn default() -> Self {
4123 Self::normal()
4124 }
4125}
4126
4127impl ResumePolicy {
4128 /// Construct a [`ResumePolicy`] with the canonical v1 defaults
4129 /// (see [`Self::normal`]). Alias for [`Self::normal`] — provided
4130 /// so external consumers have a conventional `new` constructor
4131 /// against this `#[non_exhaustive]` struct.
4132 pub fn new() -> Self {
4133 Self::normal()
4134 }
4135
4136 /// Canonical v1 defaults (RFC-013 §2.2.1):
4137 /// * `resume_target = Runnable`
4138 /// * `consume_matched_signals = true`
4139 /// * `retain_signal_buffer_until_closed = false`
4140 /// * `resume_delay_ms = None`
4141 /// * `close_waitpoint_on_resume = true`
4142 pub fn normal() -> Self {
4143 Self {
4144 resume_target: ResumeTarget::Runnable,
4145 consume_matched_signals: true,
4146 retain_signal_buffer_until_closed: false,
4147 resume_delay_ms: None,
4148 close_waitpoint_on_resume: true,
4149 }
4150 }
4151}
4152
4153/// Timeout behavior at the suspension deadline (RFC-004 §Timeout Behavior).
4154#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
4155#[non_exhaustive]
4156pub enum TimeoutBehavior {
4157 Fail,
4158 Cancel,
4159 Expire,
4160 AutoResumeWithTimeoutSignal,
4161 /// v2 per RFC-004 Implementation Notes; enum slot present for
4162 /// additive RFC-014/RFC-015 landing.
4163 Escalate,
4164}
4165
4166impl TimeoutBehavior {
4167 /// Lua-side string encoding. Matches the wire values Lua's
4168 /// `ff_expire_suspension` matches on.
4169 pub fn as_wire_str(self) -> &'static str {
4170 match self {
4171 Self::Fail => "fail",
4172 Self::Cancel => "cancel",
4173 Self::Expire => "expire",
4174 Self::AutoResumeWithTimeoutSignal => "auto_resume_with_timeout_signal",
4175 Self::Escalate => "escalate",
4176 }
4177 }
4178}
4179
4180/// Reason category for a suspension (RFC-004 §Suspension Reason Categories).
4181#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
4182#[non_exhaustive]
4183pub enum SuspensionReasonCode {
4184 WaitingForSignal,
4185 WaitingForApproval,
4186 WaitingForCallback,
4187 WaitingForToolResult,
4188 WaitingForOperatorReview,
4189 PausedByPolicy,
4190 PausedByBudget,
4191 StepBoundary,
4192 ManualPause,
4193}
4194
4195impl SuspensionReasonCode {
4196 pub fn as_wire_str(self) -> &'static str {
4197 match self {
4198 Self::WaitingForSignal => "waiting_for_signal",
4199 Self::WaitingForApproval => "waiting_for_approval",
4200 Self::WaitingForCallback => "waiting_for_callback",
4201 Self::WaitingForToolResult => "waiting_for_tool_result",
4202 Self::WaitingForOperatorReview => "waiting_for_operator_review",
4203 Self::PausedByPolicy => "paused_by_policy",
4204 Self::PausedByBudget => "paused_by_budget",
4205 Self::StepBoundary => "step_boundary",
4206 Self::ManualPause => "manual_pause",
4207 }
4208 }
4209}
4210
4211/// Who requested the suspension.
4212#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
4213#[non_exhaustive]
4214pub enum SuspensionRequester {
4215 Worker,
4216 Operator,
4217 Policy,
4218 SystemTimeoutPolicy,
4219}
4220
4221impl SuspensionRequester {
4222 pub fn as_wire_str(self) -> &'static str {
4223 match self {
4224 Self::Worker => "worker",
4225 Self::Operator => "operator",
4226 Self::Policy => "policy",
4227 Self::SystemTimeoutPolicy => "system_timeout_policy",
4228 }
4229 }
4230}
4231
4232/// How the waitpoint resource backing a [`SuspendArgs`] is obtained.
4233///
4234/// RFC-013 §2.2 — `Fresh` mints a new waitpoint as part of `suspend`;
4235/// `UsePending` activates a waitpoint previously issued via
4236/// `EngineBackend::create_waitpoint`.
4237#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
4238#[non_exhaustive]
4239pub enum WaitpointBinding {
4240 Fresh {
4241 waitpoint_id: WaitpointId,
4242 waitpoint_key: String,
4243 },
4244 UsePending {
4245 waitpoint_id: WaitpointId,
4246 },
4247}
4248
4249impl WaitpointBinding {
4250 /// Mint a fresh binding with a random `waitpoint_id` (UUID v4) and
4251 /// `waitpoint_key = "wpk:<uuid>"`.
4252 pub fn fresh() -> Self {
4253 let wp_id = WaitpointId::new();
4254 let key = format!("wpk:{wp_id}");
4255 Self::Fresh {
4256 waitpoint_id: wp_id,
4257 waitpoint_key: key,
4258 }
4259 }
4260
4261 /// Construct a `UsePending` binding from a pending waitpoint
4262 /// previously issued by `create_waitpoint`. The HMAC token is
4263 /// resolved Lua-side from the partition's waitpoint hash at
4264 /// `suspend` time (RFC-013 §5.1).
4265 pub fn use_pending(pending: &crate::backend::PendingWaitpoint) -> Self {
4266 Self::UsePending {
4267 waitpoint_id: pending.waitpoint_id.clone(),
4268 }
4269 }
4270}
4271
4272/// Trait-surface input to [`EngineBackend::suspend`] (RFC-013 §2.2 +
4273/// RFC-014 Pattern 3 widening).
4274///
4275/// Built via [`SuspendArgs::new`] + `with_*` setters; direct struct-
4276/// literal construction across crate boundaries is not possible
4277/// (`#[non_exhaustive]`).
4278///
4279/// ## Waitpoints
4280///
4281/// `waitpoints` is a non-empty `Vec<WaitpointBinding>`. The first entry
4282/// is the "primary" binding (accessible via [`primary`](Self::primary))
4283/// and carries the `current_waitpoint_id` written onto `exec_core` for
4284/// operator visibility. Additional entries land in Valkey as their own
4285/// waitpoint hashes / signal streams / HMAC tokens, enabling RFC-014
4286/// Pattern 3 `AllOf { members: [Single{wp1}, Single{wp2}, ...] }` across
4287/// distinct heterogeneous subsystems.
4288///
4289/// [`SuspendArgs::new`] takes exactly the primary binding; call
4290/// [`with_waitpoint`](Self::with_waitpoint) to append further bindings
4291/// (the RFC-014 builder API).
4292#[derive(Clone, Debug, Serialize, Deserialize)]
4293#[non_exhaustive]
4294pub struct SuspendArgs {
4295 pub suspension_id: SuspensionId,
4296 /// RFC-014 Pattern 3: all waitpoint bindings for this suspension.
4297 /// Guaranteed non-empty; `waitpoints[0]` is the primary.
4298 pub waitpoints: Vec<WaitpointBinding>,
4299 pub resume_condition: ResumeCondition,
4300 pub resume_policy: ResumePolicy,
4301 pub reason_code: SuspensionReasonCode,
4302 pub requested_by: SuspensionRequester,
4303 #[serde(default, skip_serializing_if = "Option::is_none")]
4304 pub timeout_at: Option<TimestampMs>,
4305 pub timeout_behavior: TimeoutBehavior,
4306 #[serde(default, skip_serializing_if = "Option::is_none")]
4307 pub continuation_metadata_pointer: Option<String>,
4308 pub now: TimestampMs,
4309 #[serde(default, skip_serializing_if = "Option::is_none")]
4310 pub idempotency_key: Option<IdempotencyKey>,
4311}
4312
4313impl SuspendArgs {
4314 /// Build a minimal `SuspendArgs` for a worker-originated suspension.
4315 ///
4316 /// `waitpoint` becomes the primary binding. Append additional
4317 /// bindings with [`with_waitpoint`](Self::with_waitpoint) (RFC-014
4318 /// Pattern 3) or replace the set with
4319 /// [`with_waitpoints`](Self::with_waitpoints).
4320 ///
4321 /// Defaults: `requested_by = Worker`, `timeout_at = None`,
4322 /// `timeout_behavior = Fail`, `continuation_metadata_pointer = None`,
4323 /// `idempotency_key = None`.
4324 pub fn new(
4325 suspension_id: SuspensionId,
4326 waitpoint: WaitpointBinding,
4327 resume_condition: ResumeCondition,
4328 resume_policy: ResumePolicy,
4329 reason_code: SuspensionReasonCode,
4330 now: TimestampMs,
4331 ) -> Self {
4332 Self {
4333 suspension_id,
4334 waitpoints: vec![waitpoint],
4335 resume_condition,
4336 resume_policy,
4337 reason_code,
4338 requested_by: SuspensionRequester::Worker,
4339 timeout_at: None,
4340 timeout_behavior: TimeoutBehavior::Fail,
4341 continuation_metadata_pointer: None,
4342 now,
4343 idempotency_key: None,
4344 }
4345 }
4346
4347 /// Primary binding — `waitpoints[0]`. Guaranteed present by
4348 /// construction.
4349 pub fn primary(&self) -> &WaitpointBinding {
4350 &self.waitpoints[0]
4351 }
4352
4353 pub fn with_timeout(mut self, at: TimestampMs, behavior: TimeoutBehavior) -> Self {
4354 self.timeout_at = Some(at);
4355 self.timeout_behavior = behavior;
4356 self
4357 }
4358
4359 pub fn with_requester(mut self, requester: SuspensionRequester) -> Self {
4360 self.requested_by = requester;
4361 self
4362 }
4363
4364 pub fn with_continuation_metadata_pointer(mut self, p: String) -> Self {
4365 self.continuation_metadata_pointer = Some(p);
4366 self
4367 }
4368
4369 pub fn with_idempotency_key(mut self, key: IdempotencyKey) -> Self {
4370 self.idempotency_key = Some(key);
4371 self
4372 }
4373
4374 /// RFC-014 Pattern 3 — append a further waitpoint binding to this
4375 /// suspension. Each additional binding yields its own waitpoint
4376 /// hash, signal stream, condition hash and HMAC token in Valkey,
4377 /// but all share the suspension record and composite evaluator
4378 /// under one `suspension:current`.
4379 ///
4380 /// Ordering: the primary (from [`SuspendArgs::new`]) stays at
4381 /// `waitpoints[0]`; subsequent `with_waitpoint` calls append at the
4382 /// tail.
4383 pub fn with_waitpoint(mut self, binding: WaitpointBinding) -> Self {
4384 self.waitpoints.push(binding);
4385 self
4386 }
4387
4388 /// RFC-014 Pattern 3 — replace the full binding vector in one call.
4389 /// Must be non-empty; an empty Vec is a programmer error and will
4390 /// be rejected by the backend's `validate_suspend_args` with
4391 /// `waitpoints_empty`.
4392 pub fn with_waitpoints(mut self, bindings: Vec<WaitpointBinding>) -> Self {
4393 self.waitpoints = bindings;
4394 self
4395 }
4396}
4397
4398/// Shared "what happened on the waitpoint" payload carried in both
4399/// [`SuspendOutcome`] variants.
4400///
4401/// For Pattern 3 (RFC-014) — multi-waitpoint suspensions — the primary
4402/// binding's identity lives at the top level (`waitpoint_id` /
4403/// `waitpoint_key` / `waitpoint_token`) and remaining bindings are
4404/// exposed via `additional_waitpoints`, each carrying its own minted
4405/// HMAC token so external signallers can deliver to any of the N
4406/// waitpoints the suspension is listening on.
4407#[derive(Clone, Debug, PartialEq, Eq)]
4408#[non_exhaustive]
4409pub struct SuspendOutcomeDetails {
4410 pub suspension_id: SuspensionId,
4411 pub waitpoint_id: WaitpointId,
4412 pub waitpoint_key: String,
4413 pub waitpoint_token: WaitpointHmac,
4414 /// RFC-014 Pattern 3 extras (beyond the primary). Empty for
4415 /// single-waitpoint suspensions (patterns 1 + 2); carries one
4416 /// entry per additional binding for Pattern 3.
4417 pub additional_waitpoints: Vec<AdditionalWaitpointBinding>,
4418}
4419
4420/// RFC-014 Pattern 3 — per-binding identity + HMAC token for
4421/// waitpoints beyond the primary. Structure mirrors the top-level
4422/// fields on [`SuspendOutcomeDetails`].
4423#[derive(Clone, Debug, PartialEq, Eq)]
4424#[non_exhaustive]
4425pub struct AdditionalWaitpointBinding {
4426 pub waitpoint_id: WaitpointId,
4427 pub waitpoint_key: String,
4428 pub waitpoint_token: WaitpointHmac,
4429}
4430
4431impl AdditionalWaitpointBinding {
4432 pub fn new(
4433 waitpoint_id: WaitpointId,
4434 waitpoint_key: String,
4435 waitpoint_token: WaitpointHmac,
4436 ) -> Self {
4437 Self {
4438 waitpoint_id,
4439 waitpoint_key,
4440 waitpoint_token,
4441 }
4442 }
4443}
4444
4445impl SuspendOutcomeDetails {
4446 pub fn new(
4447 suspension_id: SuspensionId,
4448 waitpoint_id: WaitpointId,
4449 waitpoint_key: String,
4450 waitpoint_token: WaitpointHmac,
4451 ) -> Self {
4452 Self {
4453 suspension_id,
4454 waitpoint_id,
4455 waitpoint_key,
4456 waitpoint_token,
4457 additional_waitpoints: Vec::new(),
4458 }
4459 }
4460
4461 /// Attach RFC-014 Pattern 3 additional-waitpoint bindings. The
4462 /// primary binding stays at the top-level fields; `extras` lands
4463 /// in [`additional_waitpoints`](Self::additional_waitpoints).
4464 pub fn with_additional_waitpoints(
4465 mut self,
4466 extras: Vec<AdditionalWaitpointBinding>,
4467 ) -> Self {
4468 self.additional_waitpoints = extras;
4469 self
4470 }
4471}
4472
4473/// Trait-surface output from [`EngineBackend::suspend`] (RFC-013 §2.3).
4474///
4475/// Two variants encode the "lease released" vs "lease retained" split.
4476/// See §2.3 for the runtime-enforcement semantics.
4477#[derive(Clone, Debug, PartialEq, Eq)]
4478#[non_exhaustive]
4479pub enum SuspendOutcome {
4480 /// The worker's pre-suspend handle is no longer lease-bearing; a
4481 /// fresh `HandleKind::Suspended` handle supersedes it.
4482 Suspended {
4483 details: SuspendOutcomeDetails,
4484 handle: crate::backend::Handle,
4485 },
4486 /// Buffered signals on a pending waitpoint already satisfied the
4487 /// condition at suspension time; the lease is retained and the
4488 /// caller's pre-suspend handle remains valid.
4489 AlreadySatisfied { details: SuspendOutcomeDetails },
4490}
4491
4492impl SuspendOutcome {
4493 /// Borrow the shared details regardless of variant.
4494 pub fn details(&self) -> &SuspendOutcomeDetails {
4495 match self {
4496 Self::Suspended { details, .. } => details,
4497 Self::AlreadySatisfied { details } => details,
4498 }
4499 }
4500}
4501
4502// `EngineBackend::suspend` type re-exports for `ff_core::backend::*`
4503// consumers. The `backend` module re-exports these below so external
4504// crates can reach them via the idiomatic `ff_core::backend` path that
4505// already sources the other trait-surface types (RFC-013 §9.1).
4506
4507// ─── RFC-017 Stage A — trait-expansion Args/Result types ─────────────
4508//
4509// Per RFC-017 §5.1.1: every struct/enum introduced here is
4510// `#[non_exhaustive]` and ships with a `pub fn new(...)` constructor so
4511// additive field growth post-v0.8 does not force cross-crate churn.
4512
4513// ─── claim_for_worker ───
4514
4515/// Inputs to `EngineBackend::claim_for_worker` (RFC-017 §5, §7). The
4516/// Valkey impl forwards to `ff_scheduler::Scheduler::claim_for_worker`;
4517/// the Postgres impl forwards to its own scheduler module. The trait
4518/// method hides the backend-specific dispatch behind one shape.
4519#[non_exhaustive]
4520#[derive(Clone, Debug)]
4521pub struct ClaimForWorkerArgs {
4522 pub lane_id: LaneId,
4523 pub worker_id: WorkerId,
4524 pub worker_instance_id: WorkerInstanceId,
4525 pub worker_capabilities: std::collections::BTreeSet<String>,
4526 pub grant_ttl_ms: u64,
4527}
4528
4529impl ClaimForWorkerArgs {
4530 /// Required-field constructor. Optional fields today: none — kept
4531 /// for forward-compat so a future optional (e.g. `deadline_ms`)
4532 /// does not break callers using the builder pattern.
4533 pub fn new(
4534 lane_id: LaneId,
4535 worker_id: WorkerId,
4536 worker_instance_id: WorkerInstanceId,
4537 worker_capabilities: std::collections::BTreeSet<String>,
4538 grant_ttl_ms: u64,
4539 ) -> Self {
4540 Self {
4541 lane_id,
4542 worker_id,
4543 worker_instance_id,
4544 worker_capabilities,
4545 grant_ttl_ms,
4546 }
4547 }
4548}
4549
4550/// Outcome of `EngineBackend::claim_for_worker`. `None`-like shape
4551/// modelled as an enum so additive variants (e.g. `BackPressured {
4552/// retry_after_ms }`) do not force a wire break.
4553#[non_exhaustive]
4554#[derive(Clone, Debug, PartialEq, Eq)]
4555pub enum ClaimForWorkerOutcome {
4556 /// No eligible execution on this lane at this scan cycle.
4557 NoWork,
4558 /// Grant issued — worker proceeds to `claim_from_grant`.
4559 Granted(ClaimGrant),
4560}
4561
4562impl ClaimForWorkerOutcome {
4563 /// Build the `NoWork` variant.
4564 pub fn no_work() -> Self {
4565 Self::NoWork
4566 }
4567 /// Build the `Granted` variant.
4568 pub fn granted(grant: ClaimGrant) -> Self {
4569 Self::Granted(grant)
4570 }
4571}
4572
4573// ─── list_pending_waitpoints ───
4574
4575/// Inputs to `EngineBackend::list_pending_waitpoints` (RFC-017 §5, §8).
4576/// Pagination is part of the signature so a flow with 10k pending
4577/// waitpoints cannot force a single-round-trip read regardless of
4578/// backend.
4579#[non_exhaustive]
4580#[derive(Clone, Debug)]
4581pub struct ListPendingWaitpointsArgs {
4582 pub execution_id: ExecutionId,
4583 /// Exclusive cursor — `None` starts from the beginning.
4584 pub after: Option<WaitpointId>,
4585 /// Max page size. `None` → backend default (100). Backend-enforced
4586 /// cap: 1000.
4587 pub limit: Option<u32>,
4588}
4589
4590impl ListPendingWaitpointsArgs {
4591 pub fn new(execution_id: ExecutionId) -> Self {
4592 Self {
4593 execution_id,
4594 after: None,
4595 limit: None,
4596 }
4597 }
4598 pub fn with_after(mut self, after: WaitpointId) -> Self {
4599 self.after = Some(after);
4600 self
4601 }
4602 pub fn with_limit(mut self, limit: u32) -> Self {
4603 self.limit = Some(limit);
4604 self
4605 }
4606}
4607
4608/// Page of pending-waitpoint entries. Stage A preserves the existing
4609/// `PendingWaitpointInfo` shape; the §8 schema rewrite (HMAC
4610/// sanitisation + `(token_kid, token_fingerprint)` additive fields)
4611/// ships in Stage D alongside the HTTP wire-format deprecation.
4612#[non_exhaustive]
4613#[derive(Clone, Debug)]
4614pub struct ListPendingWaitpointsResult {
4615 pub entries: Vec<PendingWaitpointInfo>,
4616 /// Forward-only continuation cursor — `None` signals end-of-stream.
4617 pub next_cursor: Option<WaitpointId>,
4618}
4619
4620impl ListPendingWaitpointsResult {
4621 pub fn new(entries: Vec<PendingWaitpointInfo>) -> Self {
4622 Self {
4623 entries,
4624 next_cursor: None,
4625 }
4626 }
4627 pub fn with_next_cursor(mut self, cursor: WaitpointId) -> Self {
4628 self.next_cursor = Some(cursor);
4629 self
4630 }
4631}
4632
4633// ─── report_usage_admin ───
4634
4635/// Inputs to `EngineBackend::report_usage_admin` (RFC-017 §5 budget+
4636/// quota admin §5, round-1 F4). Admin-path peer of `report_usage` —
4637/// both wrap `ff_report_usage_and_check` on the Valkey side but the
4638/// admin call is worker-less, so it cannot reuse the lease-bound
4639/// `report_usage(&Handle, ...)` signature. `ReportUsageAdminArgs`
4640/// carries the same fields as [`ReportUsageArgs`] without a worker
4641/// handle — kept as a distinct type so future admin-only fields (e.g.
4642/// `actor_identity`, `audit_reason`) don't pollute the worker path.
4643#[non_exhaustive]
4644#[derive(Clone, Debug)]
4645pub struct ReportUsageAdminArgs {
4646 pub dimensions: Vec<String>,
4647 pub deltas: Vec<u64>,
4648 pub dedup_key: Option<String>,
4649 pub now: TimestampMs,
4650}
4651
4652impl ReportUsageAdminArgs {
4653 pub fn new(dimensions: Vec<String>, deltas: Vec<u64>, now: TimestampMs) -> Self {
4654 Self {
4655 dimensions,
4656 deltas,
4657 dedup_key: None,
4658 now,
4659 }
4660 }
4661 pub fn with_dedup_key(mut self, key: String) -> Self {
4662 self.dedup_key = Some(key);
4663 self
4664 }
4665}
4666
#[cfg(test)]
mod rfc_014_validation_tests {
    use super::*;

    /// Leaf condition on waitpoint `wp` with a fixed by-name matcher.
    fn single(wp: &str) -> ResumeCondition {
        let matcher = SignalMatcher::ByName(String::from("x"));
        ResumeCondition::Single {
            waitpoint_key: String::from(wp),
            matcher,
        }
    }

    /// Wraps `members` in a composite `AllOf` node.
    fn all_of(members: Vec<ResumeCondition>) -> ResumeCondition {
        ResumeCondition::Composite(CompositeBody::AllOf { members })
    }

    #[test]
    fn single_passes_validate() {
        let cond = single("wpk:a");
        assert!(cond.validate_composite().is_ok());
    }

    #[test]
    fn allof_empty_members_rejected() {
        let cond = all_of(Vec::new());
        let err = cond.validate_composite().unwrap_err();
        assert!(err.detail.contains("allof_empty_members"), "{}", err.detail);
    }

    #[test]
    fn count_n_zero_rejected() {
        let body = CompositeBody::Count {
            n: 0,
            count_kind: CountKind::DistinctWaitpoints,
            matcher: None,
            waitpoints: vec![String::from("wpk:a")],
        };
        let err = ResumeCondition::Composite(body)
            .validate_composite()
            .unwrap_err();
        assert!(err.detail.contains("count_n_zero"), "{}", err.detail);
    }

    #[test]
    fn count_waitpoints_empty_rejected() {
        let body = CompositeBody::Count {
            n: 1,
            count_kind: CountKind::DistinctSources,
            matcher: None,
            waitpoints: Vec::new(),
        };
        let err = ResumeCondition::Composite(body)
            .validate_composite()
            .unwrap_err();
        assert!(err.detail.contains("count_waitpoints_empty"), "{}", err.detail);
    }

    #[test]
    fn count_exceeds_waitpoint_set_rejected_only_for_distinct_waitpoints() {
        // Three required but only two waitpoints: DistinctWaitpoints
        // can never be satisfied, so validation must reject it.
        let too_many = ResumeCondition::Composite(CompositeBody::Count {
            n: 3,
            count_kind: CountKind::DistinctWaitpoints,
            matcher: None,
            waitpoints: vec![String::from("a"), String::from("b")],
        });
        let err = too_many.validate_composite().unwrap_err();
        assert!(
            err.detail.contains("count_exceeds_waitpoint_set"),
            "{}",
            err.detail
        );

        // Identical cardinality under DistinctSignals has no upper
        // bound and is therefore accepted.
        let unbounded = ResumeCondition::Composite(CompositeBody::Count {
            n: 3,
            count_kind: CountKind::DistinctSignals,
            matcher: None,
            waitpoints: vec![String::from("a"), String::from("b")],
        });
        assert!(unbounded.validate_composite().is_ok());
    }

    #[test]
    fn depth_4_accepted_depth_5_rejected() {
        // Depth 4: AllOf { AllOf { AllOf { AllOf { Single } } } },
        // built by wrapping the leaf four times.
        let d4 = (0..4).fold(single("wpk:leaf"), |inner, _| all_of(vec![inner]));
        assert!(d4.validate_composite().is_ok());

        // One more layer makes depth 5, which must be rejected.
        let d5 = all_of(vec![d4]);
        let err = d5.validate_composite().unwrap_err();
        assert!(err.detail.contains("exceeds cap"), "{}", err.detail);
    }
}