Skip to main content

ff_core/
state.rs

1use std::fmt;
2
3use serde::{Deserialize, Serialize};
4
5// ── Dimension A — Lifecycle Phase ──
6
/// What major phase of existence is this execution in?
///
/// Dimension A of [`StateVector`] and the first field consulted by
/// [`StateVector::derive_public_state`]. Serde serializes each variant under
/// its snake_case name (e.g. `Active` as `"active"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LifecyclePhase {
    /// Accepted by engine, not yet resolved to runnable/delayed. Transient.
    /// Reported publicly as `PublicState::Waiting`.
    Submitted,
    /// Eligible or potentially eligible for claiming. The public label is
    /// chosen from the vector's `eligibility_state`.
    Runnable,
    /// Currently owned by a worker lease and in progress.
    /// Reported publicly as `PublicState::Active`.
    Active,
    /// Intentionally paused, waiting for signal/approval/callback.
    /// Reported publicly as `PublicState::Suspended`.
    Suspended,
    /// Execution is finished. No further state transitions except replay.
    /// The public label is chosen from the vector's `terminal_outcome`.
    Terminal,
}
22
23// ── Dimension B — Ownership State ──
24
/// Who, if anyone, is currently allowed to mutate active execution state?
///
/// Dimension B of [`StateVector`]. This dimension is not read by
/// [`StateVector::derive_public_state`]; the public label is the same
/// whatever the ownership state is. Serde serializes each variant under its
/// snake_case name (e.g. `LeaseExpiredReclaimable` as
/// `"lease_expired_reclaimable"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OwnershipState {
    /// No current lease.
    Unowned,
    /// A worker holds a valid lease.
    Leased,
    /// Lease TTL passed without renewal. Execution awaits reclaim.
    LeaseExpiredReclaimable,
    /// Lease was explicitly revoked by operator or engine.
    LeaseRevoked,
}
38
39// ── Dimension C — Eligibility State ──
40
/// Can the execution be claimed for work right now?
///
/// Dimension C of [`StateVector`]. [`StateVector::derive_public_state`] only
/// consults this dimension while `lifecycle_phase` is `Runnable`; the
/// per-variant notes below give the public label chosen in that case.
/// Serde serializes each variant under its snake_case name (e.g.
/// `EligibleNow` as `"eligible_now"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EligibilityState {
    /// Ready for claiming. Public label: `Waiting`.
    EligibleNow,
    /// Delayed until a future timestamp. Public label: `Delayed`.
    NotEligibleUntilTime,
    /// Waiting on upstream executions in a flow/DAG. Public label:
    /// `WaitingChildren`.
    BlockedByDependencies,
    /// Budget limit reached. Public label: `RateLimited`.
    BlockedByBudget,
    /// Quota or rate-limit reached. Public label: `RateLimited`.
    BlockedByQuota,
    /// No capable/available worker matches requirements. Public label:
    /// `Waiting`.
    BlockedByRoute,
    /// Lane is paused or draining. Public label: `Waiting`.
    BlockedByLaneState,
    /// Operator hold. Public label: `Waiting`.
    BlockedByOperator,
    /// Used when lifecycle_phase is active, suspended, or terminal.
    /// Paired with a `Runnable` phase it is a constraint violation, which the
    /// derivation surfaces as `Waiting`.
    NotApplicable,
}
64
65// ── Dimension D — Blocking Reason ──
66
/// What is the most specific explanation for lack of forward progress?
///
/// Dimension D of [`StateVector`]. This dimension is not read by
/// [`StateVector::derive_public_state`]. Serde serializes each variant under
/// its snake_case name (e.g. `PausedByFlowCancel` as
/// `"paused_by_flow_cancel"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum BlockingReason {
    /// Not blocked.
    None,
    /// Eligible but no worker has claimed yet.
    WaitingForWorker,
    /// Delayed for retry backoff.
    WaitingForRetryBackoff,
    /// Delayed after suspension resume.
    WaitingForResumeDelay,
    /// Worker-initiated explicit delay.
    WaitingForDelay,
    /// Suspended, waiting for a generic signal.
    WaitingForSignal,
    /// Suspended, waiting for human approval.
    WaitingForApproval,
    /// Suspended, waiting for external callback.
    WaitingForCallback,
    /// Suspended, waiting for tool completion.
    WaitingForToolResult,
    /// Blocked on child/dependency executions.
    WaitingForChildren,
    /// Budget exhausted.
    WaitingForBudget,
    /// Quota/rate-limit window full.
    WaitingForQuota,
    /// No worker with required capabilities available.
    WaitingForCapableWorker,
    /// No worker in required region/locality.
    WaitingForLocalityMatch,
    /// Operator placed a hold.
    PausedByOperator,
    /// Policy rule prevents progress (e.g. lane pause).
    PausedByPolicy,
    /// Flow cancellation with let_active_finish blocked this unclaimed member.
    PausedByFlowCancel,
}
106
107// ── Dimension E — Terminal Outcome ──
108
/// If the execution is terminal, how did it end?
///
/// Dimension E of [`StateVector`]. [`StateVector::derive_public_state`] only
/// consults this dimension while `lifecycle_phase` is `Terminal`; the
/// per-variant notes below give the public label chosen in that case.
/// Serde serializes each variant under its snake_case name (e.g. `Success`
/// as `"success"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TerminalOutcome {
    /// Not terminal. Paired with a `Terminal` phase it is treated as corrupt
    /// state and surfaced publicly as `Failed`.
    None,
    /// Completed successfully with result. Public label: `Completed`.
    Success,
    /// Failed after exhausting retries or by explicit failure. Public label:
    /// `Failed`.
    Failed,
    /// Intentionally terminated by user, operator, or policy. Public label:
    /// `Cancelled`.
    Cancelled,
    /// Deadline, TTL, or suspension timeout elapsed. Public label: `Expired`.
    Expired,
    /// Required dependency failed, making this execution impossible. Public
    /// label: `Skipped`.
    Skipped,
}
126
127// ── Dimension F — Attempt State ──
128
/// What is happening at the concrete run-attempt layer?
///
/// Dimension F of [`StateVector`]. This dimension is not read by
/// [`StateVector::derive_public_state`]. Serde serializes each variant under
/// its snake_case name (e.g. `RunningAttempt` as `"running_attempt"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AttemptState {
    /// No attempt context (e.g. freshly submitted, or skipped before any attempt).
    None,
    /// Awaiting initial claim.
    PendingFirstAttempt,
    /// An attempt is actively executing.
    RunningAttempt,
    /// Current attempt was interrupted (crash, reclaim, suspension, delay, waiting children).
    AttemptInterrupted,
    /// Awaiting retry after failure.
    PendingRetryAttempt,
    /// Awaiting replay after terminal state.
    PendingReplayAttempt,
    /// The final attempt has concluded.
    AttemptTerminal,
}
148
149// ── Derived: Public State ──
150
/// Engine-computed user-facing label. Derived from the state vector.
///
/// Serde serializes each variant under its snake_case name; the same strings
/// are returned by [`PublicState::as_str`] and written by the `Display` impl.
///
/// The enum is `#[non_exhaustive]`, so code in other crates that matches on
/// it must include a wildcard arm.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum PublicState {
    /// Eligible and waiting for a worker to claim.
    Waiting,
    /// Not yet eligible due to time-based delay.
    Delayed,
    /// Blocked by budget, quota, or rate-limit policy.
    RateLimited,
    /// Blocked on child/dependency executions.
    WaitingChildren,
    /// Currently being processed by a worker.
    Active,
    /// Intentionally paused, waiting for signal/approval/callback.
    Suspended,
    /// Suspension signal satisfied — awaiting `claim_resumed` to start
    /// the next attempt. Transient state between `Suspended` and
    /// `Active`; emitted by the Postgres suspend_signal path (and the
    /// Valkey equivalent via `ff_deliver_signal` when the waitpoint
    /// fires). See `ff-backend-postgres::suspend_ops`.
    ///
    /// NOTE(review): `StateVector::derive_public_state` has no arm that
    /// yields this variant, so a vector whose stored label is `Resumable`
    /// is reported as inconsistent by `StateVector::is_consistent` —
    /// confirm that this is intended.
    Resumable,
    /// Terminal: finished successfully.
    Completed,
    /// Terminal: finished unsuccessfully.
    Failed,
    /// Terminal: intentionally terminated.
    Cancelled,
    /// Terminal: deadline/TTL elapsed.
    Expired,
    /// Terminal: impossible to run because required dependency failed.
    Skipped,
}
185
186impl PublicState {
187    /// Snake-case string form. Round-trips with
188    /// `ff_script::functions::suspension::parse_public_state` and the
189    /// raw strings Postgres stores in `ff_exec_core.public_state`.
190    pub fn as_str(self) -> &'static str {
191        match self {
192            Self::Waiting => "waiting",
193            Self::Delayed => "delayed",
194            Self::RateLimited => "rate_limited",
195            Self::WaitingChildren => "waiting_children",
196            Self::Active => "active",
197            Self::Suspended => "suspended",
198            Self::Resumable => "resumable",
199            Self::Completed => "completed",
200            Self::Failed => "failed",
201            Self::Cancelled => "cancelled",
202            Self::Expired => "expired",
203            Self::Skipped => "skipped",
204        }
205    }
206}
207
208impl fmt::Display for PublicState {
209    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210        f.write_str(self.as_str())
211    }
212}
213
214// ── State Vector ──
215
/// The full 6+1 dimension execution state vector.
///
/// Six independent dimensions (A–F) plus the stored, engine-derived
/// `public_state` label. [`StateVector::derive_public_state`] recomputes the
/// label from the dimensions and [`StateVector::is_consistent`] compares it
/// with the stored one.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StateVector {
    /// Dimension A: major phase of the execution.
    pub lifecycle_phase: LifecyclePhase,
    /// Dimension B: who, if anyone, currently holds the lease.
    pub ownership_state: OwnershipState,
    /// Dimension C: whether the execution can be claimed right now.
    pub eligibility_state: EligibilityState,
    /// Dimension D: most specific reason for lack of forward progress.
    pub blocking_reason: BlockingReason,
    /// Dimension E: how the execution ended, if it is terminal.
    pub terminal_outcome: TerminalOutcome,
    /// Dimension F: state of the concrete run attempt.
    pub attempt_state: AttemptState,
    /// Engine-derived user-facing label.
    pub public_state: PublicState,
}
228
229impl StateVector {
230    /// Derive public_state from the other 6 dimensions per RFC-001 §2.4.
231    ///
232    /// Never panics. In distributed systems, constraint violations can occur
233    /// via partial writes or reconciler drift. Impossible combinations log a
234    /// warning and return a safe fallback instead of crashing.
235    pub fn derive_public_state(&self) -> PublicState {
236        match self.lifecycle_phase {
237            LifecyclePhase::Terminal => match self.terminal_outcome {
238                TerminalOutcome::Success => PublicState::Completed,
239                TerminalOutcome::Failed => PublicState::Failed,
240                TerminalOutcome::Cancelled => PublicState::Cancelled,
241                TerminalOutcome::Expired => PublicState::Expired,
242                TerminalOutcome::Skipped => PublicState::Skipped,
243                TerminalOutcome::None => {
244                    // V4 violation: terminal without outcome. Corrupt state —
245                    // surface as Failed so operators notice and investigate.
246                    // No logging here (ff-core has no tracing dep); callers
247                    // detect this via is_consistent() returning false.
248                    PublicState::Failed
249                }
250            },
251            LifecyclePhase::Suspended => PublicState::Suspended,
252            LifecyclePhase::Active => PublicState::Active,
253            LifecyclePhase::Runnable => match self.eligibility_state {
254                EligibilityState::EligibleNow => PublicState::Waiting,
255                EligibilityState::NotEligibleUntilTime => PublicState::Delayed,
256                EligibilityState::BlockedByDependencies => PublicState::WaitingChildren,
257                EligibilityState::BlockedByBudget | EligibilityState::BlockedByQuota => {
258                    PublicState::RateLimited
259                }
260                EligibilityState::BlockedByRoute
261                | EligibilityState::BlockedByLaneState
262                | EligibilityState::BlockedByOperator => PublicState::Waiting,
263                EligibilityState::NotApplicable => {
264                    // Constraint violation: runnable should not have not_applicable.
265                    // Surface as Waiting — the index reconciler will correct this.
266                    PublicState::Waiting
267                }
268            },
269            LifecyclePhase::Submitted => PublicState::Waiting,
270        }
271    }
272
273    /// Check if the stored public_state matches the derived value.
274    pub fn is_consistent(&self) -> bool {
275        self.public_state == self.derive_public_state()
276    }
277}
278
279// ── Attempt Lifecycle (RFC-002) ──
280
/// Per-attempt lifecycle states.
///
/// [`AttemptLifecycle::is_terminal`] treats the three `Ended*` variants and
/// `InterruptedReclaimed` as terminal; `Created`, `Started` and `Suspended`
/// are not. Serde serializes each variant under its snake_case name (e.g.
/// `EndedSuccess` as `"ended_success"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AttemptLifecycle {
    /// Attempt record exists; worker has not yet acquired a lease.
    Created,
    /// Worker has acquired a lease and is actively executing.
    Started,
    /// Attempt is paused because the execution intentionally suspended.
    Suspended,
    /// Attempt completed successfully.
    EndedSuccess,
    /// Attempt failed.
    EndedFailure,
    /// Attempt was cancelled.
    EndedCancelled,
    /// Attempt was interrupted by lease expiry/revocation and the execution was reclaimed.
    InterruptedReclaimed,
}
300
301impl AttemptLifecycle {
302    /// Whether this attempt lifecycle state is terminal.
303    pub fn is_terminal(self) -> bool {
304        matches!(
305            self,
306            Self::EndedSuccess
307                | Self::EndedFailure
308                | Self::EndedCancelled
309                | Self::InterruptedReclaimed
310        )
311    }
312}
313
314// ── Attempt Type (RFC-002) ──
315
/// Why this attempt was created.
///
/// Serde serializes each variant under its snake_case name (e.g. `Initial`
/// as `"initial"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AttemptType {
    /// First attempt for this execution.
    Initial,
    /// Retry after a failed attempt.
    Retry,
    /// Reclaim after lease expiry or revocation.
    Reclaim,
    /// Replay of a terminal execution.
    Replay,
    /// Fallback progression to next provider/model.
    Fallback,
}
331
332// ── Lane State (RFC-009) ──
333
/// Lane operational state.
///
/// Serde serializes each variant under its snake_case name (e.g. `Draining`
/// as `"draining"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LaneState {
    /// Lane is accepting and processing work.
    Active,
    /// Lane is accepting submissions but not claiming (paused).
    Paused,
    /// Lane is not accepting new submissions but processing existing.
    Draining,
    /// Lane is disabled.
    Disabled,
}
347
#[cfg(test)]
mod tests {
    use super::*;

