1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3use std::fmt;
4use std::time::{SystemTime, UNIX_EPOCH};
5
/// Lifecycle state of a task.
///
/// `is_terminal` classifies `Succeeded`, `Failed`, `Revoked` and `Rejected`
/// as final; every other variant, including `Custom`, counts as active.
/// The one-line notes on the plain unit variants describe their presumed
/// meaning — this enum does not enforce any ordering between states.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum TaskState {
    /// Non-terminal. Presumably: created but not yet picked up — TODO confirm.
    Pending,

    /// Non-terminal. Presumably: delivered to a worker — TODO confirm.
    Received,

    /// Non-terminal. Presumably: claimed by a worker, not yet started — TODO confirm.
    Reserved,

    /// Non-terminal. Presumably: currently executing — TODO confirm.
    Running,

    /// Non-terminal. The payload is the retry counter that `retry_count`
    /// returns and that `can_retry` compares against the retry budget.
    Retrying(u32),

    /// Terminal. The payload is the raw result bytes exposed by
    /// `success_result`; their encoding is not defined here.
    Succeeded(Vec<u8>),

    /// Terminal. The payload is the error text exposed by `error_message`.
    Failed(String),

    /// Terminal. Presumably: cancelled before completion — TODO confirm.
    Revoked,

    /// Terminal. Presumably: refused by the worker/broker — TODO confirm.
    Rejected,

    /// Non-terminal, user-defined state.
    Custom {
        /// Name of the state; returned verbatim by `name` and `custom_name`.
        name: String,
        /// Optional opaque bytes attached to the state (`custom_metadata`).
        metadata: Option<Vec<u8>>,
    },
}
44
45impl TaskState {
46 #[inline]
48 #[must_use]
49 pub const fn is_terminal(&self) -> bool {
50 matches!(
51 self,
52 TaskState::Succeeded(_)
53 | TaskState::Failed(_)
54 | TaskState::Revoked
55 | TaskState::Rejected
56 )
57 }
58
59 #[must_use]
61 pub fn custom(name: impl Into<String>) -> Self {
62 Self::Custom {
63 name: name.into(),
64 metadata: None,
65 }
66 }
67
68 #[must_use]
70 pub fn custom_with_metadata(name: impl Into<String>, metadata: Vec<u8>) -> Self {
71 Self::Custom {
72 name: name.into(),
73 metadata: Some(metadata),
74 }
75 }
76
77 #[inline]
79 #[must_use]
80 pub const fn is_custom(&self) -> bool {
81 matches!(self, TaskState::Custom { .. })
82 }
83
84 #[inline]
86 #[must_use]
87 pub fn custom_name(&self) -> Option<&str> {
88 match self {
89 TaskState::Custom { name, .. } => Some(name),
90 _ => None,
91 }
92 }
93
94 #[inline]
96 #[must_use]
97 pub fn custom_metadata(&self) -> Option<&[u8]> {
98 match self {
99 TaskState::Custom { metadata, .. } => metadata.as_deref(),
100 _ => None,
101 }
102 }
103
104 #[inline]
106 #[must_use]
107 pub const fn is_revoked(&self) -> bool {
108 matches!(self, TaskState::Revoked)
109 }
110
111 #[inline]
113 #[must_use]
114 pub const fn is_rejected(&self) -> bool {
115 matches!(self, TaskState::Rejected)
116 }
117
118 #[inline]
120 #[must_use]
121 pub const fn is_received(&self) -> bool {
122 matches!(self, TaskState::Received)
123 }
124
125 #[inline]
127 #[must_use]
128 pub const fn can_retry(&self, max_retries: u32) -> bool {
129 match self {
130 TaskState::Failed(_) => true,
131 TaskState::Retrying(count) => *count < max_retries,
132 _ => false,
133 }
134 }
135
136 #[inline]
138 #[must_use]
139 pub const fn retry_count(&self) -> u32 {
140 match self {
141 TaskState::Retrying(count) => *count,
142 _ => 0,
143 }
144 }
145
146 #[inline]
148 #[must_use]
149 pub const fn is_active(&self) -> bool {
150 !self.is_terminal()
151 }
152
153 #[inline]
155 #[must_use]
156 pub const fn is_pending(&self) -> bool {
157 matches!(self, TaskState::Pending)
158 }
159
160 #[inline]
162 #[must_use]
163 pub const fn is_reserved(&self) -> bool {
164 matches!(self, TaskState::Reserved)
165 }
166
167 #[inline]
169 #[must_use]
170 pub const fn is_running(&self) -> bool {
171 matches!(self, TaskState::Running)
172 }
173
174 #[inline]
176 #[must_use]
177 pub const fn is_retrying(&self) -> bool {
178 matches!(self, TaskState::Retrying(_))
179 }
180
181 #[inline]
183 #[must_use]
184 pub const fn is_succeeded(&self) -> bool {
185 matches!(self, TaskState::Succeeded(_))
186 }
187
188 #[inline]
190 #[must_use]
191 pub const fn is_failed(&self) -> bool {
192 matches!(self, TaskState::Failed(_))
193 }
194
195 #[inline]
197 #[must_use]
198 pub fn success_result(&self) -> Option<&[u8]> {
199 match self {
200 TaskState::Succeeded(result) => Some(result),
201 _ => None,
202 }
203 }
204
205 #[inline]
207 #[must_use]
208 pub fn error_message(&self) -> Option<&str> {
209 match self {
210 TaskState::Failed(error) => Some(error),
211 _ => None,
212 }
213 }
214}
215
216impl fmt::Display for TaskState {
217 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
218 match self {
219 TaskState::Pending => write!(f, "PENDING"),
220 TaskState::Received => write!(f, "RECEIVED"),
221 TaskState::Reserved => write!(f, "RESERVED"),
222 TaskState::Running => write!(f, "RUNNING"),
223 TaskState::Retrying(count) => write!(f, "RETRYING({count})"),
224 TaskState::Succeeded(_) => write!(f, "SUCCEEDED"),
225 TaskState::Failed(err) => write!(f, "FAILED: {err}"),
226 TaskState::Revoked => write!(f, "REVOKED"),
227 TaskState::Rejected => write!(f, "REJECTED"),
228 TaskState::Custom { name, .. } => write!(f, "CUSTOM({name})"),
229 }
230 }
231}
232
233impl TaskState {
234 #[inline]
236 #[must_use]
237 pub fn name(&self) -> &str {
238 match self {
239 TaskState::Pending => "PENDING",
240 TaskState::Received => "RECEIVED",
241 TaskState::Reserved => "RESERVED",
242 TaskState::Running => "RUNNING",
243 TaskState::Retrying(_) => "RETRYING",
244 TaskState::Succeeded(_) => "SUCCESS",
245 TaskState::Failed(_) => "FAILURE",
246 TaskState::Revoked => "REVOKED",
247 TaskState::Rejected => "REJECTED",
248 TaskState::Custom { name, .. } => name,
249 }
250 }
251}
252
/// A single recorded change from one [`TaskState`] to another.
///
/// Values are normally created with `StateTransition::new`, which stamps the
/// current wall-clock time, and then refined with the `with_*` builders.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateTransition {
    /// State the task was in before the change.
    pub from: TaskState,
    /// State the task moved to.
    pub to: TaskState,
    /// Creation time as fractional seconds since the Unix epoch (the value
    /// `StateTransition::new` stores).
    pub timestamp: f64,
    /// Optional free-form explanation; `None` unless set via `with_reason`.
    pub reason: Option<String>,
    /// Optional key/value annotations holding JSON values; `None` until
    /// `with_metadata` or `with_meta` is used.
    pub metadata: Option<HashMap<String, serde_json::Value>>,
}
271
272impl StateTransition {
273 #[must_use]
275 pub fn new(from: TaskState, to: TaskState) -> Self {
276 Self {
277 from,
278 to,
279 timestamp: SystemTime::now()
280 .duration_since(UNIX_EPOCH)
281 .unwrap_or_default()
282 .as_secs_f64(),
283 reason: None,
284 metadata: None,
285 }
286 }
287
288 #[must_use]
290 pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
291 self.reason = Some(reason.into());
292 self
293 }
294
295 #[must_use]
297 pub fn with_metadata(mut self, metadata: HashMap<String, serde_json::Value>) -> Self {
298 self.metadata = Some(metadata);
299 self
300 }
301
302 #[must_use]
304 pub fn with_meta(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
305 self.metadata
306 .get_or_insert_with(HashMap::new)
307 .insert(key.into(), value);
308 self
309 }
310}
311
/// The current state of a task together with the ordered log of transitions
/// recorded for it.
///
/// The derived `Default` yields a history with no current state and an
/// empty transition log.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct StateHistory {
    /// Present state, or `None` if none has been set. `transition` only
    /// records a change when this is `Some`.
    pub current: Option<TaskState>,
    /// Recorded transitions, oldest first (new entries are appended).
    pub transitions: Vec<StateTransition>,
}
320
321impl StateHistory {
322 #[must_use]
324 pub fn new() -> Self {
325 Self::default()
326 }
327
328 #[must_use]
330 pub fn with_initial(state: TaskState) -> Self {
331 Self {
332 current: Some(state),
333 transitions: Vec::new(),
334 }
335 }
336
337 pub fn transition(&mut self, to: TaskState) -> Option<StateTransition> {
339 let from = self.current.take()?;
340 let transition = StateTransition::new(from, to.clone());
341 self.current = Some(to);
342 self.transitions.push(transition.clone());
343 Some(transition)
344 }
345
346 pub fn transition_with_reason(
348 &mut self,
349 to: TaskState,
350 reason: impl Into<String>,
351 ) -> Option<StateTransition> {
352 let from = self.current.take()?;
353 let transition = StateTransition::new(from, to.clone()).with_reason(reason);
354 self.current = Some(to);
355 self.transitions.push(transition.clone());
356 Some(transition)
357 }
358
359 #[inline]
361 #[must_use]
362 pub fn current_state(&self) -> Option<&TaskState> {
363 self.current.as_ref()
364 }
365
366 #[inline]
368 #[must_use]
369 pub fn get_transitions(&self) -> &[StateTransition] {
370 &self.transitions
371 }
372
373 #[inline]
375 #[must_use]
376 pub fn last_transition(&self) -> Option<&StateTransition> {
377 self.transitions.last()
378 }
379
380 #[inline]
382 #[must_use]
383 pub const fn transition_count(&self) -> usize {
384 self.transitions.len()
385 }
386
387 #[inline]
389 #[must_use]
390 pub fn has_been_in_state(&self, state_name: &str) -> bool {
391 self.transitions.iter().any(|t| t.to.name() == state_name)
392 || self
393 .current
394 .as_ref()
395 .is_some_and(|s| s.name() == state_name)
396 }
397
398 #[must_use]
400 pub fn time_in_state(&self, state_name: &str) -> Option<f64> {
401 let mut total_time = 0.0;
402 let mut entry_time: Option<f64> = None;
403
404 for transition in &self.transitions {
405 if transition.from.name() == state_name {
406 if let Some(entry) = entry_time {
407 total_time += transition.timestamp - entry;
408 entry_time = None;
409 }
410 }
411 if transition.to.name() == state_name {
412 entry_time = Some(transition.timestamp);
413 }
414 }
415
416 if let Some(entry) = entry_time {
418 if self
419 .current
420 .as_ref()
421 .is_some_and(|s| s.name() == state_name)
422 {
423 let now = SystemTime::now()
424 .duration_since(UNIX_EPOCH)
425 .unwrap_or_default()
426 .as_secs_f64();
427 total_time += now - entry;
428 }
429 }
430
431 if total_time > 0.0 || entry_time.is_some() {
432 Some(total_time)
433 } else {
434 None
435 }
436 }
437}
438
#[cfg(test)]
mod tests {
    use super::*;

