1use crate::state::TaskState;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::collections::HashSet;
5use std::fmt;
6use uuid::Uuid;
7
8pub type TaskId = Uuid;
10
11pub mod batch {
13 use super::{SerializedTask, TaskState, Uuid};
14
15 #[must_use]
30 pub fn validate_all(tasks: &[SerializedTask]) -> Vec<(usize, String)> {
31 tasks
32 .iter()
33 .enumerate()
34 .filter_map(|(idx, task)| task.validate().err().map(|e| (idx, e)))
35 .collect()
36 }
37
38 #[must_use]
54 pub fn filter_by_state<F>(tasks: &[SerializedTask], predicate: F) -> Vec<&SerializedTask>
55 where
56 F: Fn(&TaskState) -> bool,
57 {
58 tasks
59 .iter()
60 .filter(|task| predicate(&task.metadata.state))
61 .collect()
62 }
63
64 #[must_use]
80 pub fn filter_high_priority(tasks: &[SerializedTask]) -> Vec<&SerializedTask> {
81 tasks
82 .iter()
83 .filter(|task| task.metadata.is_high_priority())
84 .collect()
85 }
86
87 pub fn sort_by_priority(tasks: &mut [SerializedTask]) {
105 tasks.sort_by(|a, b| b.metadata.priority.cmp(&a.metadata.priority));
106 }
107
108 #[must_use]
125 pub fn count_by_state(tasks: &[SerializedTask]) -> std::collections::HashMap<String, usize> {
126 let mut counts = std::collections::HashMap::new();
127 for task in tasks {
128 *counts
129 .entry(task.metadata.state.name().to_string())
130 .or_insert(0) += 1;
131 }
132 counts
133 }
134
135 #[inline]
150 #[must_use]
151 pub fn has_expired_tasks(tasks: &[SerializedTask]) -> bool {
152 tasks.iter().any(super::SerializedTask::is_expired)
153 }
154
155 #[inline]
171 #[must_use]
172 pub fn get_expired_tasks(tasks: &[SerializedTask]) -> Vec<&SerializedTask> {
173 tasks.iter().filter(|task| task.is_expired()).collect()
174 }
175
176 #[must_use]
191 pub fn total_payload_size(tasks: &[SerializedTask]) -> usize {
192 tasks.iter().map(super::SerializedTask::payload_size).sum()
193 }
194
195 #[must_use]
212 pub fn filter_with_dependencies(tasks: &[SerializedTask]) -> Vec<&SerializedTask> {
213 tasks
214 .iter()
215 .filter(|task| task.metadata.has_dependencies())
216 .collect()
217 }
218
219 #[must_use]
236 pub fn filter_retryable(tasks: &[SerializedTask]) -> Vec<&SerializedTask> {
237 tasks.iter().filter(|task| task.can_retry()).collect()
238 }
239
240 #[must_use]
256 pub fn filter_by_name_pattern<'a>(
257 tasks: &'a [SerializedTask],
258 pattern: &str,
259 ) -> Vec<&'a SerializedTask> {
260 tasks
261 .iter()
262 .filter(|task| task.metadata.name.contains(pattern))
263 .collect()
264 }
265
266 #[must_use]
287 pub fn group_by_workflow_id(
288 tasks: &[SerializedTask],
289 ) -> std::collections::HashMap<Uuid, Vec<&SerializedTask>> {
290 let mut groups = std::collections::HashMap::new();
291 for task in tasks {
292 if let Some(group_id) = task.metadata.group_id {
293 groups.entry(group_id).or_insert_with(Vec::new).push(task);
294 }
295 }
296 groups
297 }
298
299 #[must_use]
315 pub fn filter_terminal(tasks: &[SerializedTask]) -> Vec<&SerializedTask> {
316 tasks.iter().filter(|task| task.is_terminal()).collect()
317 }
318
319 #[must_use]
335 pub fn filter_active(tasks: &[SerializedTask]) -> Vec<&SerializedTask> {
336 tasks.iter().filter(|task| task.is_active()).collect()
337 }
338
339 #[must_use]
354 pub fn average_payload_size(tasks: &[SerializedTask]) -> usize {
355 if tasks.is_empty() {
356 0
357 } else {
358 total_payload_size(tasks) / tasks.len()
359 }
360 }
361
362 #[must_use]
377 pub fn find_oldest(tasks: &[SerializedTask]) -> Option<&SerializedTask> {
378 tasks.iter().min_by_key(|task| task.metadata.created_at)
379 }
380
381 #[must_use]
396 pub fn find_newest(tasks: &[SerializedTask]) -> Option<&SerializedTask> {
397 tasks.iter().max_by_key(|task| task.metadata.created_at)
398 }
399}
400
401#[derive(Debug, Clone, Serialize, Deserialize)]
403pub struct TaskMetadata {
404 pub id: TaskId,
406
407 pub name: String,
409
410 pub state: TaskState,
412
413 pub created_at: DateTime<Utc>,
415
416 pub updated_at: DateTime<Utc>,
418
419 pub max_retries: u32,
421
422 pub timeout_secs: Option<u64>,
424
425 pub priority: i32,
427
428 #[serde(skip_serializing_if = "Option::is_none")]
430 pub group_id: Option<Uuid>,
431
432 #[serde(skip_serializing_if = "Option::is_none")]
434 pub chord_id: Option<Uuid>,
435
436 #[serde(skip_serializing_if = "HashSet::is_empty", default)]
438 pub dependencies: HashSet<TaskId>,
439}
440
441impl TaskMetadata {
442 #[inline]
443 #[must_use]
444 pub fn new(name: String) -> Self {
445 let now = Utc::now();
446 Self {
447 id: Uuid::new_v4(),
448 name,
449 state: TaskState::Pending,
450 created_at: now,
451 updated_at: now,
452 max_retries: 3,
453 timeout_secs: None,
454 priority: 0,
455 group_id: None,
456 chord_id: None,
457 dependencies: HashSet::new(),
458 }
459 }
460
461 #[inline]
462 #[must_use]
463 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
464 self.max_retries = max_retries;
465 self
466 }
467
468 #[inline]
469 #[must_use]
470 pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
471 self.timeout_secs = Some(timeout_secs);
472 self
473 }
474
475 #[inline]
476 #[must_use]
477 pub fn with_priority(mut self, priority: i32) -> Self {
478 self.priority = priority;
479 self
480 }
481
482 #[inline]
484 #[must_use]
485 pub fn with_group_id(mut self, group_id: Uuid) -> Self {
486 self.group_id = Some(group_id);
487 self
488 }
489
490 #[inline]
492 #[must_use]
493 pub fn with_chord_id(mut self, chord_id: Uuid) -> Self {
494 self.chord_id = Some(chord_id);
495 self
496 }
497
498 #[inline]
500 #[must_use]
501 pub fn age(&self) -> chrono::Duration {
502 Utc::now() - self.created_at
503 }
504
505 #[inline]
507 #[must_use]
508 #[allow(clippy::cast_possible_wrap)]
509 pub fn is_expired(&self) -> bool {
510 if let Some(timeout_secs) = self.timeout_secs {
511 let elapsed = (Utc::now() - self.created_at).num_seconds();
512 elapsed > timeout_secs as i64
513 } else {
514 false
515 }
516 }
517
518 #[inline]
520 #[must_use]
521 pub fn is_terminal(&self) -> bool {
522 self.state.is_terminal()
523 }
524
525 #[inline]
527 #[must_use]
528 pub fn is_active(&self) -> bool {
529 matches!(
530 self.state,
531 TaskState::Pending | TaskState::Reserved | TaskState::Running | TaskState::Retrying(_)
532 )
533 }
534
535 pub fn validate(&self) -> Result<(), String> {
547 if self.name.is_empty() {
548 return Err("Task name cannot be empty".to_string());
549 }
550
551 if self.max_retries > 1000 {
552 return Err("Max retries cannot exceed 1000".to_string());
553 }
554
555 if let Some(timeout) = self.timeout_secs {
556 if timeout == 0 {
557 return Err("Timeout must be at least 1 second".to_string());
558 }
559 if timeout > 86400 {
560 return Err("Timeout cannot exceed 24 hours (86400 seconds)".to_string());
561 }
562 }
563
564 Ok(())
565 }
566
567 #[inline]
569 #[must_use]
570 pub fn has_timeout(&self) -> bool {
571 self.timeout_secs.is_some()
572 }
573
574 #[inline]
576 #[must_use]
577 pub fn has_group_id(&self) -> bool {
578 self.group_id.is_some()
579 }
580
581 #[inline]
583 #[must_use]
584 pub fn has_chord_id(&self) -> bool {
585 self.chord_id.is_some()
586 }
587
588 #[inline]
590 #[must_use]
591 pub const fn has_priority(&self) -> bool {
592 self.priority != 0
593 }
594
595 #[inline]
597 #[must_use]
598 pub const fn is_high_priority(&self) -> bool {
599 self.priority > 0
600 }
601
602 #[inline]
604 #[must_use]
605 pub const fn is_low_priority(&self) -> bool {
606 self.priority < 0
607 }
608
609 #[inline]
611 #[must_use]
612 pub fn with_dependency(mut self, dependency: TaskId) -> Self {
613 self.dependencies.insert(dependency);
614 self
615 }
616
617 #[inline]
619 #[must_use]
620 pub fn with_dependencies(mut self, dependencies: impl IntoIterator<Item = TaskId>) -> Self {
621 self.dependencies.extend(dependencies);
622 self
623 }
624
625 #[inline]
627 #[must_use]
628 pub fn has_dependencies(&self) -> bool {
629 !self.dependencies.is_empty()
630 }
631
632 #[inline]
634 #[must_use]
635 pub fn dependency_count(&self) -> usize {
636 self.dependencies.len()
637 }
638
639 #[inline]
641 #[must_use]
642 pub fn depends_on(&self, task_id: &TaskId) -> bool {
643 self.dependencies.contains(task_id)
644 }
645
646 #[inline]
648 pub fn remove_dependency(&mut self, task_id: &TaskId) -> bool {
649 self.dependencies.remove(task_id)
650 }
651
652 #[inline]
654 pub fn clear_dependencies(&mut self) {
655 self.dependencies.clear();
656 }
657
658 #[inline]
662 #[must_use]
663 pub fn is_pending(&self) -> bool {
664 matches!(self.state, TaskState::Pending)
665 }
666
667 #[inline]
669 #[must_use]
670 pub fn is_running(&self) -> bool {
671 matches!(self.state, TaskState::Running)
672 }
673
674 #[inline]
676 #[must_use]
677 pub fn is_succeeded(&self) -> bool {
678 matches!(self.state, TaskState::Succeeded(_))
679 }
680
681 #[inline]
683 #[must_use]
684 pub fn is_failed(&self) -> bool {
685 matches!(self.state, TaskState::Failed(_))
686 }
687
688 #[inline]
690 #[must_use]
691 pub fn is_retrying(&self) -> bool {
692 matches!(self.state, TaskState::Retrying(_))
693 }
694
695 #[inline]
697 #[must_use]
698 pub fn is_reserved(&self) -> bool {
699 matches!(self.state, TaskState::Reserved)
700 }
701
702 #[inline]
716 #[must_use]
717 #[allow(clippy::cast_possible_wrap)]
718 pub fn time_remaining(&self) -> Option<chrono::Duration> {
719 self.timeout_secs.and_then(|timeout| {
720 let elapsed = Utc::now() - self.created_at;
721 let timeout_duration = chrono::Duration::seconds(timeout as i64);
722 let remaining = timeout_duration - elapsed;
723 if remaining.num_seconds() > 0 {
724 Some(remaining)
725 } else {
726 None
727 }
728 })
729 }
730
731 #[inline]
742 #[must_use]
743 pub fn time_elapsed(&self) -> chrono::Duration {
744 Utc::now() - self.created_at
745 }
746
747 #[inline]
763 #[must_use]
764 pub fn can_retry(&self) -> bool {
765 self.state.can_retry(self.max_retries)
766 }
767
768 #[inline]
779 #[must_use]
780 pub const fn retry_count(&self) -> u32 {
781 self.state.retry_count()
782 }
783
784 #[inline]
795 #[must_use]
796 pub const fn retries_remaining(&self) -> u32 {
797 let current = self.retry_count();
798 self.max_retries.saturating_sub(current)
799 }
800
801 #[inline]
815 #[must_use]
816 pub fn is_part_of_workflow(&self) -> bool {
817 self.group_id.is_some() || self.chord_id.is_some()
818 }
819
820 #[inline]
822 #[must_use]
823 pub fn get_group_id(&self) -> Option<&Uuid> {
824 self.group_id.as_ref()
825 }
826
827 #[inline]
829 #[must_use]
830 pub fn get_chord_id(&self) -> Option<&Uuid> {
831 self.chord_id.as_ref()
832 }
833
834 #[inline]
847 pub fn mark_as_running(&mut self) {
848 self.state = TaskState::Running;
849 self.updated_at = Utc::now();
850 }
851
852 #[inline]
863 pub fn mark_as_succeeded(&mut self, result: Vec<u8>) {
864 self.state = TaskState::Succeeded(result);
865 self.updated_at = Utc::now();
866 }
867
868 #[inline]
879 pub fn mark_as_failed(&mut self, error: impl Into<String>) {
880 self.state = TaskState::Failed(error.into());
881 self.updated_at = Utc::now();
882 }
883
884 #[inline]
897 #[must_use]
898 pub fn with_new_id(&self) -> Self {
899 let now = Utc::now();
900 Self {
901 id: Uuid::new_v4(),
902 name: self.name.clone(),
903 state: TaskState::Pending,
904 created_at: now,
905 updated_at: now,
906 max_retries: self.max_retries,
907 timeout_secs: self.timeout_secs,
908 priority: self.priority,
909 group_id: self.group_id,
910 chord_id: self.chord_id,
911 dependencies: self.dependencies.clone(),
912 }
913 }
914}
915
916impl fmt::Display for TaskMetadata {
917 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
918 write!(
919 f,
920 "Task[{}] name={} state={} priority={} retries={}/{}",
921 &self.id.to_string()[..8],
922 self.name,
923 self.state,
924 self.priority,
925 self.state.retry_count(),
926 self.max_retries
927 )?;
928
929 if let Some(timeout) = self.timeout_secs {
930 write!(f, " timeout={timeout}s")?;
931 }
932
933 if let Some(chord_id) = self.chord_id {
934 write!(f, " chord={}", &chord_id.to_string()[..8])?;
935 }
936
937 Ok(())
938 }
939}
940
941#[async_trait::async_trait]
943pub trait Task: Send + Sync {
944 type Input: Serialize + for<'de> Deserialize<'de> + Send;
946
947 type Output: Serialize + for<'de> Deserialize<'de> + Send;
949
950 async fn execute(&self, input: Self::Input) -> crate::Result<Self::Output>;
952
953 fn name(&self) -> &str;
955}
956
957#[derive(Debug, Clone, Serialize, Deserialize)]
996pub struct SerializedTask {
997 pub metadata: TaskMetadata,
999
1000 pub payload: Vec<u8>,
1002}
1003
1004impl SerializedTask {
1005 #[inline]
1006 #[must_use]
1007 pub fn new(name: String, payload: Vec<u8>) -> Self {
1008 Self {
1009 metadata: TaskMetadata::new(name),
1010 payload,
1011 }
1012 }
1013
1014 #[inline]
1015 #[must_use]
1016 pub fn with_priority(mut self, priority: i32) -> Self {
1017 self.metadata.priority = priority;
1018 self
1019 }
1020
1021 #[inline]
1022 #[must_use]
1023 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
1024 self.metadata.max_retries = max_retries;
1025 self
1026 }
1027
1028 #[inline]
1029 #[must_use]
1030 pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
1031 self.metadata.timeout_secs = Some(timeout_secs);
1032 self
1033 }
1034
1035 #[inline]
1037 #[must_use]
1038 pub fn with_group_id(mut self, group_id: Uuid) -> Self {
1039 self.metadata.group_id = Some(group_id);
1040 self
1041 }
1042
1043 #[inline]
1045 #[must_use]
1046 pub fn with_chord_id(mut self, chord_id: Uuid) -> Self {
1047 self.metadata.chord_id = Some(chord_id);
1048 self
1049 }
1050
1051 #[inline]
1053 #[must_use]
1054 pub fn age(&self) -> chrono::Duration {
1055 self.metadata.age()
1056 }
1057
1058 #[inline]
1060 #[must_use]
1061 pub fn is_expired(&self) -> bool {
1062 self.metadata.is_expired()
1063 }
1064
1065 #[inline]
1067 #[must_use]
1068 pub fn is_terminal(&self) -> bool {
1069 self.metadata.is_terminal()
1070 }
1071
1072 #[inline]
1074 #[must_use]
1075 pub fn is_active(&self) -> bool {
1076 self.metadata.is_active()
1077 }
1078
1079 pub fn validate(&self) -> Result<(), String> {
1089 self.metadata.validate()?;
1090
1091 if self.payload.is_empty() {
1092 return Err("Task payload cannot be empty".to_string());
1093 }
1094
1095 if self.payload.len() > 1_048_576 {
1096 return Err(format!(
1097 "Task payload too large: {} bytes (max 1MB)",
1098 self.payload.len()
1099 ));
1100 }
1101
1102 Ok(())
1103 }
1104
1105 pub fn validate_with_limit(&self, max_payload_bytes: usize) -> Result<(), String> {
1111 self.metadata.validate()?;
1112
1113 if self.payload.is_empty() {
1114 return Err("Task payload cannot be empty".to_string());
1115 }
1116
1117 if self.payload.len() > max_payload_bytes {
1118 return Err(format!(
1119 "Task payload too large: {} bytes (max {} bytes)",
1120 self.payload.len(),
1121 max_payload_bytes
1122 ));
1123 }
1124
1125 Ok(())
1126 }
1127
1128 #[inline]
1130 #[must_use]
1131 pub fn has_timeout(&self) -> bool {
1132 self.metadata.has_timeout()
1133 }
1134
1135 #[inline]
1137 #[must_use]
1138 pub fn has_group_id(&self) -> bool {
1139 self.metadata.has_group_id()
1140 }
1141
1142 #[inline]
1144 #[must_use]
1145 pub fn has_chord_id(&self) -> bool {
1146 self.metadata.has_chord_id()
1147 }
1148
1149 #[inline]
1151 #[must_use]
1152 pub fn has_priority(&self) -> bool {
1153 self.metadata.has_priority()
1154 }
1155
1156 #[inline]
1158 #[must_use]
1159 pub const fn payload_size(&self) -> usize {
1160 self.payload.len()
1161 }
1162
1163 #[inline]
1165 #[must_use]
1166 pub fn has_empty_payload(&self) -> bool {
1167 self.payload.is_empty()
1168 }
1169
1170 #[inline]
1172 #[must_use]
1173 pub fn with_dependency(mut self, dependency: TaskId) -> Self {
1174 self.metadata.dependencies.insert(dependency);
1175 self
1176 }
1177
1178 #[inline]
1180 #[must_use]
1181 pub fn with_dependencies(mut self, dependencies: impl IntoIterator<Item = TaskId>) -> Self {
1182 self.metadata.dependencies.extend(dependencies);
1183 self
1184 }
1185
1186 #[inline]
1188 #[must_use]
1189 pub fn has_dependencies(&self) -> bool {
1190 self.metadata.has_dependencies()
1191 }
1192
1193 #[inline]
1195 #[must_use]
1196 pub fn dependency_count(&self) -> usize {
1197 self.metadata.dependency_count()
1198 }
1199
1200 #[inline]
1202 #[must_use]
1203 pub fn depends_on(&self, task_id: &TaskId) -> bool {
1204 self.metadata.depends_on(task_id)
1205 }
1206
1207 #[inline]
1209 #[must_use]
1210 pub fn is_high_priority(&self) -> bool {
1211 self.metadata.is_high_priority()
1212 }
1213
1214 #[inline]
1216 #[must_use]
1217 pub fn is_low_priority(&self) -> bool {
1218 self.metadata.is_low_priority()
1219 }
1220
1221 #[inline]
1225 #[must_use]
1226 pub fn is_pending(&self) -> bool {
1227 self.metadata.is_pending()
1228 }
1229
1230 #[inline]
1232 #[must_use]
1233 pub fn is_running(&self) -> bool {
1234 self.metadata.is_running()
1235 }
1236
1237 #[inline]
1239 #[must_use]
1240 pub fn is_succeeded(&self) -> bool {
1241 self.metadata.is_succeeded()
1242 }
1243
1244 #[inline]
1246 #[must_use]
1247 pub fn is_failed(&self) -> bool {
1248 self.metadata.is_failed()
1249 }
1250
1251 #[inline]
1253 #[must_use]
1254 pub fn is_retrying(&self) -> bool {
1255 self.metadata.is_retrying()
1256 }
1257
1258 #[inline]
1260 #[must_use]
1261 pub fn is_reserved(&self) -> bool {
1262 self.metadata.is_reserved()
1263 }
1264
1265 #[inline]
1269 #[must_use]
1270 pub fn time_remaining(&self) -> Option<chrono::Duration> {
1271 self.metadata.time_remaining()
1272 }
1273
1274 #[inline]
1276 #[must_use]
1277 pub fn time_elapsed(&self) -> chrono::Duration {
1278 self.metadata.time_elapsed()
1279 }
1280
1281 #[inline]
1285 #[must_use]
1286 pub fn can_retry(&self) -> bool {
1287 self.metadata.can_retry()
1288 }
1289
1290 #[inline]
1292 #[must_use]
1293 pub const fn retry_count(&self) -> u32 {
1294 self.metadata.retry_count()
1295 }
1296
1297 #[inline]
1299 #[must_use]
1300 pub const fn retries_remaining(&self) -> u32 {
1301 self.metadata.retries_remaining()
1302 }
1303
1304 #[inline]
1308 #[must_use]
1309 pub fn is_part_of_workflow(&self) -> bool {
1310 self.metadata.is_part_of_workflow()
1311 }
1312
1313 #[inline]
1315 #[must_use]
1316 pub fn get_group_id(&self) -> Option<&Uuid> {
1317 self.metadata.get_group_id()
1318 }
1319
1320 #[inline]
1322 #[must_use]
1323 pub fn get_chord_id(&self) -> Option<&Uuid> {
1324 self.metadata.get_chord_id()
1325 }
1326}
1327
1328impl fmt::Display for SerializedTask {
1329 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1330 write!(
1331 f,
1332 "SerializedTask[{}] name={} payload={}B state={}",
1333 &self.metadata.id.to_string()[..8],
1334 self.metadata.name,
1335 self.payload.len(),
1336 self.metadata.state
1337 )?;
1338 if self.metadata.has_priority() {
1339 write!(f, " priority={}", self.metadata.priority)?;
1340 }
1341 if let Some(group_id) = self.metadata.group_id {
1342 write!(f, " group={}", &group_id.to_string()[..8])?;
1343 }
1344 if let Some(chord_id) = self.metadata.chord_id {
1345 write!(f, " chord={}", &chord_id.to_string()[..8])?;
1346 }
1347 Ok(())
1348 }
1349}
1350
1351#[cfg(test)]
1352mod tests {
1353 use super::*;
1354
1355 #[test]
1356 fn test_task_metadata_creation() {
1357 let metadata = TaskMetadata::new("test_task".to_string())
1358 .with_max_retries(5)
1359 .with_timeout(60)
1360 .with_priority(10);
1361
1362 assert_eq!(metadata.name, "test_task");
1363 assert_eq!(metadata.max_retries, 5);
1364 assert_eq!(metadata.timeout_secs, Some(60));
1365 assert_eq!(metadata.priority, 10);
1366 assert_eq!(metadata.state, TaskState::Pending);
1367 }
1368
1369 #[test]
1370 fn test_task_dependencies() {
1371 let dep1 = TaskId::new_v4();
1372 let dep2 = TaskId::new_v4();
1373
1374 let metadata = TaskMetadata::new("test_task".to_string())
1375 .with_dependency(dep1)
1376 .with_dependency(dep2);
1377
1378 assert!(metadata.has_dependencies());
1379 assert_eq!(metadata.dependency_count(), 2);
1380 assert!(metadata.depends_on(&dep1));
1381 assert!(metadata.depends_on(&dep2));
1382 }
1383
1384 #[test]
1385 fn test_task_with_dependencies() {
1386 let dep1 = TaskId::new_v4();
1387 let dep2 = TaskId::new_v4();
1388 let deps = vec![dep1, dep2];
1389
1390 let metadata = TaskMetadata::new("test_task".to_string()).with_dependencies(deps);
1391
1392 assert_eq!(metadata.dependency_count(), 2);
1393 assert!(metadata.depends_on(&dep1));
1394 assert!(metadata.depends_on(&dep2));
1395 }
1396
1397 #[test]
1398 fn test_remove_dependency() {
1399 let dep1 = TaskId::new_v4();
1400 let dep2 = TaskId::new_v4();
1401
1402 let mut metadata = TaskMetadata::new("test_task".to_string())
1403 .with_dependency(dep1)
1404 .with_dependency(dep2);
1405
1406 assert_eq!(metadata.dependency_count(), 2);
1407
1408 let removed = metadata.remove_dependency(&dep1);
1409 assert!(removed);
1410 assert_eq!(metadata.dependency_count(), 1);
1411 assert!(!metadata.depends_on(&dep1));
1412 assert!(metadata.depends_on(&dep2));
1413 }
1414
1415 #[test]
1416 fn test_clear_dependencies() {
1417 let dep1 = TaskId::new_v4();
1418 let dep2 = TaskId::new_v4();
1419
1420 let mut metadata = TaskMetadata::new("test_task".to_string())
1421 .with_dependency(dep1)
1422 .with_dependency(dep2);
1423
1424 assert!(metadata.has_dependencies());
1425 metadata.clear_dependencies();
1426 assert!(!metadata.has_dependencies());
1427 assert_eq!(metadata.dependency_count(), 0);
1428 }
1429
1430 #[test]
1431 fn test_serialized_task_dependencies() {
1432 let dep1 = TaskId::new_v4();
1433 let dep2 = TaskId::new_v4();
1434
1435 let task = SerializedTask::new("test_task".to_string(), vec![1, 2, 3])
1436 .with_dependency(dep1)
1437 .with_dependency(dep2);
1438
1439 assert!(task.has_dependencies());
1440 assert_eq!(task.dependency_count(), 2);
1441 assert!(task.depends_on(&dep1));
1442 assert!(task.depends_on(&dep2));
1443 }
1444
1445 #[cfg(test)]
1447 mod integration_tests {
1448 use super::*;
1449 use crate::{StateHistory, TaskState};
1450
1451 #[test]
1452 fn test_complete_task_lifecycle() {
1453 let mut task = SerializedTask::new("process_data".to_string(), vec![1, 2, 3, 4, 5])
1455 .with_priority(5)
1456 .with_max_retries(3)
1457 .with_timeout(60);
1458
1459 assert_eq!(task.metadata.state, TaskState::Pending);
1460 assert!(task.is_active());
1461 assert!(!task.is_terminal());
1462
1463 let mut history = StateHistory::with_initial(task.metadata.state.clone());
1465
1466 task.metadata.state = TaskState::Received;
1468 history.transition(task.metadata.state.clone());
1469
1470 task.metadata.state = TaskState::Reserved;
1472 history.transition(task.metadata.state.clone());
1473
1474 task.metadata.state = TaskState::Running;
1476 history.transition(task.metadata.state.clone());
1477
1478 task.metadata.state = TaskState::Succeeded(vec![10, 20, 30]);
1480 history.transition(task.metadata.state.clone());
1481
1482 assert!(task.is_terminal());
1483 assert!(!task.is_active());
1484 assert_eq!(history.transition_count(), 4);
1485 }
1486
1487 #[test]
1488 fn test_task_retry_lifecycle() {
1489 let mut task =
1490 SerializedTask::new("failing_task".to_string(), vec![1, 2, 3]).with_max_retries(3);
1491
1492 let mut history = StateHistory::with_initial(TaskState::Pending);
1493
1494 task.metadata.state = TaskState::Running;
1496 history.transition(task.metadata.state.clone());
1497
1498 task.metadata.state = TaskState::Failed("Network error".to_string());
1499 history.transition(task.metadata.state.clone());
1500
1501 assert!(task.metadata.state.can_retry(task.metadata.max_retries));
1503
1504 task.metadata.state = TaskState::Retrying(1);
1506 history.transition(task.metadata.state.clone());
1507 assert_eq!(task.metadata.state.retry_count(), 1);
1508
1509 task.metadata.state = TaskState::Failed("Still failing".to_string());
1510 history.transition(task.metadata.state.clone());
1511
1512 task.metadata.state = TaskState::Retrying(2);
1514 history.transition(task.metadata.state.clone());
1515 assert_eq!(task.metadata.state.retry_count(), 2);
1516
1517 task.metadata.state = TaskState::Succeeded(vec![]);
1519 history.transition(task.metadata.state.clone());
1520
1521 assert!(task.is_terminal());
1522 assert_eq!(history.transition_count(), 6);
1523 }
1524
1525 #[test]
1526 fn test_task_with_dependencies_lifecycle() {
1527 let parent1_id = TaskId::new_v4();
1529 let parent2_id = TaskId::new_v4();
1530
1531 let child_task = SerializedTask::new("child_task".to_string(), vec![1, 2, 3])
1533 .with_dependency(parent1_id)
1534 .with_dependency(parent2_id)
1535 .with_priority(10);
1536
1537 assert!(child_task.has_dependencies());
1538 assert_eq!(child_task.dependency_count(), 2);
1539 assert!(child_task.depends_on(&parent1_id));
1540 assert!(child_task.depends_on(&parent2_id));
1541
1542 assert_eq!(child_task.metadata.priority, 10);
1544 assert!(child_task.is_high_priority());
1545 }
1546
1547 #[test]
1548 fn test_task_serialization_roundtrip() {
1549 let original = SerializedTask::new("test_task".to_string(), vec![1, 2, 3, 4, 5])
1550 .with_priority(5)
1551 .with_max_retries(3)
1552 .with_timeout(120)
1553 .with_dependency(TaskId::new_v4());
1554
1555 let json = serde_json::to_string(&original).expect("Failed to serialize");
1557
1558 let deserialized: SerializedTask =
1560 serde_json::from_str(&json).expect("Failed to deserialize");
1561
1562 assert_eq!(deserialized.metadata.name, original.metadata.name);
1563 assert_eq!(deserialized.metadata.priority, original.metadata.priority);
1564 assert_eq!(
1565 deserialized.metadata.max_retries,
1566 original.metadata.max_retries
1567 );
1568 assert_eq!(
1569 deserialized.metadata.timeout_secs,
1570 original.metadata.timeout_secs
1571 );
1572 assert_eq!(deserialized.payload, original.payload);
1573 assert_eq!(deserialized.dependency_count(), original.dependency_count());
1574 }
1575
1576 #[test]
1577 fn test_task_validation_lifecycle() {
1578 let valid_task = SerializedTask::new("valid_task".to_string(), vec![1, 2, 3])
1580 .with_max_retries(5)
1581 .with_timeout(30);
1582
1583 assert!(valid_task.validate().is_ok());
1584
1585 let mut invalid_task = SerializedTask::new(String::new(), vec![1, 2, 3]);
1587 assert!(invalid_task.metadata.validate().is_err());
1588
1589 invalid_task =
1591 SerializedTask::new("task".to_string(), vec![1, 2, 3]).with_max_retries(10000);
1592 assert!(invalid_task.metadata.validate().is_err());
1593
1594 let mut invalid_metadata = TaskMetadata::new("task".to_string());
1596 invalid_metadata.timeout_secs = Some(0);
1597 assert!(invalid_metadata.validate().is_err());
1598 }
1599
1600 #[test]
1601 fn test_task_expiration_lifecycle() {
1602 let task =
1604 SerializedTask::new("expiring_task".to_string(), vec![1, 2, 3]).with_timeout(1);
1605
1606 assert!(!task.is_expired());
1608
1609 std::thread::sleep(std::time::Duration::from_secs(2));
1611
1612 assert!(task.is_expired());
1614 }
1615
1616 #[test]
1617 fn test_workflow_with_multiple_dependencies() {
1618 use crate::TaskDag;
1619
1620 let mut dag = TaskDag::new();
1623
1624 let task1 = TaskId::new_v4();
1625 let task2 = TaskId::new_v4();
1626 let task3 = TaskId::new_v4();
1627 let task4 = TaskId::new_v4();
1628
1629 dag.add_node(task1, "load_data");
1630 dag.add_node(task2, "transform_data");
1631 dag.add_node(task3, "save_results");
1632 dag.add_node(task4, "validate_data");
1633
1634 dag.add_dependency(task2, task1).unwrap();
1636 dag.add_dependency(task4, task1).unwrap();
1638 dag.add_dependency(task3, task2).unwrap();
1640 dag.add_dependency(task3, task4).unwrap();
1641
1642 assert!(dag.validate().is_ok());
1644
1645 let order = dag.topological_sort().unwrap();
1647 assert_eq!(order.len(), 4);
1648
1649 assert_eq!(order[0], task1);
1651 assert_eq!(order[3], task3);
1653 }
1654
1655 #[test]
1656 fn test_task_state_history_full_lifecycle() {
1657 let mut history = StateHistory::with_initial(TaskState::Pending);
1658
1659 history.transition(TaskState::Received);
1661 history.transition(TaskState::Reserved);
1662 history.transition(TaskState::Running);
1663 history.transition(TaskState::Failed("Temporary error".to_string()));
1664 history.transition(TaskState::Retrying(1));
1665 history.transition(TaskState::Running);
1666 history.transition(TaskState::Succeeded(vec![1, 2, 3]));
1667
1668 assert_eq!(history.transition_count(), 7);
1669 assert!(history.current_state().unwrap().is_terminal());
1670
1671 assert!(history.has_been_in_state("RECEIVED"));
1673 assert!(history.has_been_in_state("RESERVED"));
1674 assert!(history.has_been_in_state("RUNNING"));
1675 assert!(history.has_been_in_state("FAILURE"));
1676 assert!(history.has_been_in_state("RETRYING"));
1677 assert!(history.has_been_in_state("SUCCESS"));
1678
1679 assert_eq!(history.current_state().unwrap().name(), "SUCCESS");
1681 }
1682 }
1683}