celers_core/
state.rs

1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3use std::fmt;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6/// Task state enumeration with strict state machine transitions
7#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
8pub enum TaskState {
9    /// Task is queued but not yet picked up by a worker
10    Pending,
11
12    /// Task has been received by a worker
13    Received,
14
15    /// Task has been reserved by a worker but not yet executed
16    Reserved,
17
18    /// Task is currently being executed
19    Running,
20
21    /// Task is being retried (includes retry count)
22    Retrying(u32),
23
24    /// Task completed successfully with result
25    Succeeded(Vec<u8>),
26
27    /// Task failed with error message
28    Failed(String),
29
30    /// Task has been revoked
31    Revoked,
32
33    /// Task has been rejected
34    Rejected,
35
36    /// Custom user-defined state with optional metadata
37    Custom {
38        /// Custom state name
39        name: String,
40        /// Optional custom metadata as JSON bytes
41        metadata: Option<Vec<u8>>,
42    },
43}
44
45impl TaskState {
46    /// Check if the task is in a terminal state
47    #[inline]
48    #[must_use]
49    pub const fn is_terminal(&self) -> bool {
50        matches!(
51            self,
52            TaskState::Succeeded(_)
53                | TaskState::Failed(_)
54                | TaskState::Revoked
55                | TaskState::Rejected
56        )
57    }
58
59    /// Create a custom state with a name
60    #[must_use]
61    pub fn custom(name: impl Into<String>) -> Self {
62        Self::Custom {
63            name: name.into(),
64            metadata: None,
65        }
66    }
67
68    /// Create a custom state with name and JSON metadata
69    #[must_use]
70    pub fn custom_with_metadata(name: impl Into<String>, metadata: Vec<u8>) -> Self {
71        Self::Custom {
72            name: name.into(),
73            metadata: Some(metadata),
74        }
75    }
76
77    /// Check if this is a custom state
78    #[inline]
79    #[must_use]
80    pub const fn is_custom(&self) -> bool {
81        matches!(self, TaskState::Custom { .. })
82    }
83
84    /// Get the custom state name if this is a custom state
85    #[inline]
86    #[must_use]
87    pub fn custom_name(&self) -> Option<&str> {
88        match self {
89            TaskState::Custom { name, .. } => Some(name),
90            _ => None,
91        }
92    }
93
94    /// Get the custom state metadata if this is a custom state with metadata
95    #[inline]
96    #[must_use]
97    pub fn custom_metadata(&self) -> Option<&[u8]> {
98        match self {
99            TaskState::Custom { metadata, .. } => metadata.as_deref(),
100            _ => None,
101        }
102    }
103
104    /// Check if the task is revoked
105    #[inline]
106    #[must_use]
107    pub const fn is_revoked(&self) -> bool {
108        matches!(self, TaskState::Revoked)
109    }
110
111    /// Check if the task is rejected
112    #[inline]
113    #[must_use]
114    pub const fn is_rejected(&self) -> bool {
115        matches!(self, TaskState::Rejected)
116    }
117
118    /// Check if the task is received
119    #[inline]
120    #[must_use]
121    pub const fn is_received(&self) -> bool {
122        matches!(self, TaskState::Received)
123    }
124
125    /// Check if the task can be retried
126    #[inline]
127    #[must_use]
128    pub const fn can_retry(&self, max_retries: u32) -> bool {
129        match self {
130            TaskState::Failed(_) => true,
131            TaskState::Retrying(count) => *count < max_retries,
132            _ => false,
133        }
134    }
135
136    /// Get the retry count
137    #[inline]
138    #[must_use]
139    pub const fn retry_count(&self) -> u32 {
140        match self {
141            TaskState::Retrying(count) => *count,
142            _ => 0,
143        }
144    }
145
146    /// Check if the task is in an active (non-terminal) state
147    #[inline]
148    #[must_use]
149    pub const fn is_active(&self) -> bool {
150        !self.is_terminal()
151    }
152
153    /// Check if the task is pending
154    #[inline]
155    #[must_use]
156    pub const fn is_pending(&self) -> bool {
157        matches!(self, TaskState::Pending)
158    }
159
160    /// Check if the task is reserved
161    #[inline]
162    #[must_use]
163    pub const fn is_reserved(&self) -> bool {
164        matches!(self, TaskState::Reserved)
165    }
166
167    /// Check if the task is running
168    #[inline]
169    #[must_use]
170    pub const fn is_running(&self) -> bool {
171        matches!(self, TaskState::Running)
172    }
173
174    /// Check if the task is retrying
175    #[inline]
176    #[must_use]
177    pub const fn is_retrying(&self) -> bool {
178        matches!(self, TaskState::Retrying(_))
179    }
180
181    /// Check if the task succeeded
182    #[inline]
183    #[must_use]
184    pub const fn is_succeeded(&self) -> bool {
185        matches!(self, TaskState::Succeeded(_))
186    }
187
188    /// Check if the task failed
189    #[inline]
190    #[must_use]
191    pub const fn is_failed(&self) -> bool {
192        matches!(self, TaskState::Failed(_))
193    }
194
195    /// Get the success result if the task succeeded
196    #[inline]
197    #[must_use]
198    pub fn success_result(&self) -> Option<&[u8]> {
199        match self {
200            TaskState::Succeeded(result) => Some(result),
201            _ => None,
202        }
203    }
204
205    /// Get the error message if the task failed
206    #[inline]
207    #[must_use]
208    pub fn error_message(&self) -> Option<&str> {
209        match self {
210            TaskState::Failed(error) => Some(error),
211            _ => None,
212        }
213    }
214}
215
216impl fmt::Display for TaskState {
217    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
218        match self {
219            TaskState::Pending => write!(f, "PENDING"),
220            TaskState::Received => write!(f, "RECEIVED"),
221            TaskState::Reserved => write!(f, "RESERVED"),
222            TaskState::Running => write!(f, "RUNNING"),
223            TaskState::Retrying(count) => write!(f, "RETRYING({count})"),
224            TaskState::Succeeded(_) => write!(f, "SUCCEEDED"),
225            TaskState::Failed(err) => write!(f, "FAILED: {err}"),
226            TaskState::Revoked => write!(f, "REVOKED"),
227            TaskState::Rejected => write!(f, "REJECTED"),
228            TaskState::Custom { name, .. } => write!(f, "CUSTOM({name})"),
229        }
230    }
231}
232
233impl TaskState {
234    /// Get a short string representation of the state name
235    #[inline]
236    #[must_use]
237    pub fn name(&self) -> &str {
238        match self {
239            TaskState::Pending => "PENDING",
240            TaskState::Received => "RECEIVED",
241            TaskState::Reserved => "RESERVED",
242            TaskState::Running => "RUNNING",
243            TaskState::Retrying(_) => "RETRYING",
244            TaskState::Succeeded(_) => "SUCCESS",
245            TaskState::Failed(_) => "FAILURE",
246            TaskState::Revoked => "REVOKED",
247            TaskState::Rejected => "REJECTED",
248            TaskState::Custom { name, .. } => name,
249        }
250    }
251}
252
253// ============================================================================
254// State Transitions
255// ============================================================================
256
257/// A state transition record
258#[derive(Debug, Clone, Serialize, Deserialize)]
259pub struct StateTransition {
260    /// Previous state
261    pub from: TaskState,
262    /// New state
263    pub to: TaskState,
264    /// Unix timestamp when the transition occurred
265    pub timestamp: f64,
266    /// Optional reason for the transition
267    pub reason: Option<String>,
268    /// Optional additional metadata
269    pub metadata: Option<HashMap<String, serde_json::Value>>,
270}
271
272impl StateTransition {
273    /// Create a new state transition
274    #[must_use]
275    pub fn new(from: TaskState, to: TaskState) -> Self {
276        Self {
277            from,
278            to,
279            timestamp: SystemTime::now()
280                .duration_since(UNIX_EPOCH)
281                .unwrap_or_default()
282                .as_secs_f64(),
283            reason: None,
284            metadata: None,
285        }
286    }
287
288    /// Add a reason for the transition
289    #[must_use]
290    pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
291        self.reason = Some(reason.into());
292        self
293    }
294
295    /// Add metadata to the transition
296    #[must_use]
297    pub fn with_metadata(mut self, metadata: HashMap<String, serde_json::Value>) -> Self {
298        self.metadata = Some(metadata);
299        self
300    }
301
302    /// Add a single metadata key-value pair
303    #[must_use]
304    pub fn with_meta(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
305        self.metadata
306            .get_or_insert_with(HashMap::new)
307            .insert(key.into(), value);
308        self
309    }
310}
311
312/// Tracks state transitions for a task
313#[derive(Debug, Clone, Serialize, Deserialize, Default)]
314pub struct StateHistory {
315    /// Current state
316    pub current: Option<TaskState>,
317    /// List of state transitions
318    pub transitions: Vec<StateTransition>,
319}
320
321impl StateHistory {
322    /// Create a new state history
323    #[must_use]
324    pub fn new() -> Self {
325        Self::default()
326    }
327
328    /// Create a new state history with initial state
329    #[must_use]
330    pub fn with_initial(state: TaskState) -> Self {
331        Self {
332            current: Some(state),
333            transitions: Vec::new(),
334        }
335    }
336
337    /// Transition to a new state
338    pub fn transition(&mut self, to: TaskState) -> Option<StateTransition> {
339        let from = self.current.take()?;
340        let transition = StateTransition::new(from, to.clone());
341        self.current = Some(to);
342        self.transitions.push(transition.clone());
343        Some(transition)
344    }
345
346    /// Transition to a new state with a reason
347    pub fn transition_with_reason(
348        &mut self,
349        to: TaskState,
350        reason: impl Into<String>,
351    ) -> Option<StateTransition> {
352        let from = self.current.take()?;
353        let transition = StateTransition::new(from, to.clone()).with_reason(reason);
354        self.current = Some(to);
355        self.transitions.push(transition.clone());
356        Some(transition)
357    }
358
359    /// Get the current state
360    #[inline]
361    #[must_use]
362    pub fn current_state(&self) -> Option<&TaskState> {
363        self.current.as_ref()
364    }
365
366    /// Get all transitions
367    #[inline]
368    #[must_use]
369    pub fn get_transitions(&self) -> &[StateTransition] {
370        &self.transitions
371    }
372
373    /// Get the last transition
374    #[inline]
375    #[must_use]
376    pub fn last_transition(&self) -> Option<&StateTransition> {
377        self.transitions.last()
378    }
379
380    /// Get the number of transitions
381    #[inline]
382    #[must_use]
383    pub const fn transition_count(&self) -> usize {
384        self.transitions.len()
385    }
386
387    /// Check if task has ever been in a specific state
388    #[inline]
389    #[must_use]
390    pub fn has_been_in_state(&self, state_name: &str) -> bool {
391        self.transitions.iter().any(|t| t.to.name() == state_name)
392            || self
393                .current
394                .as_ref()
395                .is_some_and(|s| s.name() == state_name)
396    }
397
398    /// Get the time spent in a specific state (returns None if never in that state)
399    #[must_use]
400    pub fn time_in_state(&self, state_name: &str) -> Option<f64> {
401        let mut total_time = 0.0;
402        let mut entry_time: Option<f64> = None;
403
404        for transition in &self.transitions {
405            if transition.from.name() == state_name {
406                if let Some(entry) = entry_time {
407                    total_time += transition.timestamp - entry;
408                    entry_time = None;
409                }
410            }
411            if transition.to.name() == state_name {
412                entry_time = Some(transition.timestamp);
413            }
414        }
415
416        // If still in the state, add time until now
417        if let Some(entry) = entry_time {
418            if self
419                .current
420                .as_ref()
421                .is_some_and(|s| s.name() == state_name)
422            {
423                let now = SystemTime::now()
424                    .duration_since(UNIX_EPOCH)
425                    .unwrap_or_default()
426                    .as_secs_f64();
427                total_time += now - entry;
428            }
429        }
430
431        if total_time > 0.0 || entry_time.is_some() {
432            Some(total_time)
433        } else {
434            None
435        }
436    }
437}
438
#[cfg(test)]
mod tests {
    use super::*;

