celers_core/
task.rs

1use crate::state::TaskState;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::collections::HashSet;
5use std::fmt;
6use uuid::Uuid;
7
8/// Unique identifier for a task
9pub type TaskId = Uuid;
10
11/// Batch utility functions for working with multiple tasks
12pub mod batch {
13    use super::{SerializedTask, TaskState, Uuid};
14
15    /// Validate a collection of tasks, returning all errors
16    ///
17    /// # Example
18    /// ```
19    /// use celers_core::{SerializedTask, task::batch};
20    ///
21    /// let tasks = vec![
22    ///     SerializedTask::new("task1".to_string(), vec![1, 2, 3]),
23    ///     SerializedTask::new("task2".to_string(), vec![4, 5, 6]),
24    /// ];
25    ///
26    /// let errors = batch::validate_all(&tasks);
27    /// assert!(errors.is_empty());
28    /// ```
29    #[must_use]
30    pub fn validate_all(tasks: &[SerializedTask]) -> Vec<(usize, String)> {
31        tasks
32            .iter()
33            .enumerate()
34            .filter_map(|(idx, task)| task.validate().err().map(|e| (idx, e)))
35            .collect()
36    }
37
38    /// Filter tasks by state
39    ///
40    /// # Example
41    /// ```
42    /// use celers_core::{SerializedTask, TaskState, task::batch};
43    ///
44    /// let mut tasks = vec![
45    ///     SerializedTask::new("task1".to_string(), vec![1, 2, 3]),
46    ///     SerializedTask::new("task2".to_string(), vec![4, 5, 6]),
47    /// ];
48    /// tasks[0].metadata.state = TaskState::Running;
49    ///
50    /// let running = batch::filter_by_state(&tasks, |s| matches!(s, TaskState::Running));
51    /// assert_eq!(running.len(), 1);
52    /// ```
53    #[must_use]
54    pub fn filter_by_state<F>(tasks: &[SerializedTask], predicate: F) -> Vec<&SerializedTask>
55    where
56        F: Fn(&TaskState) -> bool,
57    {
58        tasks
59            .iter()
60            .filter(|task| predicate(&task.metadata.state))
61            .collect()
62    }
63
64    /// Filter tasks by priority
65    ///
66    /// # Example
67    /// ```
68    /// use celers_core::{SerializedTask, task::batch};
69    ///
70    /// let tasks = vec![
71    ///     SerializedTask::new("task1".to_string(), vec![1]).with_priority(10),
72    ///     SerializedTask::new("task2".to_string(), vec![2]).with_priority(5),
73    ///     SerializedTask::new("task3".to_string(), vec![3]),
74    /// ];
75    ///
76    /// let high_priority = batch::filter_high_priority(&tasks);
77    /// assert_eq!(high_priority.len(), 2);
78    /// ```
79    #[must_use]
80    pub fn filter_high_priority(tasks: &[SerializedTask]) -> Vec<&SerializedTask> {
81        tasks
82            .iter()
83            .filter(|task| task.metadata.is_high_priority())
84            .collect()
85    }
86
87    /// Sort tasks by priority (highest first)
88    ///
89    /// # Example
90    /// ```
91    /// use celers_core::{SerializedTask, task::batch};
92    ///
93    /// let mut tasks = vec![
94    ///     SerializedTask::new("task1".to_string(), vec![1]).with_priority(5),
95    ///     SerializedTask::new("task2".to_string(), vec![2]).with_priority(10),
96    ///     SerializedTask::new("task3".to_string(), vec![3]).with_priority(1),
97    /// ];
98    ///
99    /// batch::sort_by_priority(&mut tasks);
100    /// assert_eq!(tasks[0].metadata.priority, 10);
101    /// assert_eq!(tasks[1].metadata.priority, 5);
102    /// assert_eq!(tasks[2].metadata.priority, 1);
103    /// ```
104    pub fn sort_by_priority(tasks: &mut [SerializedTask]) {
105        tasks.sort_by(|a, b| b.metadata.priority.cmp(&a.metadata.priority));
106    }
107
108    /// Count tasks by state
109    ///
110    /// # Example
111    /// ```
112    /// use celers_core::{SerializedTask, TaskState, task::batch};
113    ///
114    /// let mut tasks = vec![
115    ///     SerializedTask::new("task1".to_string(), vec![1]),
116    ///     SerializedTask::new("task2".to_string(), vec![2]),
117    /// ];
118    /// tasks[0].metadata.state = TaskState::Running;
119    ///
120    /// let counts = batch::count_by_state(&tasks);
121    /// assert_eq!(counts.get("RUNNING"), Some(&1));
122    /// assert_eq!(counts.get("PENDING"), Some(&1));
123    /// ```
124    #[must_use]
125    pub fn count_by_state(tasks: &[SerializedTask]) -> std::collections::HashMap<String, usize> {
126        let mut counts = std::collections::HashMap::new();
127        for task in tasks {
128            *counts
129                .entry(task.metadata.state.name().to_string())
130                .or_insert(0) += 1;
131        }
132        counts
133    }
134
135    /// Check if any tasks have expired
136    ///
137    /// # Example
138    /// ```
139    /// use celers_core::{SerializedTask, task::batch};
140    ///
141    /// let tasks = vec![
142    ///     SerializedTask::new("task1".to_string(), vec![1]).with_timeout(60),
143    ///     SerializedTask::new("task2".to_string(), vec![2]),
144    /// ];
145    ///
146    /// // Fresh tasks shouldn't be expired
147    /// assert!(!batch::has_expired_tasks(&tasks));
148    /// ```
149    #[inline]
150    #[must_use]
151    pub fn has_expired_tasks(tasks: &[SerializedTask]) -> bool {
152        tasks.iter().any(super::SerializedTask::is_expired)
153    }
154
155    /// Get tasks that have expired
156    ///
157    /// # Example
158    /// ```
159    /// use celers_core::{SerializedTask, task::batch};
160    ///
161    /// let tasks = vec![
162    ///     SerializedTask::new("task1".to_string(), vec![1]).with_timeout(60),
163    ///     SerializedTask::new("task2".to_string(), vec![2]),
164    /// ];
165    ///
166    /// let expired = batch::get_expired_tasks(&tasks);
167    /// // Fresh tasks shouldn't be expired
168    /// assert_eq!(expired.len(), 0);
169    /// ```
170    #[inline]
171    #[must_use]
172    pub fn get_expired_tasks(tasks: &[SerializedTask]) -> Vec<&SerializedTask> {
173        tasks.iter().filter(|task| task.is_expired()).collect()
174    }
175
176    /// Calculate total payload size for a collection of tasks
177    ///
178    /// # Example
179    /// ```
180    /// use celers_core::{SerializedTask, task::batch};
181    ///
182    /// let tasks = vec![
183    ///     SerializedTask::new("task1".to_string(), vec![1, 2, 3]),
184    ///     SerializedTask::new("task2".to_string(), vec![4, 5]),
185    /// ];
186    ///
187    /// let total_size = batch::total_payload_size(&tasks);
188    /// assert_eq!(total_size, 5);
189    /// ```
190    #[must_use]
191    pub fn total_payload_size(tasks: &[SerializedTask]) -> usize {
192        tasks.iter().map(super::SerializedTask::payload_size).sum()
193    }
194
195    /// Find tasks with dependencies
196    ///
197    /// # Example
198    /// ```
199    /// use celers_core::{SerializedTask, task::batch};
200    /// use uuid::Uuid;
201    ///
202    /// let parent_id = Uuid::new_v4();
203    /// let tasks = vec![
204    ///     SerializedTask::new("task1".to_string(), vec![1]),
205    ///     SerializedTask::new("task2".to_string(), vec![2]).with_dependency(parent_id),
206    /// ];
207    ///
208    /// let with_deps = batch::filter_with_dependencies(&tasks);
209    /// assert_eq!(with_deps.len(), 1);
210    /// ```
211    #[must_use]
212    pub fn filter_with_dependencies(tasks: &[SerializedTask]) -> Vec<&SerializedTask> {
213        tasks
214            .iter()
215            .filter(|task| task.metadata.has_dependencies())
216            .collect()
217    }
218
219    /// Find tasks that can be retried
220    ///
221    /// # Example
222    /// ```
223    /// use celers_core::{SerializedTask, TaskState, task::batch};
224    ///
225    /// let mut tasks = vec![
226    ///     SerializedTask::new("task1".to_string(), vec![1]).with_max_retries(3),
227    ///     SerializedTask::new("task2".to_string(), vec![2]).with_max_retries(3),
228    /// ];
229    /// tasks[0].metadata.state = TaskState::Failed("error".to_string());
230    /// tasks[1].metadata.state = TaskState::Succeeded(vec![]);
231    ///
232    /// let can_retry = batch::filter_retryable(&tasks);
233    /// assert_eq!(can_retry.len(), 1);
234    /// ```
235    #[must_use]
236    pub fn filter_retryable(tasks: &[SerializedTask]) -> Vec<&SerializedTask> {
237        tasks.iter().filter(|task| task.can_retry()).collect()
238    }
239
240    /// Find tasks by name pattern (contains)
241    ///
242    /// # Example
243    /// ```
244    /// use celers_core::{SerializedTask, task::batch};
245    ///
246    /// let tasks = vec![
247    ///     SerializedTask::new("process_data".to_string(), vec![1]),
248    ///     SerializedTask::new("process_image".to_string(), vec![2]),
249    ///     SerializedTask::new("send_email".to_string(), vec![3]),
250    /// ];
251    ///
252    /// let process_tasks = batch::filter_by_name_pattern(&tasks, "process");
253    /// assert_eq!(process_tasks.len(), 2);
254    /// ```
255    #[must_use]
256    pub fn filter_by_name_pattern<'a>(
257        tasks: &'a [SerializedTask],
258        pattern: &str,
259    ) -> Vec<&'a SerializedTask> {
260        tasks
261            .iter()
262            .filter(|task| task.metadata.name.contains(pattern))
263            .collect()
264    }
265
266    /// Group tasks by their workflow group ID
267    ///
268    /// # Example
269    /// ```
270    /// use celers_core::{SerializedTask, task::batch};
271    /// use uuid::Uuid;
272    ///
273    /// let group1 = Uuid::new_v4();
274    /// let group2 = Uuid::new_v4();
275    ///
276    /// let tasks = vec![
277    ///     SerializedTask::new("task1".to_string(), vec![1]).with_group_id(group1),
278    ///     SerializedTask::new("task2".to_string(), vec![2]).with_group_id(group1),
279    ///     SerializedTask::new("task3".to_string(), vec![3]).with_group_id(group2),
280    ///     SerializedTask::new("task4".to_string(), vec![4]),
281    /// ];
282    ///
283    /// let groups = batch::group_by_workflow_id(&tasks);
284    /// assert_eq!(groups.len(), 2);
285    /// ```
286    #[must_use]
287    pub fn group_by_workflow_id(
288        tasks: &[SerializedTask],
289    ) -> std::collections::HashMap<Uuid, Vec<&SerializedTask>> {
290        let mut groups = std::collections::HashMap::new();
291        for task in tasks {
292            if let Some(group_id) = task.metadata.group_id {
293                groups.entry(group_id).or_insert_with(Vec::new).push(task);
294            }
295        }
296        groups
297    }
298
299    /// Find terminal tasks (succeeded or failed)
300    ///
301    /// # Example
302    /// ```
303    /// use celers_core::{SerializedTask, TaskState, task::batch};
304    ///
305    /// let mut tasks = vec![
306    ///     SerializedTask::new("task1".to_string(), vec![1]),
307    ///     SerializedTask::new("task2".to_string(), vec![2]),
308    /// ];
309    /// tasks[0].metadata.state = TaskState::Succeeded(vec![1, 2, 3]);
310    ///
311    /// let terminal = batch::filter_terminal(&tasks);
312    /// assert_eq!(terminal.len(), 1);
313    /// ```
314    #[must_use]
315    pub fn filter_terminal(tasks: &[SerializedTask]) -> Vec<&SerializedTask> {
316        tasks.iter().filter(|task| task.is_terminal()).collect()
317    }
318
319    /// Find active tasks (pending, running, or retrying)
320    ///
321    /// # Example
322    /// ```
323    /// use celers_core::{SerializedTask, TaskState, task::batch};
324    ///
325    /// let mut tasks = vec![
326    ///     SerializedTask::new("task1".to_string(), vec![1]),
327    ///     SerializedTask::new("task2".to_string(), vec![2]),
328    /// ];
329    /// tasks[1].metadata.state = TaskState::Succeeded(vec![]);
330    ///
331    /// let active = batch::filter_active(&tasks);
332    /// assert_eq!(active.len(), 1);
333    /// ```
334    #[must_use]
335    pub fn filter_active(tasks: &[SerializedTask]) -> Vec<&SerializedTask> {
336        tasks.iter().filter(|task| task.is_active()).collect()
337    }
338
339    /// Calculate average payload size
340    ///
341    /// # Example
342    /// ```
343    /// use celers_core::{SerializedTask, task::batch};
344    ///
345    /// let tasks = vec![
346    ///     SerializedTask::new("task1".to_string(), vec![1, 2, 3]),
347    ///     SerializedTask::new("task2".to_string(), vec![4, 5]),
348    /// ];
349    ///
350    /// let avg = batch::average_payload_size(&tasks);
351    /// assert_eq!(avg, 2); // (3 + 2) / 2 = 2
352    /// ```
353    #[must_use]
354    pub fn average_payload_size(tasks: &[SerializedTask]) -> usize {
355        if tasks.is_empty() {
356            0
357        } else {
358            total_payload_size(tasks) / tasks.len()
359        }
360    }
361
362    /// Find the oldest task by creation time
363    ///
364    /// # Example
365    /// ```
366    /// use celers_core::{SerializedTask, task::batch};
367    ///
368    /// let tasks = vec![
369    ///     SerializedTask::new("task1".to_string(), vec![1]),
370    ///     SerializedTask::new("task2".to_string(), vec![2]),
371    /// ];
372    ///
373    /// let oldest = batch::find_oldest(&tasks);
374    /// assert!(oldest.is_some());
375    /// ```
376    #[must_use]
377    pub fn find_oldest(tasks: &[SerializedTask]) -> Option<&SerializedTask> {
378        tasks.iter().min_by_key(|task| task.metadata.created_at)
379    }
380
381    /// Find the newest task by creation time
382    ///
383    /// # Example
384    /// ```
385    /// use celers_core::{SerializedTask, task::batch};
386    ///
387    /// let tasks = vec![
388    ///     SerializedTask::new("task1".to_string(), vec![1]),
389    ///     SerializedTask::new("task2".to_string(), vec![2]),
390    /// ];
391    ///
392    /// let newest = batch::find_newest(&tasks);
393    /// assert!(newest.is_some());
394    /// ```
395    #[must_use]
396    pub fn find_newest(tasks: &[SerializedTask]) -> Option<&SerializedTask> {
397        tasks.iter().max_by_key(|task| task.metadata.created_at)
398    }
399}
400
401/// Task metadata including execution information
402#[derive(Debug, Clone, Serialize, Deserialize)]
403pub struct TaskMetadata {
404    /// Unique task identifier
405    pub id: TaskId,
406
407    /// Task name/type identifier
408    pub name: String,
409
410    /// Current state of the task
411    pub state: TaskState,
412
413    /// When the task was created
414    pub created_at: DateTime<Utc>,
415
416    /// When the task was last updated
417    pub updated_at: DateTime<Utc>,
418
419    /// Maximum number of retry attempts
420    pub max_retries: u32,
421
422    /// Task timeout in seconds
423    pub timeout_secs: Option<u64>,
424
425    /// Task priority (higher = more important)
426    pub priority: i32,
427
428    /// Group ID (for workflow grouping)
429    #[serde(skip_serializing_if = "Option::is_none")]
430    pub group_id: Option<Uuid>,
431
432    /// Chord ID (for barrier synchronization)
433    #[serde(skip_serializing_if = "Option::is_none")]
434    pub chord_id: Option<Uuid>,
435
436    /// Task dependencies (tasks that must complete before this task can execute)
437    #[serde(skip_serializing_if = "HashSet::is_empty", default)]
438    pub dependencies: HashSet<TaskId>,
439}
440
441impl TaskMetadata {
442    #[inline]
443    #[must_use]
444    pub fn new(name: String) -> Self {
445        let now = Utc::now();
446        Self {
447            id: Uuid::new_v4(),
448            name,
449            state: TaskState::Pending,
450            created_at: now,
451            updated_at: now,
452            max_retries: 3,
453            timeout_secs: None,
454            priority: 0,
455            group_id: None,
456            chord_id: None,
457            dependencies: HashSet::new(),
458        }
459    }
460
461    #[inline]
462    #[must_use]
463    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
464        self.max_retries = max_retries;
465        self
466    }
467
468    #[inline]
469    #[must_use]
470    pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
471        self.timeout_secs = Some(timeout_secs);
472        self
473    }
474
475    #[inline]
476    #[must_use]
477    pub fn with_priority(mut self, priority: i32) -> Self {
478        self.priority = priority;
479        self
480    }
481
482    /// Set the group ID for workflow grouping
483    #[inline]
484    #[must_use]
485    pub fn with_group_id(mut self, group_id: Uuid) -> Self {
486        self.group_id = Some(group_id);
487        self
488    }
489
490    /// Set the chord ID for barrier synchronization
491    #[inline]
492    #[must_use]
493    pub fn with_chord_id(mut self, chord_id: Uuid) -> Self {
494        self.chord_id = Some(chord_id);
495        self
496    }
497
498    /// Get the age of the task (time since creation)
499    #[inline]
500    #[must_use]
501    pub fn age(&self) -> chrono::Duration {
502        Utc::now() - self.created_at
503    }
504
505    /// Check if the task has expired based on its timeout
506    #[inline]
507    #[must_use]
508    #[allow(clippy::cast_possible_wrap)]
509    pub fn is_expired(&self) -> bool {
510        if let Some(timeout_secs) = self.timeout_secs {
511            let elapsed = (Utc::now() - self.created_at).num_seconds();
512            elapsed > timeout_secs as i64
513        } else {
514            false
515        }
516    }
517
518    /// Check if the task is in a terminal state (Succeeded or Failed)
519    #[inline]
520    #[must_use]
521    pub fn is_terminal(&self) -> bool {
522        self.state.is_terminal()
523    }
524
525    /// Check if the task is in a running or active state
526    #[inline]
527    #[must_use]
528    pub fn is_active(&self) -> bool {
529        matches!(
530            self.state,
531            TaskState::Pending | TaskState::Reserved | TaskState::Running | TaskState::Retrying(_)
532        )
533    }
534
535    /// Validate the task metadata
536    ///
537    /// Returns an error if any of the metadata fields are invalid:
538    /// - Name must not be empty
539    /// - Max retries must be reasonable (< 1000)
540    /// - Timeout must be at least 1 second if set
541    /// - Priority must be in valid range (-2147483648 to 2147483647)
542    ///
543    /// # Errors
544    ///
545    /// Returns an error if validation fails.
546    pub fn validate(&self) -> Result<(), String> {
547        if self.name.is_empty() {
548            return Err("Task name cannot be empty".to_string());
549        }
550
551        if self.max_retries > 1000 {
552            return Err("Max retries cannot exceed 1000".to_string());
553        }
554
555        if let Some(timeout) = self.timeout_secs {
556            if timeout == 0 {
557                return Err("Timeout must be at least 1 second".to_string());
558            }
559            if timeout > 86400 {
560                return Err("Timeout cannot exceed 24 hours (86400 seconds)".to_string());
561            }
562        }
563
564        Ok(())
565    }
566
567    /// Check if task has a timeout configured
568    #[inline]
569    #[must_use]
570    pub fn has_timeout(&self) -> bool {
571        self.timeout_secs.is_some()
572    }
573
574    /// Check if task is part of a group
575    #[inline]
576    #[must_use]
577    pub fn has_group_id(&self) -> bool {
578        self.group_id.is_some()
579    }
580
581    /// Check if task is part of a chord
582    #[inline]
583    #[must_use]
584    pub fn has_chord_id(&self) -> bool {
585        self.chord_id.is_some()
586    }
587
588    /// Check if task has custom priority (non-zero)
589    #[inline]
590    #[must_use]
591    pub const fn has_priority(&self) -> bool {
592        self.priority != 0
593    }
594
595    /// Check if task has high priority (priority > 0)
596    #[inline]
597    #[must_use]
598    pub const fn is_high_priority(&self) -> bool {
599        self.priority > 0
600    }
601
602    /// Check if task has low priority (priority < 0)
603    #[inline]
604    #[must_use]
605    pub const fn is_low_priority(&self) -> bool {
606        self.priority < 0
607    }
608
609    /// Add a task dependency
610    #[inline]
611    #[must_use]
612    pub fn with_dependency(mut self, dependency: TaskId) -> Self {
613        self.dependencies.insert(dependency);
614        self
615    }
616
617    /// Add multiple task dependencies
618    #[inline]
619    #[must_use]
620    pub fn with_dependencies(mut self, dependencies: impl IntoIterator<Item = TaskId>) -> Self {
621        self.dependencies.extend(dependencies);
622        self
623    }
624
625    /// Check if task has any dependencies
626    #[inline]
627    #[must_use]
628    pub fn has_dependencies(&self) -> bool {
629        !self.dependencies.is_empty()
630    }
631
632    /// Get the number of dependencies
633    #[inline]
634    #[must_use]
635    pub fn dependency_count(&self) -> usize {
636        self.dependencies.len()
637    }
638
639    /// Check if a specific task is a dependency
640    #[inline]
641    #[must_use]
642    pub fn depends_on(&self, task_id: &TaskId) -> bool {
643        self.dependencies.contains(task_id)
644    }
645
646    /// Remove a dependency
647    #[inline]
648    pub fn remove_dependency(&mut self, task_id: &TaskId) -> bool {
649        self.dependencies.remove(task_id)
650    }
651
652    /// Clear all dependencies
653    #[inline]
654    pub fn clear_dependencies(&mut self) {
655        self.dependencies.clear();
656    }
657
658    // ===== Convenience State Checks =====
659
660    /// Check if task is in Pending state
661    #[inline]
662    #[must_use]
663    pub fn is_pending(&self) -> bool {
664        matches!(self.state, TaskState::Pending)
665    }
666
667    /// Check if task is in Running state
668    #[inline]
669    #[must_use]
670    pub fn is_running(&self) -> bool {
671        matches!(self.state, TaskState::Running)
672    }
673
674    /// Check if task is in Succeeded state
675    #[inline]
676    #[must_use]
677    pub fn is_succeeded(&self) -> bool {
678        matches!(self.state, TaskState::Succeeded(_))
679    }
680
681    /// Check if task is in Failed state
682    #[inline]
683    #[must_use]
684    pub fn is_failed(&self) -> bool {
685        matches!(self.state, TaskState::Failed(_))
686    }
687
688    /// Check if task is in Retrying state
689    #[inline]
690    #[must_use]
691    pub fn is_retrying(&self) -> bool {
692        matches!(self.state, TaskState::Retrying(_))
693    }
694
695    /// Check if task is in Reserved state
696    #[inline]
697    #[must_use]
698    pub fn is_reserved(&self) -> bool {
699        matches!(self.state, TaskState::Reserved)
700    }
701
702    // ===== Time-related Helpers =====
703
704    /// Get remaining time before timeout (None if no timeout or already expired)
705    ///
706    /// # Example
707    /// ```
708    /// use celers_core::TaskMetadata;
709    ///
710    /// let task = TaskMetadata::new("test".to_string()).with_timeout(60);
711    /// if let Some(remaining) = task.time_remaining() {
712    ///     println!("Task has {} seconds remaining", remaining.num_seconds());
713    /// }
714    /// ```
715    #[inline]
716    #[must_use]
717    #[allow(clippy::cast_possible_wrap)]
718    pub fn time_remaining(&self) -> Option<chrono::Duration> {
719        self.timeout_secs.and_then(|timeout| {
720            let elapsed = Utc::now() - self.created_at;
721            let timeout_duration = chrono::Duration::seconds(timeout as i64);
722            let remaining = timeout_duration - elapsed;
723            if remaining.num_seconds() > 0 {
724                Some(remaining)
725            } else {
726                None
727            }
728        })
729    }
730
731    /// Get the time elapsed since task creation
732    ///
733    /// # Example
734    /// ```
735    /// use celers_core::TaskMetadata;
736    ///
737    /// let task = TaskMetadata::new("test".to_string());
738    /// let elapsed = task.time_elapsed();
739    /// assert!(elapsed.num_seconds() >= 0);
740    /// ```
741    #[inline]
742    #[must_use]
743    pub fn time_elapsed(&self) -> chrono::Duration {
744        Utc::now() - self.created_at
745    }
746
747    // ===== Retry Helpers =====
748
749    /// Check if task can be retried based on current retry count
750    ///
751    /// # Example
752    /// ```
753    /// use celers_core::{TaskMetadata, TaskState};
754    ///
755    /// let mut task = TaskMetadata::new("test".to_string()).with_max_retries(3);
756    /// task.state = TaskState::Failed("error".to_string());
757    /// assert!(task.can_retry());
758    ///
759    /// task.state = TaskState::Retrying(3);
760    /// assert!(!task.can_retry());
761    /// ```
762    #[inline]
763    #[must_use]
764    pub fn can_retry(&self) -> bool {
765        self.state.can_retry(self.max_retries)
766    }
767
768    /// Get current retry count
769    ///
770    /// # Example
771    /// ```
772    /// use celers_core::{TaskMetadata, TaskState};
773    ///
774    /// let mut task = TaskMetadata::new("test".to_string());
775    /// task.state = TaskState::Retrying(2);
776    /// assert_eq!(task.retry_count(), 2);
777    /// ```
778    #[inline]
779    #[must_use]
780    pub const fn retry_count(&self) -> u32 {
781        self.state.retry_count()
782    }
783
784    /// Get remaining retry attempts
785    ///
786    /// # Example
787    /// ```
788    /// use celers_core::{TaskMetadata, TaskState};
789    ///
790    /// let mut task = TaskMetadata::new("test".to_string()).with_max_retries(5);
791    /// task.state = TaskState::Retrying(2);
792    /// assert_eq!(task.retries_remaining(), 3);
793    /// ```
794    #[inline]
795    #[must_use]
796    pub const fn retries_remaining(&self) -> u32 {
797        let current = self.retry_count();
798        self.max_retries.saturating_sub(current)
799    }
800
801    // ===== Workflow Helpers =====
802
803    /// Check if task is part of any workflow (group or chord)
804    ///
805    /// # Example
806    /// ```
807    /// use celers_core::TaskMetadata;
808    /// use uuid::Uuid;
809    ///
810    /// let task = TaskMetadata::new("test".to_string())
811    ///     .with_group_id(Uuid::new_v4());
812    /// assert!(task.is_part_of_workflow());
813    /// ```
814    #[inline]
815    #[must_use]
816    pub fn is_part_of_workflow(&self) -> bool {
817        self.group_id.is_some() || self.chord_id.is_some()
818    }
819
820    /// Get group ID if task is part of a group
821    #[inline]
822    #[must_use]
823    pub fn get_group_id(&self) -> Option<&Uuid> {
824        self.group_id.as_ref()
825    }
826
827    /// Get chord ID if task is part of a chord
828    #[inline]
829    #[must_use]
830    pub fn get_chord_id(&self) -> Option<&Uuid> {
831        self.chord_id.as_ref()
832    }
833
834    // ===== State Transition Helpers =====
835
836    /// Update task state to Running
837    ///
838    /// # Example
839    /// ```
840    /// use celers_core::{TaskMetadata, TaskState};
841    ///
842    /// let mut task = TaskMetadata::new("test".to_string());
843    /// task.mark_as_running();
844    /// assert!(task.is_running());
845    /// ```
846    #[inline]
847    pub fn mark_as_running(&mut self) {
848        self.state = TaskState::Running;
849        self.updated_at = Utc::now();
850    }
851
852    /// Update task state to Succeeded with result
853    ///
854    /// # Example
855    /// ```
856    /// use celers_core::{TaskMetadata, TaskState};
857    ///
858    /// let mut task = TaskMetadata::new("test".to_string());
859    /// task.mark_as_succeeded(vec![1, 2, 3]);
860    /// assert!(task.is_succeeded());
861    /// ```
862    #[inline]
863    pub fn mark_as_succeeded(&mut self, result: Vec<u8>) {
864        self.state = TaskState::Succeeded(result);
865        self.updated_at = Utc::now();
866    }
867
868    /// Update task state to Failed with error message
869    ///
870    /// # Example
871    /// ```
872    /// use celers_core::{TaskMetadata, TaskState};
873    ///
874    /// let mut task = TaskMetadata::new("test".to_string());
875    /// task.mark_as_failed("Connection timeout");
876    /// assert!(task.is_failed());
877    /// ```
878    #[inline]
879    pub fn mark_as_failed(&mut self, error: impl Into<String>) {
880        self.state = TaskState::Failed(error.into());
881        self.updated_at = Utc::now();
882    }
883
884    /// Clone task with a new ID (useful for task retry/duplication)
885    ///
886    /// # Example
887    /// ```
888    /// use celers_core::TaskMetadata;
889    ///
890    /// let task = TaskMetadata::new("test".to_string()).with_priority(5);
891    /// let cloned = task.with_new_id();
892    /// assert_ne!(task.id, cloned.id);
893    /// assert_eq!(task.name, cloned.name);
894    /// assert_eq!(task.priority, cloned.priority);
895    /// ```
896    #[inline]
897    #[must_use]
898    pub fn with_new_id(&self) -> Self {
899        let now = Utc::now();
900        Self {
901            id: Uuid::new_v4(),
902            name: self.name.clone(),
903            state: TaskState::Pending,
904            created_at: now,
905            updated_at: now,
906            max_retries: self.max_retries,
907            timeout_secs: self.timeout_secs,
908            priority: self.priority,
909            group_id: self.group_id,
910            chord_id: self.chord_id,
911            dependencies: self.dependencies.clone(),
912        }
913    }
914}
915
916impl fmt::Display for TaskMetadata {
917    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
918        write!(
919            f,
920            "Task[{}] name={} state={} priority={} retries={}/{}",
921            &self.id.to_string()[..8],
922            self.name,
923            self.state,
924            self.priority,
925            self.state.retry_count(),
926            self.max_retries
927        )?;
928
929        if let Some(timeout) = self.timeout_secs {
930            write!(f, " timeout={timeout}s")?;
931        }
932
933        if let Some(chord_id) = self.chord_id {
934            write!(f, " chord={}", &chord_id.to_string()[..8])?;
935        }
936
937        Ok(())
938    }
939}
940
941/// Trait for tasks that can be executed
942#[async_trait::async_trait]
943pub trait Task: Send + Sync {
944    /// The input type for this task
945    type Input: Serialize + for<'de> Deserialize<'de> + Send;
946
947    /// The output type for this task
948    type Output: Serialize + for<'de> Deserialize<'de> + Send;
949
950    /// Execute the task with the given input
951    async fn execute(&self, input: Self::Input) -> crate::Result<Self::Output>;
952
953    /// Get the task name
954    fn name(&self) -> &str;
955}
956
957/// Serialized task ready for queue
958///
959/// # Zero-Copy Serialization Considerations
960///
961/// The current implementation uses `Vec<u8>` for the payload, which requires
962/// copying data during serialization and deserialization. For high-performance
963/// scenarios, consider these alternatives:
964///
965/// 1. **`Bytes` from `bytes` crate**: Provides cheap cloning via reference counting
966///    ```ignore
967///    use bytes::Bytes;
968///    pub payload: Bytes,
969///    ```
970///
971/// 2. **`Arc<[u8]>`**: Reference-counted slice for shared ownership
972///    ```ignore
973///    use std::sync::Arc;
974///    pub payload: Arc<[u8]>,
975///    ```
976///
977/// 3. **Borrowed payloads with lifetimes**: For truly zero-copy deserialization
978///    ```ignore
979///    #[derive(Deserialize)]
980///    pub struct SerializedTask<'a> {
981///        #[serde(borrow)]
982///        pub payload: &'a [u8],
983///    }
984///    ```
985///
986/// Trade-offs:
987/// - `Vec<u8>`: Simple, owned data, but requires copying
988/// - `Bytes`: Cheap cloning, but requires external dependency
989/// - `Arc<[u8]>`: Cheap cloning, but atomic operations have overhead
990/// - Borrowed: True zero-copy, but lifetime complexity and limited use cases
991///
992/// For most use cases, the current `Vec<u8>` approach provides a good balance
993/// of simplicity and performance. Consider alternatives only when profiling shows
994/// serialization as a bottleneck.
995#[derive(Debug, Clone, Serialize, Deserialize)]
996pub struct SerializedTask {
997    /// Task metadata
998    pub metadata: TaskMetadata,
999
1000    /// Serialized task payload
1001    pub payload: Vec<u8>,
1002}
1003
1004impl SerializedTask {
1005    #[inline]
1006    #[must_use]
1007    pub fn new(name: String, payload: Vec<u8>) -> Self {
1008        Self {
1009            metadata: TaskMetadata::new(name),
1010            payload,
1011        }
1012    }
1013
1014    #[inline]
1015    #[must_use]
1016    pub fn with_priority(mut self, priority: i32) -> Self {
1017        self.metadata.priority = priority;
1018        self
1019    }
1020
1021    #[inline]
1022    #[must_use]
1023    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
1024        self.metadata.max_retries = max_retries;
1025        self
1026    }
1027
1028    #[inline]
1029    #[must_use]
1030    pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
1031        self.metadata.timeout_secs = Some(timeout_secs);
1032        self
1033    }
1034
1035    /// Set the group ID for workflow grouping
1036    #[inline]
1037    #[must_use]
1038    pub fn with_group_id(mut self, group_id: Uuid) -> Self {
1039        self.metadata.group_id = Some(group_id);
1040        self
1041    }
1042
1043    /// Set the chord ID for barrier synchronization
1044    #[inline]
1045    #[must_use]
1046    pub fn with_chord_id(mut self, chord_id: Uuid) -> Self {
1047        self.metadata.chord_id = Some(chord_id);
1048        self
1049    }
1050
1051    /// Get the age of the task (time since creation)
1052    #[inline]
1053    #[must_use]
1054    pub fn age(&self) -> chrono::Duration {
1055        self.metadata.age()
1056    }
1057
1058    /// Check if the task has expired based on its timeout
1059    #[inline]
1060    #[must_use]
1061    pub fn is_expired(&self) -> bool {
1062        self.metadata.is_expired()
1063    }
1064
1065    /// Check if the task is in a terminal state (Success or Failure)
1066    #[inline]
1067    #[must_use]
1068    pub fn is_terminal(&self) -> bool {
1069        self.metadata.is_terminal()
1070    }
1071
1072    /// Check if the task is in a running or active state
1073    #[inline]
1074    #[must_use]
1075    pub fn is_active(&self) -> bool {
1076        self.metadata.is_active()
1077    }
1078
1079    /// Validate the serialized task
1080    ///
1081    /// Validates both metadata and payload constraints:
1082    /// - Delegates metadata validation to `TaskMetadata::validate()`
1083    /// - Checks payload size (must be < 1MB by default)
1084    ///
1085    /// # Errors
1086    ///
1087    /// Returns an error if validation fails.
1088    pub fn validate(&self) -> Result<(), String> {
1089        self.metadata.validate()?;
1090
1091        if self.payload.is_empty() {
1092            return Err("Task payload cannot be empty".to_string());
1093        }
1094
1095        if self.payload.len() > 1_048_576 {
1096            return Err(format!(
1097                "Task payload too large: {} bytes (max 1MB)",
1098                self.payload.len()
1099            ));
1100        }
1101
1102        Ok(())
1103    }
1104
1105    /// Validate with custom payload size limit
1106    ///
1107    /// # Errors
1108    ///
1109    /// Returns an error if validation fails.
1110    pub fn validate_with_limit(&self, max_payload_bytes: usize) -> Result<(), String> {
1111        self.metadata.validate()?;
1112
1113        if self.payload.is_empty() {
1114            return Err("Task payload cannot be empty".to_string());
1115        }
1116
1117        if self.payload.len() > max_payload_bytes {
1118            return Err(format!(
1119                "Task payload too large: {} bytes (max {} bytes)",
1120                self.payload.len(),
1121                max_payload_bytes
1122            ));
1123        }
1124
1125        Ok(())
1126    }
1127
1128    /// Check if task has a timeout configured
1129    #[inline]
1130    #[must_use]
1131    pub fn has_timeout(&self) -> bool {
1132        self.metadata.has_timeout()
1133    }
1134
1135    /// Check if task is part of a group
1136    #[inline]
1137    #[must_use]
1138    pub fn has_group_id(&self) -> bool {
1139        self.metadata.has_group_id()
1140    }
1141
1142    /// Check if task is part of a chord
1143    #[inline]
1144    #[must_use]
1145    pub fn has_chord_id(&self) -> bool {
1146        self.metadata.has_chord_id()
1147    }
1148
1149    /// Check if task has custom priority (non-zero)
1150    #[inline]
1151    #[must_use]
1152    pub fn has_priority(&self) -> bool {
1153        self.metadata.has_priority()
1154    }
1155
1156    /// Get payload size in bytes
1157    #[inline]
1158    #[must_use]
1159    pub const fn payload_size(&self) -> usize {
1160        self.payload.len()
1161    }
1162
1163    /// Check if payload is empty
1164    #[inline]
1165    #[must_use]
1166    pub fn has_empty_payload(&self) -> bool {
1167        self.payload.is_empty()
1168    }
1169
1170    /// Add a task dependency
1171    #[inline]
1172    #[must_use]
1173    pub fn with_dependency(mut self, dependency: TaskId) -> Self {
1174        self.metadata.dependencies.insert(dependency);
1175        self
1176    }
1177
1178    /// Add multiple task dependencies
1179    #[inline]
1180    #[must_use]
1181    pub fn with_dependencies(mut self, dependencies: impl IntoIterator<Item = TaskId>) -> Self {
1182        self.metadata.dependencies.extend(dependencies);
1183        self
1184    }
1185
1186    /// Check if task has any dependencies
1187    #[inline]
1188    #[must_use]
1189    pub fn has_dependencies(&self) -> bool {
1190        self.metadata.has_dependencies()
1191    }
1192
1193    /// Get the number of dependencies
1194    #[inline]
1195    #[must_use]
1196    pub fn dependency_count(&self) -> usize {
1197        self.metadata.dependency_count()
1198    }
1199
1200    /// Check if a specific task is a dependency
1201    #[inline]
1202    #[must_use]
1203    pub fn depends_on(&self, task_id: &TaskId) -> bool {
1204        self.metadata.depends_on(task_id)
1205    }
1206
1207    /// Check if task has high priority (priority > 0)
1208    #[inline]
1209    #[must_use]
1210    pub fn is_high_priority(&self) -> bool {
1211        self.metadata.is_high_priority()
1212    }
1213
1214    /// Check if task has low priority (priority < 0)
1215    #[inline]
1216    #[must_use]
1217    pub fn is_low_priority(&self) -> bool {
1218        self.metadata.is_low_priority()
1219    }
1220
1221    // ===== Convenience State Checks (Delegated) =====
1222
1223    /// Check if task is in Pending state
1224    #[inline]
1225    #[must_use]
1226    pub fn is_pending(&self) -> bool {
1227        self.metadata.is_pending()
1228    }
1229
1230    /// Check if task is in Running state
1231    #[inline]
1232    #[must_use]
1233    pub fn is_running(&self) -> bool {
1234        self.metadata.is_running()
1235    }
1236
1237    /// Check if task is in Succeeded state
1238    #[inline]
1239    #[must_use]
1240    pub fn is_succeeded(&self) -> bool {
1241        self.metadata.is_succeeded()
1242    }
1243
1244    /// Check if task is in Failed state
1245    #[inline]
1246    #[must_use]
1247    pub fn is_failed(&self) -> bool {
1248        self.metadata.is_failed()
1249    }
1250
1251    /// Check if task is in Retrying state
1252    #[inline]
1253    #[must_use]
1254    pub fn is_retrying(&self) -> bool {
1255        self.metadata.is_retrying()
1256    }
1257
1258    /// Check if task is in Reserved state
1259    #[inline]
1260    #[must_use]
1261    pub fn is_reserved(&self) -> bool {
1262        self.metadata.is_reserved()
1263    }
1264
1265    // ===== Time-related Helpers (Delegated) =====
1266
1267    /// Get remaining time before timeout
1268    #[inline]
1269    #[must_use]
1270    pub fn time_remaining(&self) -> Option<chrono::Duration> {
1271        self.metadata.time_remaining()
1272    }
1273
1274    /// Get the time elapsed since task creation
1275    #[inline]
1276    #[must_use]
1277    pub fn time_elapsed(&self) -> chrono::Duration {
1278        self.metadata.time_elapsed()
1279    }
1280
1281    // ===== Retry Helpers (Delegated) =====
1282
1283    /// Check if task can be retried
1284    #[inline]
1285    #[must_use]
1286    pub fn can_retry(&self) -> bool {
1287        self.metadata.can_retry()
1288    }
1289
1290    /// Get current retry count
1291    #[inline]
1292    #[must_use]
1293    pub const fn retry_count(&self) -> u32 {
1294        self.metadata.retry_count()
1295    }
1296
1297    /// Get remaining retry attempts
1298    #[inline]
1299    #[must_use]
1300    pub const fn retries_remaining(&self) -> u32 {
1301        self.metadata.retries_remaining()
1302    }
1303
1304    // ===== Workflow Helpers (Delegated) =====
1305
1306    /// Check if task is part of any workflow
1307    #[inline]
1308    #[must_use]
1309    pub fn is_part_of_workflow(&self) -> bool {
1310        self.metadata.is_part_of_workflow()
1311    }
1312
1313    /// Get group ID if task is part of a group
1314    #[inline]
1315    #[must_use]
1316    pub fn get_group_id(&self) -> Option<&Uuid> {
1317        self.metadata.get_group_id()
1318    }
1319
1320    /// Get chord ID if task is part of a chord
1321    #[inline]
1322    #[must_use]
1323    pub fn get_chord_id(&self) -> Option<&Uuid> {
1324        self.metadata.get_chord_id()
1325    }
1326}
1327
1328impl fmt::Display for SerializedTask {
1329    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1330        write!(
1331            f,
1332            "SerializedTask[{}] name={} payload={}B state={}",
1333            &self.metadata.id.to_string()[..8],
1334            self.metadata.name,
1335            self.payload.len(),
1336            self.metadata.state
1337        )?;
1338        if self.metadata.has_priority() {
1339            write!(f, " priority={}", self.metadata.priority)?;
1340        }
1341        if let Some(group_id) = self.metadata.group_id {
1342            write!(f, " group={}", &group_id.to_string()[..8])?;
1343        }
1344        if let Some(chord_id) = self.metadata.chord_id {
1345            write!(f, " chord={}", &chord_id.to_string()[..8])?;
1346        }
1347        Ok(())
1348    }
1349}
1350
1351#[cfg(test)]
1352mod tests {
1353    use super::*;
1354
1355    #[test]
1356    fn test_task_metadata_creation() {
1357        let metadata = TaskMetadata::new("test_task".to_string())
1358            .with_max_retries(5)
1359            .with_timeout(60)
1360            .with_priority(10);
1361
1362        assert_eq!(metadata.name, "test_task");
1363        assert_eq!(metadata.max_retries, 5);
1364        assert_eq!(metadata.timeout_secs, Some(60));
1365        assert_eq!(metadata.priority, 10);
1366        assert_eq!(metadata.state, TaskState::Pending);
1367    }
1368
1369    #[test]
1370    fn test_task_dependencies() {
1371        let dep1 = TaskId::new_v4();
1372        let dep2 = TaskId::new_v4();
1373
1374        let metadata = TaskMetadata::new("test_task".to_string())
1375            .with_dependency(dep1)
1376            .with_dependency(dep2);
1377
1378        assert!(metadata.has_dependencies());
1379        assert_eq!(metadata.dependency_count(), 2);
1380        assert!(metadata.depends_on(&dep1));
1381        assert!(metadata.depends_on(&dep2));
1382    }
1383
1384    #[test]
1385    fn test_task_with_dependencies() {
1386        let dep1 = TaskId::new_v4();
1387        let dep2 = TaskId::new_v4();
1388        let deps = vec![dep1, dep2];
1389
1390        let metadata = TaskMetadata::new("test_task".to_string()).with_dependencies(deps);
1391
1392        assert_eq!(metadata.dependency_count(), 2);
1393        assert!(metadata.depends_on(&dep1));
1394        assert!(metadata.depends_on(&dep2));
1395    }
1396
1397    #[test]
1398    fn test_remove_dependency() {
1399        let dep1 = TaskId::new_v4();
1400        let dep2 = TaskId::new_v4();
1401
1402        let mut metadata = TaskMetadata::new("test_task".to_string())
1403            .with_dependency(dep1)
1404            .with_dependency(dep2);
1405
1406        assert_eq!(metadata.dependency_count(), 2);
1407
1408        let removed = metadata.remove_dependency(&dep1);
1409        assert!(removed);
1410        assert_eq!(metadata.dependency_count(), 1);
1411        assert!(!metadata.depends_on(&dep1));
1412        assert!(metadata.depends_on(&dep2));
1413    }
1414
1415    #[test]
1416    fn test_clear_dependencies() {
1417        let dep1 = TaskId::new_v4();
1418        let dep2 = TaskId::new_v4();
1419
1420        let mut metadata = TaskMetadata::new("test_task".to_string())
1421            .with_dependency(dep1)
1422            .with_dependency(dep2);
1423
1424        assert!(metadata.has_dependencies());
1425        metadata.clear_dependencies();
1426        assert!(!metadata.has_dependencies());
1427        assert_eq!(metadata.dependency_count(), 0);
1428    }
1429
1430    #[test]
1431    fn test_serialized_task_dependencies() {
1432        let dep1 = TaskId::new_v4();
1433        let dep2 = TaskId::new_v4();
1434
1435        let task = SerializedTask::new("test_task".to_string(), vec![1, 2, 3])
1436            .with_dependency(dep1)
1437            .with_dependency(dep2);
1438
1439        assert!(task.has_dependencies());
1440        assert_eq!(task.dependency_count(), 2);
1441        assert!(task.depends_on(&dep1));
1442        assert!(task.depends_on(&dep2));
1443    }
1444
1445    // Integration tests for full task lifecycle
1446    #[cfg(test)]
1447    mod integration_tests {
1448        use super::*;
1449        use crate::{StateHistory, TaskState};
1450
1451        #[test]
1452        fn test_complete_task_lifecycle() {
1453            // Create a task
1454            let mut task = SerializedTask::new("process_data".to_string(), vec![1, 2, 3, 4, 5])
1455                .with_priority(5)
1456                .with_max_retries(3)
1457                .with_timeout(60);
1458
1459            assert_eq!(task.metadata.state, TaskState::Pending);
1460            assert!(task.is_active());
1461            assert!(!task.is_terminal());
1462
1463            // Simulate task progression through states
1464            let mut history = StateHistory::with_initial(task.metadata.state.clone());
1465
1466            // Task is received by worker
1467            task.metadata.state = TaskState::Received;
1468            history.transition(task.metadata.state.clone());
1469
1470            // Task is reserved
1471            task.metadata.state = TaskState::Reserved;
1472            history.transition(task.metadata.state.clone());
1473
1474            // Task starts running
1475            task.metadata.state = TaskState::Running;
1476            history.transition(task.metadata.state.clone());
1477
1478            // Task completes successfully
1479            task.metadata.state = TaskState::Succeeded(vec![10, 20, 30]);
1480            history.transition(task.metadata.state.clone());
1481
1482            assert!(task.is_terminal());
1483            assert!(!task.is_active());
1484            assert_eq!(history.transition_count(), 4);
1485        }
1486
1487        #[test]
1488        fn test_task_retry_lifecycle() {
1489            let mut task =
1490                SerializedTask::new("failing_task".to_string(), vec![1, 2, 3]).with_max_retries(3);
1491
1492            let mut history = StateHistory::with_initial(TaskState::Pending);
1493
1494            // First attempt fails
1495            task.metadata.state = TaskState::Running;
1496            history.transition(task.metadata.state.clone());
1497
1498            task.metadata.state = TaskState::Failed("Network error".to_string());
1499            history.transition(task.metadata.state.clone());
1500
1501            // Check if can retry
1502            assert!(task.metadata.state.can_retry(task.metadata.max_retries));
1503
1504            // First retry
1505            task.metadata.state = TaskState::Retrying(1);
1506            history.transition(task.metadata.state.clone());
1507            assert_eq!(task.metadata.state.retry_count(), 1);
1508
1509            task.metadata.state = TaskState::Failed("Still failing".to_string());
1510            history.transition(task.metadata.state.clone());
1511
1512            // Second retry
1513            task.metadata.state = TaskState::Retrying(2);
1514            history.transition(task.metadata.state.clone());
1515            assert_eq!(task.metadata.state.retry_count(), 2);
1516
1517            // Finally succeeds
1518            task.metadata.state = TaskState::Succeeded(vec![]);
1519            history.transition(task.metadata.state.clone());
1520
1521            assert!(task.is_terminal());
1522            assert_eq!(history.transition_count(), 6);
1523        }
1524
1525        #[test]
1526        fn test_task_with_dependencies_lifecycle() {
1527            // Create parent tasks
1528            let parent1_id = TaskId::new_v4();
1529            let parent2_id = TaskId::new_v4();
1530
1531            // Create child task that depends on parents
1532            let child_task = SerializedTask::new("child_task".to_string(), vec![1, 2, 3])
1533                .with_dependency(parent1_id)
1534                .with_dependency(parent2_id)
1535                .with_priority(10);
1536
1537            assert!(child_task.has_dependencies());
1538            assert_eq!(child_task.dependency_count(), 2);
1539            assert!(child_task.depends_on(&parent1_id));
1540            assert!(child_task.depends_on(&parent2_id));
1541
1542            // Verify task properties
1543            assert_eq!(child_task.metadata.priority, 10);
1544            assert!(child_task.is_high_priority());
1545        }
1546
1547        #[test]
1548        fn test_task_serialization_roundtrip() {
1549            let original = SerializedTask::new("test_task".to_string(), vec![1, 2, 3, 4, 5])
1550                .with_priority(5)
1551                .with_max_retries(3)
1552                .with_timeout(120)
1553                .with_dependency(TaskId::new_v4());
1554
1555            // Serialize to JSON
1556            let json = serde_json::to_string(&original).expect("Failed to serialize");
1557
1558            // Deserialize back
1559            let deserialized: SerializedTask =
1560                serde_json::from_str(&json).expect("Failed to deserialize");
1561
1562            assert_eq!(deserialized.metadata.name, original.metadata.name);
1563            assert_eq!(deserialized.metadata.priority, original.metadata.priority);
1564            assert_eq!(
1565                deserialized.metadata.max_retries,
1566                original.metadata.max_retries
1567            );
1568            assert_eq!(
1569                deserialized.metadata.timeout_secs,
1570                original.metadata.timeout_secs
1571            );
1572            assert_eq!(deserialized.payload, original.payload);
1573            assert_eq!(deserialized.dependency_count(), original.dependency_count());
1574        }
1575
1576        #[test]
1577        fn test_task_validation_lifecycle() {
1578            // Valid task
1579            let valid_task = SerializedTask::new("valid_task".to_string(), vec![1, 2, 3])
1580                .with_max_retries(5)
1581                .with_timeout(30);
1582
1583            assert!(valid_task.validate().is_ok());
1584
1585            // Invalid task - empty name
1586            let mut invalid_task = SerializedTask::new(String::new(), vec![1, 2, 3]);
1587            assert!(invalid_task.metadata.validate().is_err());
1588
1589            // Invalid task - excessive retries
1590            invalid_task =
1591                SerializedTask::new("task".to_string(), vec![1, 2, 3]).with_max_retries(10000);
1592            assert!(invalid_task.metadata.validate().is_err());
1593
1594            // Invalid task - zero timeout
1595            let mut invalid_metadata = TaskMetadata::new("task".to_string());
1596            invalid_metadata.timeout_secs = Some(0);
1597            assert!(invalid_metadata.validate().is_err());
1598        }
1599
1600        #[test]
1601        fn test_task_expiration_lifecycle() {
1602            // Create task with 1 second timeout
1603            let task =
1604                SerializedTask::new("expiring_task".to_string(), vec![1, 2, 3]).with_timeout(1);
1605
1606            // Task should not be expired immediately
1607            assert!(!task.is_expired());
1608
1609            // Wait for task to expire
1610            std::thread::sleep(std::time::Duration::from_secs(2));
1611
1612            // Task should now be expired
1613            assert!(task.is_expired());
1614        }
1615
1616        #[test]
1617        fn test_workflow_with_multiple_dependencies() {
1618            use crate::TaskDag;
1619
1620            // Create a workflow: task1 -> task2 -> task3
1621            //                      \-> task4 -^
1622            let mut dag = TaskDag::new();
1623
1624            let task1 = TaskId::new_v4();
1625            let task2 = TaskId::new_v4();
1626            let task3 = TaskId::new_v4();
1627            let task4 = TaskId::new_v4();
1628
1629            dag.add_node(task1, "load_data");
1630            dag.add_node(task2, "transform_data");
1631            dag.add_node(task3, "save_results");
1632            dag.add_node(task4, "validate_data");
1633
1634            // task2 depends on task1
1635            dag.add_dependency(task2, task1).unwrap();
1636            // task4 depends on task1
1637            dag.add_dependency(task4, task1).unwrap();
1638            // task3 depends on both task2 and task4
1639            dag.add_dependency(task3, task2).unwrap();
1640            dag.add_dependency(task3, task4).unwrap();
1641
1642            // Validate DAG
1643            assert!(dag.validate().is_ok());
1644
1645            // Get execution order
1646            let order = dag.topological_sort().unwrap();
1647            assert_eq!(order.len(), 4);
1648
1649            // task1 should be first
1650            assert_eq!(order[0], task1);
1651            // task3 should be last
1652            assert_eq!(order[3], task3);
1653        }
1654
1655        #[test]
1656        fn test_task_state_history_full_lifecycle() {
1657            let mut history = StateHistory::with_initial(TaskState::Pending);
1658
1659            // Simulate complete lifecycle
1660            history.transition(TaskState::Received);
1661            history.transition(TaskState::Reserved);
1662            history.transition(TaskState::Running);
1663            history.transition(TaskState::Failed("Temporary error".to_string()));
1664            history.transition(TaskState::Retrying(1));
1665            history.transition(TaskState::Running);
1666            history.transition(TaskState::Succeeded(vec![1, 2, 3]));
1667
1668            assert_eq!(history.transition_count(), 7);
1669            assert!(history.current_state().unwrap().is_terminal());
1670
1671            // Check states we transitioned TO
1672            assert!(history.has_been_in_state("RECEIVED"));
1673            assert!(history.has_been_in_state("RESERVED"));
1674            assert!(history.has_been_in_state("RUNNING"));
1675            assert!(history.has_been_in_state("FAILURE"));
1676            assert!(history.has_been_in_state("RETRYING"));
1677            assert!(history.has_been_in_state("SUCCESS"));
1678
1679            // Current state should be SUCCESS
1680            assert_eq!(history.current_state().unwrap().name(), "SUCCESS");
1681        }
1682    }
1683}