    /// Positional shorthand for building a `StateVector`. Arguments follow the
    /// struct's field order: dimensions A–F, then the stored public label.
    fn vector(
        lifecycle_phase: LifecyclePhase,
        ownership_state: OwnershipState,
        eligibility_state: EligibilityState,
        blocking_reason: BlockingReason,
        terminal_outcome: TerminalOutcome,
        attempt_state: AttemptState,
        public_state: PublicState,
    ) -> StateVector {
        StateVector {
            lifecycle_phase,
            ownership_state,
            eligibility_state,
            blocking_reason,
            terminal_outcome,
            attempt_state,
            public_state,
        }
    }

    #[test]
    fn derive_public_state_terminal_success() {
        let state = vector(
            LifecyclePhase::Terminal,
            OwnershipState::Unowned,
            EligibilityState::NotApplicable,
            BlockingReason::None,
            TerminalOutcome::Success,
            AttemptState::AttemptTerminal,
            PublicState::Completed,
        );
        assert_eq!(state.derive_public_state(), PublicState::Completed);
        assert!(state.is_consistent());
    }

    #[test]
    fn derive_public_state_active() {
        let state = vector(
            LifecyclePhase::Active,
            OwnershipState::Leased,
            EligibilityState::NotApplicable,
            BlockingReason::None,
            TerminalOutcome::None,
            AttemptState::RunningAttempt,
            PublicState::Active,
        );
        assert_eq!(state.derive_public_state(), PublicState::Active);
        assert!(state.is_consistent());
    }