    /// `is_terminal` is true exactly for the four final outcomes.
    #[test]
    fn test_terminal_states() {
        assert!(TaskState::Succeeded(vec![]).is_terminal());
        assert!(TaskState::Failed("error".to_string()).is_terminal());
        assert!(TaskState::Revoked.is_terminal());
        assert!(TaskState::Rejected.is_terminal());

        assert!(!TaskState::Pending.is_terminal());
        assert!(!TaskState::Received.is_terminal());
        assert!(!TaskState::Reserved.is_terminal());
        assert!(!TaskState::Running.is_terminal());
        assert!(!TaskState::Retrying(1).is_terminal());
        assert!(!TaskState::custom("PAUSED").is_terminal());
    }

    /// `can_retry` honours the retry budget for `Retrying` and rejects
    /// successful tasks.
    #[test]
    fn test_retry_logic() {
        assert!(TaskState::Failed("error".to_string()).can_retry(3));
        assert!(TaskState::Retrying(2).can_retry(3));
        assert!(!TaskState::Retrying(3).can_retry(3));
        assert!(!TaskState::Succeeded(vec![]).can_retry(3));
    }

    // Property-based tests. The enclosing module is already gated on
    // `cfg(test)`, so no extra attribute is needed here.
    mod proptests {
        use super::*;
        use proptest::prelude::*;

