Skip to main content

rustvello_proto/status/
mod.rs

1mod concurrency;
2mod machine;
3#[cfg(test)]
4mod tests;
5
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::fmt;
9use std::str::FromStr;
10use std::sync::LazyLock;
11
12use crate::identifiers::RunnerId;
13
14pub use concurrency::ConcurrencyControlType;
15pub use machine::{
16    compute_new_owner, status_record_transition, validate_ownership, validate_transition,
17    OwnershipError, StatusMachineError, StatusTransitionError,
18};
19
20// ============================================================================
21// InvocationStatus enum
22// ============================================================================
23
24/// The lifecycle status of an invocation.
25///
26/// Follows a strict state machine — not all transitions are valid.
27/// Mirrors pynenc's `InvocationStatus` enum exactly.
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
29#[non_exhaustive]
30pub enum InvocationStatus {
31    /// Invocation created and registered
32    Registered,
33    /// Paused due to concurrency control (transient — will be rerouted)
34    ConcurrencyControlled,
35    /// Permanently blocked by concurrency control
36    ConcurrencyControlledFinal,
37    /// Re-queued in the broker for another execution attempt
38    Rerouted,
39    /// Queued in the broker, waiting for a runner
40    Pending,
41    /// Pending recovery after runner failure (timeout exceeded)
42    PendingRecovery,
43    /// Currently being executed by a runner
44    Running,
45    /// Being re-executed during recovery (owner runner inactive)
46    RunningRecovery,
47    /// Task execution is paused
48    Paused,
49    /// Task execution has been resumed after pause
50    Resumed,
51    /// Task execution has been killed
52    Killed,
53    /// Completed successfully
54    Success,
55    /// Failed with an error
56    Failed,
57    /// Marked for retry after a failure
58    Retry,
59}
60
61/// All status variants, for iteration.
62pub const ALL_STATUSES: &[InvocationStatus] = &[
63    InvocationStatus::Registered,
64    InvocationStatus::ConcurrencyControlled,
65    InvocationStatus::ConcurrencyControlledFinal,
66    InvocationStatus::Rerouted,
67    InvocationStatus::Pending,
68    InvocationStatus::PendingRecovery,
69    InvocationStatus::Running,
70    InvocationStatus::RunningRecovery,
71    InvocationStatus::Paused,
72    InvocationStatus::Resumed,
73    InvocationStatus::Killed,
74    InvocationStatus::Success,
75    InvocationStatus::Failed,
76    InvocationStatus::Retry,
77];
78
79impl InvocationStatus {
80    /// Returns true if this is a terminal (final) status.
81    #[inline]
82    pub fn is_terminal(&self) -> bool {
83        STATUS_CONFIG.definition(*self).is_final
84    }
85
86    /// Returns true if this status means the invocation can be picked up by a runner.
87    #[inline]
88    pub fn is_available_for_run(&self) -> bool {
89        STATUS_CONFIG.definition(*self).available_for_run
90    }
91
92    /// Returns the set of valid next states from this status.
93    #[inline]
94    pub fn valid_transitions(&self) -> &[InvocationStatus] {
95        &STATUS_CONFIG.definition(*self).allowed_transitions
96    }
97
98    /// Check if transitioning to `next` is valid.
99    #[inline]
100    pub fn can_transition_to(&self, next: InvocationStatus) -> bool {
101        self.valid_transitions().contains(&next)
102    }
103
104    /// Returns all terminal statuses.
105    pub fn final_statuses() -> &'static [InvocationStatus] {
106        &STATUS_CONFIG.final_statuses
107    }
108
109    /// Returns all statuses where invocations can be picked up by runners.
110    pub fn available_for_run_statuses() -> &'static [InvocationStatus] {
111        &STATUS_CONFIG.available_for_run_statuses
112    }
113}
114
115impl fmt::Display for InvocationStatus {
116    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117        match self {
118            Self::Registered => write!(f, "REGISTERED"),
119            Self::ConcurrencyControlled => write!(f, "CONCURRENCY_CONTROLLED"),
120            Self::ConcurrencyControlledFinal => write!(f, "CONCURRENCY_CONTROLLED_FINAL"),
121            Self::Rerouted => write!(f, "REROUTED"),
122            Self::Pending => write!(f, "PENDING"),
123            Self::PendingRecovery => write!(f, "PENDING_RECOVERY"),
124            Self::Running => write!(f, "RUNNING"),
125            Self::RunningRecovery => write!(f, "RUNNING_RECOVERY"),
126            Self::Paused => write!(f, "PAUSED"),
127            Self::Resumed => write!(f, "RESUMED"),
128            Self::Killed => write!(f, "KILLED"),
129            Self::Success => write!(f, "SUCCESS"),
130            Self::Failed => write!(f, "FAILED"),
131            Self::Retry => write!(f, "RETRY"),
132        }
133    }
134}
135
136impl FromStr for InvocationStatus {
137    type Err = String;
138
139    fn from_str(s: &str) -> Result<Self, Self::Err> {
140        match s.to_uppercase().as_str() {
141            "REGISTERED" => Ok(Self::Registered),
142            "CONCURRENCY_CONTROLLED" => Ok(Self::ConcurrencyControlled),
143            "CONCURRENCY_CONTROLLED_FINAL" => Ok(Self::ConcurrencyControlledFinal),
144            "REROUTED" => Ok(Self::Rerouted),
145            "PENDING" => Ok(Self::Pending),
146            "PENDING_RECOVERY" => Ok(Self::PendingRecovery),
147            "RUNNING" => Ok(Self::Running),
148            "RUNNING_RECOVERY" => Ok(Self::RunningRecovery),
149            "PAUSED" => Ok(Self::Paused),
150            "RESUMED" => Ok(Self::Resumed),
151            "KILLED" => Ok(Self::Killed),
152            "SUCCESS" => Ok(Self::Success),
153            "FAILED" => Ok(Self::Failed),
154            "RETRY" => Ok(Self::Retry),
155            other => Err(format!("unknown invocation status: {other}")),
156        }
157    }
158}
159
160// ============================================================================
161// InvocationStatusRecord
162// ============================================================================
163
164/// A status change record with ownership and timestamp.
165#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
166pub struct InvocationStatusRecord {
167    pub status: InvocationStatus,
168    pub runner_id: Option<RunnerId>,
169    pub timestamp: DateTime<Utc>,
170}
171
172impl InvocationStatusRecord {
173    pub fn new(status: InvocationStatus, runner_id: Option<RunnerId>) -> Self {
174        Self {
175            status,
176            runner_id,
177            timestamp: Utc::now(),
178        }
179    }
180}
181
182// ============================================================================
183// StatusDefinition — declarative rules for each status
184// ============================================================================
185
186/// Declarative definition of status behavior and ownership rules.
187///
188/// Mirrors pynenc's `StatusDefinition` dataclass exactly.
189#[derive(Debug, Clone)]
190pub struct StatusDefinition {
191    /// Valid next statuses from this status.
192    pub allowed_transitions: Vec<InvocationStatus>,
193    /// Terminates invocation lifecycle.
194    pub is_final: bool,
195    /// Can be picked up by runners via broker.
196    pub available_for_run: bool,
197    /// Only the owning runner can modify when in this status.
198    pub requires_ownership: bool,
199    /// Claims ownership on entry (sets runner_id).
200    pub acquires_ownership: bool,
201    /// Releases ownership on entry (clears runner_id).
202    pub releases_ownership: bool,
203    /// Bypasses ownership validation (for recovery scenarios).
204    pub overrides_ownership: bool,
205}
206
207impl StatusDefinition {
208    const fn new() -> Self {
209        Self {
210            allowed_transitions: Vec::new(),
211            is_final: false,
212            available_for_run: false,
213            requires_ownership: false,
214            acquires_ownership: false,
215            releases_ownership: false,
216            overrides_ownership: false,
217        }
218    }
219}
220
221// ============================================================================
222// StatusConfiguration — complete config for all statuses
223// ============================================================================
224
225/// Complete configuration for invocation status behavior.
226pub(super) struct StatusConfiguration {
227    /// Definition for the "no status yet" (None → Registered) transition.
228    pub(super) initial: StatusDefinition,
229    /// Definitions indexed by status.
230    definitions: Vec<(InvocationStatus, StatusDefinition)>,
231    /// Cached: all terminal statuses.
232    pub(super) final_statuses: Vec<InvocationStatus>,
233    /// Cached: all available-for-run statuses.
234    pub(super) available_for_run_statuses: Vec<InvocationStatus>,
235}
236
237impl StatusConfiguration {
238    pub(super) fn definition(&self, status: InvocationStatus) -> &StatusDefinition {
239        self.definitions
240            .iter()
241            .find(|(s, _)| *s == status)
242            .map_or_else(
243                || panic!("missing StatusDefinition for {status:?}"),
244                |(_, d)| d,
245            )
246    }
247}
248
249/// Build the static status configuration. Mirrors pynenc's `_CONFIG` exactly.
250fn build_config() -> StatusConfiguration {
251    use InvocationStatus::*;
252
253    let initial = StatusDefinition {
254        allowed_transitions: vec![Registered],
255        ..StatusDefinition::new()
256    };
257
258    let definitions = vec![
259        (
260            Registered,
261            StatusDefinition {
262                allowed_transitions: vec![
263                    Pending,
264                    ConcurrencyControlled,
265                    ConcurrencyControlledFinal,
266                ],
267                available_for_run: true,
268                releases_ownership: true,
269                ..StatusDefinition::new()
270            },
271        ),
272        (
273            ConcurrencyControlled,
274            StatusDefinition {
275                allowed_transitions: vec![Rerouted],
276                releases_ownership: true,
277                ..StatusDefinition::new()
278            },
279        ),
280        (
281            Rerouted,
282            StatusDefinition {
283                allowed_transitions: vec![Pending, ConcurrencyControlled],
284                available_for_run: true,
285                releases_ownership: true,
286                ..StatusDefinition::new()
287            },
288        ),
289        (
290            Pending,
291            StatusDefinition {
292                // An invocation can FAIL without running by the CYCLE-CONTROL mechanism
293                // to avoid deadlocks.
294                // PENDING_RECOVERY is for timeout recovery without ownership validation.
295                allowed_transitions: vec![Running, Killed, Rerouted, Failed, PendingRecovery],
296                requires_ownership: true,
297                acquires_ownership: true,
298                ..StatusDefinition::new()
299            },
300        ),
301        (
302            PendingRecovery,
303            StatusDefinition {
304                allowed_transitions: vec![Rerouted],
305                releases_ownership: true,
306                overrides_ownership: true,
307                ..StatusDefinition::new()
308            },
309        ),
310        (
311            Running,
312            StatusDefinition {
313                allowed_transitions: vec![Paused, Killed, Retry, Success, Failed, RunningRecovery],
314                requires_ownership: true,
315                ..StatusDefinition::new()
316            },
317        ),
318        (
319            RunningRecovery,
320            StatusDefinition {
321                allowed_transitions: vec![Rerouted],
322                releases_ownership: true,
323                overrides_ownership: true,
324                ..StatusDefinition::new()
325            },
326        ),
327        (
328            Paused,
329            StatusDefinition {
330                allowed_transitions: vec![Resumed, Killed],
331                requires_ownership: true,
332                ..StatusDefinition::new()
333            },
334        ),
335        (
336            Resumed,
337            StatusDefinition {
338                allowed_transitions: vec![Paused, Killed, Retry, Success, Failed],
339                requires_ownership: true,
340                ..StatusDefinition::new()
341            },
342        ),
343        (
344            Killed,
345            StatusDefinition {
346                allowed_transitions: vec![Rerouted],
347                releases_ownership: true,
348                ..StatusDefinition::new()
349            },
350        ),
351        (
352            Retry,
353            StatusDefinition {
354                allowed_transitions: vec![Pending],
355                available_for_run: true,
356                releases_ownership: true,
357                ..StatusDefinition::new()
358            },
359        ),
360        (
361            Success,
362            StatusDefinition {
363                is_final: true,
364                releases_ownership: true,
365                ..StatusDefinition::new()
366            },
367        ),
368        (
369            Failed,
370            StatusDefinition {
371                is_final: true,
372                releases_ownership: true,
373                ..StatusDefinition::new()
374            },
375        ),
376        (
377            ConcurrencyControlledFinal,
378            StatusDefinition {
379                is_final: true,
380                releases_ownership: true,
381                ..StatusDefinition::new()
382            },
383        ),
384    ];
385
386    let final_statuses: Vec<_> = definitions
387        .iter()
388        .filter(|(_, d)| d.is_final)
389        .map(|(s, _)| *s)
390        .collect();
391    let available_for_run_statuses: Vec<_> = definitions
392        .iter()
393        .filter(|(_, d)| d.available_for_run)
394        .map(|(s, _)| *s)
395        .collect();
396
397    StatusConfiguration {
398        initial,
399        definitions,
400        final_statuses,
401        available_for_run_statuses,
402    }
403}
404
405pub(super) static STATUS_CONFIG: LazyLock<StatusConfiguration> = LazyLock::new(build_config);
406
407/// Get the status definition for a given status.
408pub fn get_status_definition(status: InvocationStatus) -> &'static StatusDefinition {
409    STATUS_CONFIG.definition(status)
410}
411
412/// Get the status definition for the initial (None) state.
413pub fn get_initial_definition() -> &'static StatusDefinition {
414    &STATUS_CONFIG.initial
415}