    #[test]
    fn derive_public_state_active_lease_expired_still_active() {
        // D3: the Active phase wins regardless of the ownership dimension.
        let state = vector(
            LifecyclePhase::Active,
            OwnershipState::LeaseExpiredReclaimable,
            EligibilityState::NotApplicable,
            BlockingReason::None,
            TerminalOutcome::None,
            AttemptState::AttemptInterrupted,
            PublicState::Active,
        );
        assert_eq!(state.derive_public_state(), PublicState::Active);
    }

    #[test]
    fn derive_public_state_runnable_eligible() {
        let state = vector(
            LifecyclePhase::Runnable,
            OwnershipState::Unowned,
            EligibilityState::EligibleNow,
            BlockingReason::WaitingForWorker,
            TerminalOutcome::None,
            AttemptState::PendingFirstAttempt,
            PublicState::Waiting,
        );
        assert_eq!(state.derive_public_state(), PublicState::Waiting);
    }

    #[test]
    fn derive_public_state_delayed() {
        let state = vector(
            LifecyclePhase::Runnable,
            OwnershipState::Unowned,
            EligibilityState::NotEligibleUntilTime,
            BlockingReason::WaitingForRetryBackoff,
            TerminalOutcome::None,
            AttemptState::PendingRetryAttempt,
            PublicState::Delayed,
        );
        assert_eq!(state.derive_public_state(), PublicState::Delayed);
    }

