use crate::state::TaskState;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::fmt;
use uuid::Uuid;
pub type TaskId = Uuid;
pub mod batch {
use super::{SerializedTask, TaskState, Uuid};
#[must_use]
pub fn validate_all(tasks: &[SerializedTask]) -> Vec<(usize, String)> {
tasks
.iter()
.enumerate()
.filter_map(|(idx, task)| task.validate().err().map(|e| (idx, e)))
.collect()
}
#[must_use]
pub fn filter_by_state<F>(tasks: &[SerializedTask], predicate: F) -> Vec<&SerializedTask>
where
F: Fn(&TaskState) -> bool,
{
tasks
.iter()
.filter(|task| predicate(&task.metadata.state))
.collect()
}
#[must_use]
pub fn filter_high_priority(tasks: &[SerializedTask]) -> Vec<&SerializedTask> {
tasks
.iter()
.filter(|task| task.metadata.is_high_priority())
.collect()
}
pub fn sort_by_priority(tasks: &mut [SerializedTask]) {
tasks.sort_by_key(|b| std::cmp::Reverse(b.metadata.priority));
}
#[must_use]
pub fn count_by_state(tasks: &[SerializedTask]) -> std::collections::HashMap<String, usize> {
let mut counts = std::collections::HashMap::new();
for task in tasks {
*counts
.entry(task.metadata.state.name().to_string())
.or_insert(0) += 1;
}
counts
}
#[inline]
#[must_use]
pub fn has_expired_tasks(tasks: &[SerializedTask]) -> bool {
tasks.iter().any(super::SerializedTask::is_expired)
}
#[inline]
#[must_use]
pub fn get_expired_tasks(tasks: &[SerializedTask]) -> Vec<&SerializedTask> {
tasks.iter().filter(|task| task.is_expired()).collect()
}
#[must_use]
pub fn total_payload_size(tasks: &[SerializedTask]) -> usize {
tasks.iter().map(super::SerializedTask::payload_size).sum()
}
#[must_use]
pub fn filter_with_dependencies(tasks: &[SerializedTask]) -> Vec<&SerializedTask> {
tasks
.iter()
.filter(|task| task.metadata.has_dependencies())
.collect()
}
#[must_use]
pub fn filter_retryable(tasks: &[SerializedTask]) -> Vec<&SerializedTask> {
tasks.iter().filter(|task| task.can_retry()).collect()
}
#[must_use]
pub fn filter_by_name_pattern<'a>(
tasks: &'a [SerializedTask],
pattern: &str,
) -> Vec<&'a SerializedTask> {
tasks
.iter()
.filter(|task| task.metadata.name.contains(pattern))
.collect()
}
#[must_use]
pub fn group_by_workflow_id(
tasks: &[SerializedTask],
) -> std::collections::HashMap<Uuid, Vec<&SerializedTask>> {
let mut groups = std::collections::HashMap::new();
for task in tasks {
if let Some(group_id) = task.metadata.group_id {
groups.entry(group_id).or_insert_with(Vec::new).push(task);
}
}
groups
}
#[must_use]
pub fn filter_terminal(tasks: &[SerializedTask]) -> Vec<&SerializedTask> {
tasks.iter().filter(|task| task.is_terminal()).collect()
}
#[must_use]
pub fn filter_active(tasks: &[SerializedTask]) -> Vec<&SerializedTask> {
tasks.iter().filter(|task| task.is_active()).collect()
}
#[must_use]
pub fn average_payload_size(tasks: &[SerializedTask]) -> usize {
if tasks.is_empty() {
0
} else {
total_payload_size(tasks) / tasks.len()
}
}
#[must_use]
pub fn find_oldest(tasks: &[SerializedTask]) -> Option<&SerializedTask> {
tasks.iter().min_by_key(|task| task.metadata.created_at)
}
#[must_use]
pub fn find_newest(tasks: &[SerializedTask]) -> Option<&SerializedTask> {
tasks.iter().max_by_key(|task| task.metadata.created_at)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskMetadata {
pub id: TaskId,
pub name: String,
pub state: TaskState,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub max_retries: u32,
pub timeout_secs: Option<u64>,
pub priority: i32,
#[serde(skip_serializing_if = "Option::is_none")]
pub group_id: Option<Uuid>,
#[serde(skip_serializing_if = "Option::is_none")]
pub chord_id: Option<Uuid>,
#[serde(skip_serializing_if = "HashSet::is_empty", default)]
pub dependencies: HashSet<TaskId>,
}
impl TaskMetadata {
#[inline]
#[must_use]
pub fn new(name: String) -> Self {
let now = Utc::now();
Self {
id: Uuid::new_v4(),
name,
state: TaskState::Pending,
created_at: now,
updated_at: now,
max_retries: 3,
timeout_secs: None,
priority: 0,
group_id: None,
chord_id: None,
dependencies: HashSet::new(),
}
}
#[inline]
#[must_use]
pub fn with_max_retries(mut self, max_retries: u32) -> Self {
self.max_retries = max_retries;
self
}
#[inline]
#[must_use]
pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
self.timeout_secs = Some(timeout_secs);
self
}
#[inline]
#[must_use]
pub fn with_priority(mut self, priority: i32) -> Self {
self.priority = priority;
self
}
#[inline]
#[must_use]
pub fn with_group_id(mut self, group_id: Uuid) -> Self {
self.group_id = Some(group_id);
self
}
#[inline]
#[must_use]
pub fn with_chord_id(mut self, chord_id: Uuid) -> Self {
self.chord_id = Some(chord_id);
self
}
#[inline]
#[must_use]
pub fn age(&self) -> chrono::Duration {
Utc::now() - self.created_at
}
#[inline]
#[must_use]
#[allow(clippy::cast_possible_wrap)]
pub fn is_expired(&self) -> bool {
if let Some(timeout_secs) = self.timeout_secs {
let elapsed = (Utc::now() - self.created_at).num_seconds();
elapsed > timeout_secs as i64
} else {
false
}
}
#[inline]
#[must_use]
pub fn is_terminal(&self) -> bool {
self.state.is_terminal()
}
#[inline]
#[must_use]
pub fn is_active(&self) -> bool {
matches!(
self.state,
TaskState::Pending | TaskState::Reserved | TaskState::Running | TaskState::Retrying(_)
)
}
pub fn validate(&self) -> Result<(), String> {
if self.name.is_empty() {
return Err("Task name cannot be empty".to_string());
}
if self.max_retries > 1000 {
return Err("Max retries cannot exceed 1000".to_string());
}
if let Some(timeout) = self.timeout_secs {
if timeout == 0 {
return Err("Timeout must be at least 1 second".to_string());
}
if timeout > 86400 {
return Err("Timeout cannot exceed 24 hours (86400 seconds)".to_string());
}
}
Ok(())
}
#[inline]
#[must_use]
pub fn has_timeout(&self) -> bool {
self.timeout_secs.is_some()
}
#[inline]
#[must_use]
pub fn has_group_id(&self) -> bool {
self.group_id.is_some()
}
#[inline]
#[must_use]
pub fn has_chord_id(&self) -> bool {
self.chord_id.is_some()
}
#[inline]
#[must_use]
pub const fn has_priority(&self) -> bool {
self.priority != 0
}
#[inline]
#[must_use]
pub const fn is_high_priority(&self) -> bool {
self.priority > 0
}
#[inline]
#[must_use]
pub const fn is_low_priority(&self) -> bool {
self.priority < 0
}
#[inline]
#[must_use]
pub fn with_dependency(mut self, dependency: TaskId) -> Self {
self.dependencies.insert(dependency);
self
}
#[inline]
#[must_use]
pub fn with_dependencies(mut self, dependencies: impl IntoIterator<Item = TaskId>) -> Self {
self.dependencies.extend(dependencies);
self
}
#[inline]
#[must_use]
pub fn has_dependencies(&self) -> bool {
!self.dependencies.is_empty()
}
#[inline]
#[must_use]
pub fn dependency_count(&self) -> usize {
self.dependencies.len()
}
#[inline]
#[must_use]
pub fn depends_on(&self, task_id: &TaskId) -> bool {
self.dependencies.contains(task_id)
}
#[inline]
pub fn remove_dependency(&mut self, task_id: &TaskId) -> bool {
self.dependencies.remove(task_id)
}
#[inline]
pub fn clear_dependencies(&mut self) {
self.dependencies.clear();
}
#[inline]
#[must_use]
pub fn is_pending(&self) -> bool {
matches!(self.state, TaskState::Pending)
}
#[inline]
#[must_use]
pub fn is_running(&self) -> bool {
matches!(self.state, TaskState::Running)
}
#[inline]
#[must_use]
pub fn is_succeeded(&self) -> bool {
matches!(self.state, TaskState::Succeeded(_))
}
#[inline]
#[must_use]
pub fn is_failed(&self) -> bool {
matches!(self.state, TaskState::Failed(_))
}
#[inline]
#[must_use]
pub fn is_retrying(&self) -> bool {
matches!(self.state, TaskState::Retrying(_))
}
#[inline]
#[must_use]
pub fn is_reserved(&self) -> bool {
matches!(self.state, TaskState::Reserved)
}
#[inline]
#[must_use]
#[allow(clippy::cast_possible_wrap)]
pub fn time_remaining(&self) -> Option<chrono::Duration> {
self.timeout_secs.and_then(|timeout| {
let elapsed = Utc::now() - self.created_at;
let timeout_duration = chrono::Duration::seconds(timeout as i64);
let remaining = timeout_duration - elapsed;
if remaining.num_seconds() > 0 {
Some(remaining)
} else {
None
}
})
}
#[inline]
#[must_use]
pub fn time_elapsed(&self) -> chrono::Duration {
Utc::now() - self.created_at
}
#[inline]
#[must_use]
pub fn can_retry(&self) -> bool {
self.state.can_retry(self.max_retries)
}
#[inline]
#[must_use]
pub const fn retry_count(&self) -> u32 {
self.state.retry_count()
}
#[inline]
#[must_use]
pub const fn retries_remaining(&self) -> u32 {
let current = self.retry_count();
self.max_retries.saturating_sub(current)
}
#[inline]
#[must_use]
pub fn is_part_of_workflow(&self) -> bool {
self.group_id.is_some() || self.chord_id.is_some()
}
#[inline]
#[must_use]
pub fn get_group_id(&self) -> Option<&Uuid> {
self.group_id.as_ref()
}
#[inline]
#[must_use]
pub fn get_chord_id(&self) -> Option<&Uuid> {
self.chord_id.as_ref()
}
#[inline]
pub fn mark_as_running(&mut self) {
self.state = TaskState::Running;
self.updated_at = Utc::now();
}
#[inline]
pub fn mark_as_succeeded(&mut self, result: Vec<u8>) {
self.state = TaskState::Succeeded(result);
self.updated_at = Utc::now();
}
#[inline]
pub fn mark_as_failed(&mut self, error: impl Into<String>) {
self.state = TaskState::Failed(error.into());
self.updated_at = Utc::now();
}
#[inline]
#[must_use]
pub fn with_new_id(&self) -> Self {
let now = Utc::now();
Self {
id: Uuid::new_v4(),
name: self.name.clone(),
state: TaskState::Pending,
created_at: now,
updated_at: now,
max_retries: self.max_retries,
timeout_secs: self.timeout_secs,
priority: self.priority,
group_id: self.group_id,
chord_id: self.chord_id,
dependencies: self.dependencies.clone(),
}
}
}
impl fmt::Display for TaskMetadata {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Task[{}] name={} state={} priority={} retries={}/{}",
&self.id.to_string()[..8],
self.name,
self.state,
self.priority,
self.state.retry_count(),
self.max_retries
)?;
if let Some(timeout) = self.timeout_secs {
write!(f, " timeout={timeout}s")?;
}
if let Some(chord_id) = self.chord_id {
write!(f, " chord={}", &chord_id.to_string()[..8])?;
}
Ok(())
}
}
#[async_trait::async_trait]
pub trait Task: Send + Sync {
type Input: Serialize + for<'de> Deserialize<'de> + Send;
type Output: Serialize + for<'de> Deserialize<'de> + Send;
async fn execute(&self, input: Self::Input) -> crate::Result<Self::Output>;
fn name(&self) -> &str;
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedTask {
pub metadata: TaskMetadata,
pub payload: Vec<u8>,
}
impl SerializedTask {
#[inline]
#[must_use]
pub fn new(name: String, payload: Vec<u8>) -> Self {
Self {
metadata: TaskMetadata::new(name),
payload,
}
}
#[inline]
#[must_use]
pub fn with_priority(mut self, priority: i32) -> Self {
self.metadata.priority = priority;
self
}
#[inline]
#[must_use]
pub fn with_max_retries(mut self, max_retries: u32) -> Self {
self.metadata.max_retries = max_retries;
self
}
#[inline]
#[must_use]
pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
self.metadata.timeout_secs = Some(timeout_secs);
self
}
#[inline]
#[must_use]
pub fn with_group_id(mut self, group_id: Uuid) -> Self {
self.metadata.group_id = Some(group_id);
self
}
#[inline]
#[must_use]
pub fn with_chord_id(mut self, chord_id: Uuid) -> Self {
self.metadata.chord_id = Some(chord_id);
self
}
#[inline]
#[must_use]
pub fn age(&self) -> chrono::Duration {
self.metadata.age()
}
#[inline]
#[must_use]
pub fn is_expired(&self) -> bool {
self.metadata.is_expired()
}
#[inline]
#[must_use]
pub fn is_terminal(&self) -> bool {
self.metadata.is_terminal()
}
#[inline]
#[must_use]
pub fn is_active(&self) -> bool {
self.metadata.is_active()
}
pub fn validate(&self) -> Result<(), String> {
self.metadata.validate()?;
if self.payload.is_empty() {
return Err("Task payload cannot be empty".to_string());
}
if self.payload.len() > 1_048_576 {
return Err(format!(
"Task payload too large: {} bytes (max 1MB)",
self.payload.len()
));
}
Ok(())
}
pub fn validate_with_limit(&self, max_payload_bytes: usize) -> Result<(), String> {
self.metadata.validate()?;
if self.payload.is_empty() {
return Err("Task payload cannot be empty".to_string());
}
if self.payload.len() > max_payload_bytes {
return Err(format!(
"Task payload too large: {} bytes (max {} bytes)",
self.payload.len(),
max_payload_bytes
));
}
Ok(())
}
#[inline]
#[must_use]
pub fn has_timeout(&self) -> bool {
self.metadata.has_timeout()
}
#[inline]
#[must_use]
pub fn has_group_id(&self) -> bool {
self.metadata.has_group_id()
}
#[inline]
#[must_use]
pub fn has_chord_id(&self) -> bool {
self.metadata.has_chord_id()
}
#[inline]
#[must_use]
pub fn has_priority(&self) -> bool {
self.metadata.has_priority()
}
#[inline]
#[must_use]
pub const fn payload_size(&self) -> usize {
self.payload.len()
}
#[inline]
#[must_use]
pub fn has_empty_payload(&self) -> bool {
self.payload.is_empty()
}
#[inline]
#[must_use]
pub fn with_dependency(mut self, dependency: TaskId) -> Self {
self.metadata.dependencies.insert(dependency);
self
}
#[inline]
#[must_use]
pub fn with_dependencies(mut self, dependencies: impl IntoIterator<Item = TaskId>) -> Self {
self.metadata.dependencies.extend(dependencies);
self
}
#[inline]
#[must_use]
pub fn has_dependencies(&self) -> bool {
self.metadata.has_dependencies()
}
#[inline]
#[must_use]
pub fn dependency_count(&self) -> usize {
self.metadata.dependency_count()
}
#[inline]
#[must_use]
pub fn depends_on(&self, task_id: &TaskId) -> bool {
self.metadata.depends_on(task_id)
}
#[inline]
#[must_use]
pub fn is_high_priority(&self) -> bool {
self.metadata.is_high_priority()
}
#[inline]
#[must_use]
pub fn is_low_priority(&self) -> bool {
self.metadata.is_low_priority()
}
#[inline]
#[must_use]
pub fn is_pending(&self) -> bool {
self.metadata.is_pending()
}
#[inline]
#[must_use]
pub fn is_running(&self) -> bool {
self.metadata.is_running()
}
#[inline]
#[must_use]
pub fn is_succeeded(&self) -> bool {
self.metadata.is_succeeded()
}
#[inline]
#[must_use]
pub fn is_failed(&self) -> bool {
self.metadata.is_failed()
}
#[inline]
#[must_use]
pub fn is_retrying(&self) -> bool {
self.metadata.is_retrying()
}
#[inline]
#[must_use]
pub fn is_reserved(&self) -> bool {
self.metadata.is_reserved()
}
#[inline]
#[must_use]
pub fn time_remaining(&self) -> Option<chrono::Duration> {
self.metadata.time_remaining()
}
#[inline]
#[must_use]
pub fn time_elapsed(&self) -> chrono::Duration {
self.metadata.time_elapsed()
}
#[inline]
#[must_use]
pub fn can_retry(&self) -> bool {
self.metadata.can_retry()
}
#[inline]
#[must_use]
pub const fn retry_count(&self) -> u32 {
self.metadata.retry_count()
}
#[inline]
#[must_use]
pub const fn retries_remaining(&self) -> u32 {
self.metadata.retries_remaining()
}
#[inline]
#[must_use]
pub fn is_part_of_workflow(&self) -> bool {
self.metadata.is_part_of_workflow()
}
#[inline]
#[must_use]
pub fn get_group_id(&self) -> Option<&Uuid> {
self.metadata.get_group_id()
}
#[inline]
#[must_use]
pub fn get_chord_id(&self) -> Option<&Uuid> {
self.metadata.get_chord_id()
}
}
impl fmt::Display for SerializedTask {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"SerializedTask[{}] name={} payload={}B state={}",
&self.metadata.id.to_string()[..8],
self.metadata.name,
self.payload.len(),
self.metadata.state
)?;
if self.metadata.has_priority() {
write!(f, " priority={}", self.metadata.priority)?;
}
if let Some(group_id) = self.metadata.group_id {
write!(f, " group={}", &group_id.to_string()[..8])?;
}
if let Some(chord_id) = self.metadata.chord_id {
write!(f, " chord={}", &chord_id.to_string()[..8])?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_task_metadata_creation() {
let metadata = TaskMetadata::new("test_task".to_string())
.with_max_retries(5)
.with_timeout(60)
.with_priority(10);
assert_eq!(metadata.name, "test_task");
assert_eq!(metadata.max_retries, 5);
assert_eq!(metadata.timeout_secs, Some(60));
assert_eq!(metadata.priority, 10);
assert_eq!(metadata.state, TaskState::Pending);
}
#[test]
fn test_task_dependencies() {
let dep1 = TaskId::new_v4();
let dep2 = TaskId::new_v4();
let metadata = TaskMetadata::new("test_task".to_string())
.with_dependency(dep1)
.with_dependency(dep2);
assert!(metadata.has_dependencies());
assert_eq!(metadata.dependency_count(), 2);
assert!(metadata.depends_on(&dep1));
assert!(metadata.depends_on(&dep2));
}
#[test]
fn test_task_with_dependencies() {
let dep1 = TaskId::new_v4();
let dep2 = TaskId::new_v4();
let deps = vec![dep1, dep2];
let metadata = TaskMetadata::new("test_task".to_string()).with_dependencies(deps);
assert_eq!(metadata.dependency_count(), 2);
assert!(metadata.depends_on(&dep1));
assert!(metadata.depends_on(&dep2));
}
#[test]
fn test_remove_dependency() {
let dep1 = TaskId::new_v4();
let dep2 = TaskId::new_v4();
let mut metadata = TaskMetadata::new("test_task".to_string())
.with_dependency(dep1)
.with_dependency(dep2);
assert_eq!(metadata.dependency_count(), 2);
let removed = metadata.remove_dependency(&dep1);
assert!(removed);
assert_eq!(metadata.dependency_count(), 1);
assert!(!metadata.depends_on(&dep1));
assert!(metadata.depends_on(&dep2));
}
#[test]
fn test_clear_dependencies() {
let dep1 = TaskId::new_v4();
let dep2 = TaskId::new_v4();
let mut metadata = TaskMetadata::new("test_task".to_string())
.with_dependency(dep1)
.with_dependency(dep2);
assert!(metadata.has_dependencies());
metadata.clear_dependencies();
assert!(!metadata.has_dependencies());
assert_eq!(metadata.dependency_count(), 0);
}
#[test]
fn test_serialized_task_dependencies() {
let dep1 = TaskId::new_v4();
let dep2 = TaskId::new_v4();
let task = SerializedTask::new("test_task".to_string(), vec![1, 2, 3])
.with_dependency(dep1)
.with_dependency(dep2);
assert!(task.has_dependencies());
assert_eq!(task.dependency_count(), 2);
assert!(task.depends_on(&dep1));
assert!(task.depends_on(&dep2));
}
#[cfg(test)]
mod integration_tests {
use super::*;
use crate::{StateHistory, TaskState};
#[test]
fn test_complete_task_lifecycle() {
let mut task = SerializedTask::new("process_data".to_string(), vec![1, 2, 3, 4, 5])
.with_priority(5)
.with_max_retries(3)
.with_timeout(60);
assert_eq!(task.metadata.state, TaskState::Pending);
assert!(task.is_active());
assert!(!task.is_terminal());
let mut history = StateHistory::with_initial(task.metadata.state.clone());
task.metadata.state = TaskState::Received;
history.transition(task.metadata.state.clone());
task.metadata.state = TaskState::Reserved;
history.transition(task.metadata.state.clone());
task.metadata.state = TaskState::Running;
history.transition(task.metadata.state.clone());
task.metadata.state = TaskState::Succeeded(vec![10, 20, 30]);
history.transition(task.metadata.state.clone());
assert!(task.is_terminal());
assert!(!task.is_active());
assert_eq!(history.transition_count(), 4);
}
#[test]
fn test_task_retry_lifecycle() {
let mut task =
SerializedTask::new("failing_task".to_string(), vec![1, 2, 3]).with_max_retries(3);
let mut history = StateHistory::with_initial(TaskState::Pending);
task.metadata.state = TaskState::Running;
history.transition(task.metadata.state.clone());
task.metadata.state = TaskState::Failed("Network error".to_string());
history.transition(task.metadata.state.clone());
assert!(task.metadata.state.can_retry(task.metadata.max_retries));
task.metadata.state = TaskState::Retrying(1);
history.transition(task.metadata.state.clone());
assert_eq!(task.metadata.state.retry_count(), 1);
task.metadata.state = TaskState::Failed("Still failing".to_string());
history.transition(task.metadata.state.clone());
task.metadata.state = TaskState::Retrying(2);
history.transition(task.metadata.state.clone());
assert_eq!(task.metadata.state.retry_count(), 2);
task.metadata.state = TaskState::Succeeded(vec![]);
history.transition(task.metadata.state.clone());
assert!(task.is_terminal());
assert_eq!(history.transition_count(), 6);
}
#[test]
fn test_task_with_dependencies_lifecycle() {
let parent1_id = TaskId::new_v4();
let parent2_id = TaskId::new_v4();
let child_task = SerializedTask::new("child_task".to_string(), vec![1, 2, 3])
.with_dependency(parent1_id)
.with_dependency(parent2_id)
.with_priority(10);
assert!(child_task.has_dependencies());
assert_eq!(child_task.dependency_count(), 2);
assert!(child_task.depends_on(&parent1_id));
assert!(child_task.depends_on(&parent2_id));
assert_eq!(child_task.metadata.priority, 10);
assert!(child_task.is_high_priority());
}
#[test]
fn test_task_serialization_roundtrip() {
let original = SerializedTask::new("test_task".to_string(), vec![1, 2, 3, 4, 5])
.with_priority(5)
.with_max_retries(3)
.with_timeout(120)
.with_dependency(TaskId::new_v4());
let json = serde_json::to_string(&original).expect("Failed to serialize");
let deserialized: SerializedTask =
serde_json::from_str(&json).expect("Failed to deserialize");
assert_eq!(deserialized.metadata.name, original.metadata.name);
assert_eq!(deserialized.metadata.priority, original.metadata.priority);
assert_eq!(
deserialized.metadata.max_retries,
original.metadata.max_retries
);
assert_eq!(
deserialized.metadata.timeout_secs,
original.metadata.timeout_secs
);
assert_eq!(deserialized.payload, original.payload);
assert_eq!(deserialized.dependency_count(), original.dependency_count());
}
#[test]
fn test_task_validation_lifecycle() {
let valid_task = SerializedTask::new("valid_task".to_string(), vec![1, 2, 3])
.with_max_retries(5)
.with_timeout(30);
assert!(valid_task.validate().is_ok());
let mut invalid_task = SerializedTask::new(String::new(), vec![1, 2, 3]);
assert!(invalid_task.metadata.validate().is_err());
invalid_task =
SerializedTask::new("task".to_string(), vec![1, 2, 3]).with_max_retries(10000);
assert!(invalid_task.metadata.validate().is_err());
let mut invalid_metadata = TaskMetadata::new("task".to_string());
invalid_metadata.timeout_secs = Some(0);
assert!(invalid_metadata.validate().is_err());
}
#[test]
fn test_task_expiration_lifecycle() {
let task =
SerializedTask::new("expiring_task".to_string(), vec![1, 2, 3]).with_timeout(1);
assert!(!task.is_expired());
std::thread::sleep(std::time::Duration::from_secs(2));
assert!(task.is_expired());
}
#[test]
fn test_workflow_with_multiple_dependencies() {
use crate::TaskDag;
let mut dag = TaskDag::new();
let task1 = TaskId::new_v4();
let task2 = TaskId::new_v4();
let task3 = TaskId::new_v4();
let task4 = TaskId::new_v4();
dag.add_node(task1, "load_data");
dag.add_node(task2, "transform_data");
dag.add_node(task3, "save_results");
dag.add_node(task4, "validate_data");
dag.add_dependency(task2, task1).unwrap();
dag.add_dependency(task4, task1).unwrap();
dag.add_dependency(task3, task2).unwrap();
dag.add_dependency(task3, task4).unwrap();
assert!(dag.validate().is_ok());
let order = dag.topological_sort().unwrap();
assert_eq!(order.len(), 4);
assert_eq!(order[0], task1);
assert_eq!(order[3], task3);
}
#[test]
fn test_task_state_history_full_lifecycle() {
let mut history = StateHistory::with_initial(TaskState::Pending);
history.transition(TaskState::Received);
history.transition(TaskState::Reserved);
history.transition(TaskState::Running);
history.transition(TaskState::Failed("Temporary error".to_string()));
history.transition(TaskState::Retrying(1));
history.transition(TaskState::Running);
history.transition(TaskState::Succeeded(vec![1, 2, 3]));
assert_eq!(history.transition_count(), 7);
assert!(history.current_state().unwrap().is_terminal());
assert!(history.has_been_in_state("RECEIVED"));
assert!(history.has_been_in_state("RESERVED"));
assert!(history.has_been_in_state("RUNNING"));
assert!(history.has_been_in_state("FAILURE"));
assert!(history.has_been_in_state("RETRYING"));
assert!(history.has_been_in_state("SUCCESS"));
assert_eq!(history.current_state().unwrap().name(), "SUCCESS");
}
}
}