    /// Every terminal variant reports `is_terminal`; active ones do not.
    #[test]
    fn test_terminal_states() {
        assert!(TaskState::Succeeded(vec![]).is_terminal());
        assert!(TaskState::Failed("error".to_string()).is_terminal());
        assert!(TaskState::Revoked.is_terminal());
        assert!(TaskState::Rejected.is_terminal());
        assert!(!TaskState::Pending.is_terminal());
        assert!(!TaskState::Running.is_terminal());
        assert!(!TaskState::custom("PROGRESS").is_terminal());
    }

    /// Spot checks for `can_retry` with a budget of three retries.
    #[test]
    fn test_retry_logic() {
        assert!(TaskState::Failed("error".to_string()).can_retry(3));
        assert!(TaskState::Retrying(2).can_retry(3));
        assert!(!TaskState::Retrying(3).can_retry(3));
        assert!(!TaskState::Succeeded(vec![]).can_retry(3));
    }

    // The enclosing module is already compiled only for tests, so no extra
    // `#[cfg(test)]` is needed here.
    mod proptests {
        use super::*;
        use proptest::prelude::*;

        /// Generates arbitrary task states, covering every variant.
        ///
        /// Custom names are restricted to non-empty identifiers because the
        /// name-consistency property asserts that names are never empty.
        fn task_state_strategy() -> impl Strategy<Value = TaskState> {
            prop_oneof![
                Just(TaskState::Pending),
                Just(TaskState::Received),
                Just(TaskState::Reserved),
                Just(TaskState::Running),
                (0u32..100).prop_map(TaskState::Retrying),
                prop::collection::vec(any::<u8>(), 0..100).prop_map(TaskState::Succeeded),
                any::<String>().prop_map(TaskState::Failed),
                Just(TaskState::Revoked),
                Just(TaskState::Rejected),
                (
                    "[A-Z_]{1,16}",
                    prop::option::of(prop::collection::vec(any::<u8>(), 0..16)),
                )
                    .prop_map(|(name, metadata)| TaskState::Custom { name, metadata }),
            ]
        }