    #[test]
    fn derive_public_state_waiting_children() {
        let state = vector(
            LifecyclePhase::Runnable,
            OwnershipState::Unowned,
            EligibilityState::BlockedByDependencies,
            BlockingReason::WaitingForChildren,
            TerminalOutcome::None,
            AttemptState::PendingFirstAttempt,
            PublicState::WaitingChildren,
        );
        assert_eq!(state.derive_public_state(), PublicState::WaitingChildren);
    }

    #[test]
    fn derive_public_state_rate_limited() {
        let state = vector(
            LifecyclePhase::Runnable,
            OwnershipState::Unowned,
            EligibilityState::BlockedByBudget,
            BlockingReason::WaitingForBudget,
            TerminalOutcome::None,
            AttemptState::PendingFirstAttempt,
            PublicState::RateLimited,
        );
        assert_eq!(state.derive_public_state(), PublicState::RateLimited);
    }

    #[test]
    fn derive_public_state_suspended() {
        let state = vector(
            LifecyclePhase::Suspended,
            OwnershipState::Unowned,
            EligibilityState::NotApplicable,
            BlockingReason::WaitingForApproval,
            TerminalOutcome::None,
            AttemptState::AttemptInterrupted,
            PublicState::Suspended,
        );
        assert_eq!(state.derive_public_state(), PublicState::Suspended);
    }