        // Strategy producing every `TaskState` variant, including `Custom`.
        // Custom names are drawn from a non-empty pattern because the name
        // property below requires a non-empty state name.
        fn task_state_strategy() -> impl Strategy<Value = TaskState> {
            prop_oneof![
                Just(TaskState::Pending),
                Just(TaskState::Received),
                Just(TaskState::Reserved),
                Just(TaskState::Running),
                (0u32..100).prop_map(TaskState::Retrying),
                prop::collection::vec(any::<u8>(), 0..100).prop_map(TaskState::Succeeded),
                any::<String>().prop_map(TaskState::Failed),
                Just(TaskState::Revoked),
                Just(TaskState::Rejected),
                "[A-Z][A-Z0-9_]{0,15}".prop_map(|name: String| TaskState::custom(name)),
            ]
        }

        proptest! {
            #[test]
            fn test_terminal_states_are_consistent(state in task_state_strategy()) {
                // Property: a state is active exactly when it is not terminal.
                prop_assert_eq!(state.is_active(), !state.is_terminal());
            }

            #[test]
            fn test_retry_count_is_non_negative(count in 0u32..1000) {
                // Property: `Retrying` reports exactly the count it carries.
                let state = TaskState::Retrying(count);
                prop_assert_eq!(state.retry_count(), count);
                prop_assert!(state.is_retrying());
            }

            #[test]
            fn test_can_retry_respects_max_retries(current_retry in 0u32..100, max_retries in 0u32..100) {
                let state = TaskState::Retrying(current_retry);
                let can_retry = state.can_retry(max_retries);

                if current_retry < max_retries {
                    prop_assert!(can_retry, "Should be able to retry when current_retry < max_retries");
                } else {
                    prop_assert!(!can_retry, "Should not be able to retry when current_retry >= max_retries");
                }
            }

            #[test]
            fn test_failed_state_can_always_retry_once(max_retries in 1u32..100) {
                // Property: with a non-zero budget a failed task is retryable.
                let state = TaskState::Failed("error".to_string());
                prop_assert!(state.can_retry(max_retries));
            }

            #[test]
            fn test_terminal_states_cannot_retry(max_retries in 1u32..100) {
                // `Failed` is deliberately absent: it is terminal but retryable.
                let terminal_states = [
                    TaskState::Succeeded(vec![1, 2, 3]),
                    TaskState::Revoked,
                    TaskState::Rejected,
                ];

                for state in terminal_states {
                    prop_assert!(!state.can_retry(max_retries));
                }
            }

            #[test]
            fn test_state_name_is_consistent(state in task_state_strategy()) {
                let name = state.name();
                prop_assert!(!name.is_empty(), "State name should never be empty");

                // Name should match the state type
                match &state {
                    TaskState::Pending => prop_assert_eq!(name, "PENDING"),
                    TaskState::Received => prop_assert_eq!(name, "RECEIVED"),
                    TaskState::Reserved => prop_assert_eq!(name, "RESERVED"),
                    TaskState::Running => prop_assert_eq!(name, "RUNNING"),
                    TaskState::Retrying(_) => prop_assert_eq!(name, "RETRYING"),
                    TaskState::Succeeded(_) => prop_assert_eq!(name, "SUCCESS"),
                    TaskState::Failed(_) => prop_assert_eq!(name, "FAILURE"),
                    TaskState::Revoked => prop_assert_eq!(name, "REVOKED"),
                    TaskState::Rejected => prop_assert_eq!(name, "REJECTED"),
                    TaskState::Custom { name: custom_name, .. } => {
                        prop_assert_eq!(name, custom_name.as_str());
                    }
                }
            }

            #[test]
            fn test_success_result_only_for_succeeded(result in prop::collection::vec(any::<u8>(), 0..100)) {
                let success_state = TaskState::Succeeded(result.clone());
                prop_assert_eq!(success_state.success_result(), Some(result.as_slice()));

                let other_states = [
                    TaskState::Pending,
                    TaskState::Running,
                    TaskState::Failed("error".to_string()),
                ];

                for state in other_states {
                    prop_assert_eq!(state.success_result(), None);
                }
            }

            #[test]
            fn test_error_message_only_for_failed(error_msg in any::<String>()) {
                let failed_state = TaskState::Failed(error_msg.clone());
                prop_assert_eq!(failed_state.error_message(), Some(error_msg.as_str()));

                let other_states = [
                    TaskState::Pending,
                    TaskState::Running,
                    TaskState::Succeeded(vec![]),
                ];

                for state in other_states {
                    prop_assert_eq!(state.error_message(), None);
                }
            }

            #[test]
            fn test_state_history_transitions_accumulate(
                num_transitions in 1usize..20,
            ) {
                let mut history = StateHistory::with_initial(TaskState::Pending);

                // Alternate Running / Pending so every step is a real change.
                for i in 0..num_transitions {
                    let new_state = if i % 2 == 0 {
                        TaskState::Running
                    } else {
                        TaskState::Pending
                    };
                    prop_assert!(history.transition(new_state).is_some());
                }

                prop_assert_eq!(history.transition_count(), num_transitions);
                prop_assert!(history.last_transition().is_some());
            }

            #[test]
            fn test_state_history_current_state_is_latest(state in task_state_strategy()) {
                let mut history = StateHistory::with_initial(TaskState::Pending);
                prop_assert!(history.transition(state.clone()).is_some());

                prop_assert_eq!(history.current_state(), Some(&state));
            }
        }
    }
}