        proptest! {
            /// `is_active` is always the negation of `is_terminal`.
            #[test]
            fn test_terminal_states_are_consistent(state in task_state_strategy()) {
                prop_assert_eq!(state.is_active(), !state.is_terminal());
            }

            /// `Retrying(n)` reports `n` and identifies as retrying.
            #[test]
            fn test_retry_count_is_non_negative(count in 0u32..1000) {
                let state = TaskState::Retrying(count);
                prop_assert_eq!(state.retry_count(), count);
                prop_assert!(state.is_retrying());
            }

            /// A retrying task may retry exactly while below the budget.
            #[test]
            fn test_can_retry_respects_max_retries(current_retry in 0u32..100, max_retries in 0u32..100) {
                let state = TaskState::Retrying(current_retry);
                let can_retry = state.can_retry(max_retries);

                if current_retry < max_retries {
                    prop_assert!(can_retry, "Should be able to retry when current_retry < max_retries");
                } else {
                    prop_assert!(!can_retry, "Should not be able to retry when current_retry >= max_retries");
                }
            }

            /// A failed task is retryable under any positive budget.
            #[test]
            fn test_failed_state_can_always_retry_once(max_retries in 1u32..100) {
                let state = TaskState::Failed("error".to_string());
                prop_assert!(state.can_retry(max_retries));
            }

            /// Terminal states other than `Failed` are never retryable.
            #[test]
            fn test_terminal_states_cannot_retry(max_retries in 1u32..100) {
                let non_retryable = [
                    TaskState::Succeeded(vec![1, 2, 3]),
                    TaskState::Revoked,
                    TaskState::Rejected,
                ];

                for state in non_retryable {
                    prop_assert!(!state.can_retry(max_retries));
                }
            }

            /// `name` is non-empty and matches the expected identifier.
            #[test]
            fn test_state_name_is_consistent(state in task_state_strategy()) {
                let name = state.name();
                prop_assert!(!name.is_empty(), "State name should never be empty");

                match &state {
                    TaskState::Pending => prop_assert_eq!(name, "PENDING"),
                    TaskState::Received => prop_assert_eq!(name, "RECEIVED"),
                    TaskState::Reserved => prop_assert_eq!(name, "RESERVED"),
                    TaskState::Running => prop_assert_eq!(name, "RUNNING"),
                    TaskState::Retrying(_) => prop_assert_eq!(name, "RETRYING"),
                    TaskState::Succeeded(_) => prop_assert_eq!(name, "SUCCESS"),
                    TaskState::Failed(_) => prop_assert_eq!(name, "FAILURE"),
                    TaskState::Revoked => prop_assert_eq!(name, "REVOKED"),
                    TaskState::Rejected => prop_assert_eq!(name, "REJECTED"),
                    TaskState::Custom { name: custom_name, .. } => prop_assert_eq!(name, custom_name),
                }
            }

            /// Only `Succeeded` exposes a result payload.
            #[test]
            fn test_success_result_only_for_succeeded(result in prop::collection::vec(any::<u8>(), 0..100)) {
                let success_state = TaskState::Succeeded(result.clone());
                prop_assert_eq!(success_state.success_result(), Some(result.as_slice()));

                let other_states = [
                    TaskState::Pending,
                    TaskState::Running,
                    TaskState::Failed("error".to_string()),
                ];

                for state in other_states {
                    prop_assert_eq!(state.success_result(), None);
                }
            }

            /// Only `Failed` exposes an error message.
            #[test]
            fn test_error_message_only_for_failed(error_msg in any::<String>()) {
                let failed_state = TaskState::Failed(error_msg.clone());
                prop_assert_eq!(failed_state.error_message(), Some(error_msg.as_str()));

                let other_states = [
                    TaskState::Pending,
                    TaskState::Running,
                    TaskState::Succeeded(vec![]),
                ];

                for state in other_states {
                    prop_assert_eq!(state.error_message(), None);
                }
            }

            /// Each successful transition adds exactly one log entry.
            #[test]
            fn test_state_history_transitions_accumulate(
                num_transitions in 1usize..20,
            ) {
                let mut history = StateHistory::with_initial(TaskState::Pending);

                for i in 0..num_transitions {
                    let new_state = if i % 2 == 0 {
                        TaskState::Running
                    } else {
                        TaskState::Pending
                    };
                    history.transition(new_state);
                }

                prop_assert_eq!(history.transition_count(), num_transitions);
                prop_assert!(history.last_transition().is_some());
            }

            /// After a transition the current state is the target state.
            #[test]
            fn test_state_history_current_state_is_latest(state in task_state_strategy()) {
                let mut history = StateHistory::with_initial(TaskState::Pending);
                history.transition(state.clone());

                prop_assert_eq!(history.current_state(), Some(&state));
            }
        }
    }
}