    #[test]
    fn derive_public_state_submitted_collapses_to_waiting() {
        let state = vector(
            LifecyclePhase::Submitted,
            OwnershipState::Unowned,
            EligibilityState::NotApplicable,
            BlockingReason::None,
            TerminalOutcome::None,
            AttemptState::None,
            PublicState::Waiting,
        );
        assert_eq!(state.derive_public_state(), PublicState::Waiting);
    }

    #[test]
    fn derive_public_state_skipped() {
        let state = vector(
            LifecyclePhase::Terminal,
            OwnershipState::Unowned,
            EligibilityState::NotApplicable,
            BlockingReason::None,
            TerminalOutcome::Skipped,
            AttemptState::None,
            PublicState::Skipped,
        );
        assert_eq!(state.derive_public_state(), PublicState::Skipped);
    }

    #[test]
    fn attempt_lifecycle_terminal_check() {
        let terminal = [
            AttemptLifecycle::EndedSuccess,
            AttemptLifecycle::EndedFailure,
            AttemptLifecycle::EndedCancelled,
            AttemptLifecycle::InterruptedReclaimed,
        ];
        for state in terminal.iter().copied() {
            assert!(state.is_terminal(), "{:?} should be terminal", state);
        }

        let live = [
            AttemptLifecycle::Created,
            AttemptLifecycle::Started,
            AttemptLifecycle::Suspended,
        ];
        for state in live.iter().copied() {
            assert!(!state.is_terminal(), "{:?} should not be terminal", state);
        }
    }

    #[test]
    fn public_state_resumable_roundtrip_display_and_serde() {
        let resumable = PublicState::Resumable;

        // Display and as_str agree on the snake_case label.
        assert_eq!(resumable.to_string(), "resumable");
        assert_eq!(resumable.as_str(), "resumable");

        // serde uses the same label in both directions.
        let encoded = serde_json::to_string(&resumable).unwrap();
        assert_eq!(encoded, "\"resumable\"");
        let decoded: PublicState = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, PublicState::Resumable);
    }

    #[test]
    fn serde_roundtrip_lifecycle_phase() {
        let original = LifecyclePhase::Active;
        let encoded = serde_json::to_string(&original).unwrap();
        assert_eq!(encoded, "\"active\"");
        let decoded: LifecyclePhase = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn serde_roundtrip_blocking_reason() {
        let original = BlockingReason::PausedByFlowCancel;
        let encoded = serde_json::to_string(&original).unwrap();
        assert_eq!(encoded, "\"paused_by_flow_cancel\"");
        let decoded: BlockingReason = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn serde_roundtrip_state_vector() {
        let original = vector(
            LifecyclePhase::Active,
            OwnershipState::Leased,
            EligibilityState::NotApplicable,
            BlockingReason::None,
            TerminalOutcome::None,
            AttemptState::RunningAttempt,
            PublicState::Active,
        );
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: StateVector = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }
}