1use std::fmt;
5use std::path::PathBuf;
6use std::str::FromStr;
7
8use serde::{Deserialize, Serialize};
9use uuid::Uuid;
10use zeph_memory::store::graph_store::{GraphSummary, RawGraphStore};
11
12use super::error::OrchestrationError;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
30pub struct TaskId(pub u32);
31
32impl TaskId {
33 #[must_use]
35 pub fn index(self) -> usize {
36 self.0 as usize
37 }
38
39 #[must_use]
41 pub fn as_u32(self) -> u32 {
42 self.0
43 }
44}
45
46impl fmt::Display for TaskId {
47 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48 write!(f, "{}", self.0)
49 }
50}
51
52#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
70pub struct GraphId(Uuid);
71
72impl GraphId {
73 #[must_use]
75 pub fn new() -> Self {
76 Self(Uuid::new_v4())
77 }
78}
79
80impl Default for GraphId {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl fmt::Display for GraphId {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 write!(f, "{}", self.0)
89 }
90}
91
92impl FromStr for GraphId {
93 type Err = OrchestrationError;
94
95 fn from_str(s: &str) -> Result<Self, Self::Err> {
96 Uuid::parse_str(s)
97 .map(GraphId)
98 .map_err(|e| OrchestrationError::InvalidGraph(format!("invalid graph id '{s}': {e}")))
99 }
100}
101
102#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
126#[serde(rename_all = "snake_case")]
127pub enum TaskStatus {
128 Pending,
130 Ready,
132 Running,
134 Completed,
136 Failed,
138 Skipped,
140 Canceled,
142}
143
144impl TaskStatus {
145 #[must_use]
147 pub fn is_terminal(self) -> bool {
148 matches!(
149 self,
150 TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Skipped | TaskStatus::Canceled
151 )
152 }
153}
154
155impl fmt::Display for TaskStatus {
156 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157 match self {
158 TaskStatus::Pending => write!(f, "pending"),
159 TaskStatus::Ready => write!(f, "ready"),
160 TaskStatus::Running => write!(f, "running"),
161 TaskStatus::Completed => write!(f, "completed"),
162 TaskStatus::Failed => write!(f, "failed"),
163 TaskStatus::Skipped => write!(f, "skipped"),
164 TaskStatus::Canceled => write!(f, "canceled"),
165 }
166 }
167}
168
169#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
180#[serde(rename_all = "snake_case")]
181pub enum GraphStatus {
182 Created,
184 Running,
186 Completed,
188 Failed,
190 Canceled,
192 Paused,
194}
195
196impl fmt::Display for GraphStatus {
197 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
198 match self {
199 GraphStatus::Created => write!(f, "created"),
200 GraphStatus::Running => write!(f, "running"),
201 GraphStatus::Completed => write!(f, "completed"),
202 GraphStatus::Failed => write!(f, "failed"),
203 GraphStatus::Canceled => write!(f, "canceled"),
204 GraphStatus::Paused => write!(f, "paused"),
205 }
206 }
207}
208
209#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
225#[serde(rename_all = "snake_case")]
226pub enum FailureStrategy {
227 #[default]
229 Abort,
230 Retry,
232 Skip,
234 Ask,
236}
237
238impl fmt::Display for FailureStrategy {
239 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
240 match self {
241 FailureStrategy::Abort => write!(f, "abort"),
242 FailureStrategy::Retry => write!(f, "retry"),
243 FailureStrategy::Skip => write!(f, "skip"),
244 FailureStrategy::Ask => write!(f, "ask"),
245 }
246 }
247}
248
249impl FromStr for FailureStrategy {
250 type Err = OrchestrationError;
251
252 fn from_str(s: &str) -> Result<Self, Self::Err> {
253 match s {
254 "abort" => Ok(FailureStrategy::Abort),
255 "retry" => Ok(FailureStrategy::Retry),
256 "skip" => Ok(FailureStrategy::Skip),
257 "ask" => Ok(FailureStrategy::Ask),
258 other => Err(OrchestrationError::InvalidGraph(format!(
259 "unknown failure strategy '{other}': expected one of abort, retry, skip, ask"
260 ))),
261 }
262 }
263}
264
265#[derive(Debug, Clone, Serialize, Deserialize)]
272pub struct TaskResult {
273 pub output: String,
275 pub artifacts: Vec<PathBuf>,
277 pub duration_ms: u64,
279 pub agent_id: Option<String>,
281 pub agent_def: Option<String>,
283}
284
285#[derive(
303 Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize, schemars::JsonSchema,
304)]
305#[serde(rename_all = "snake_case")]
306pub enum ExecutionMode {
307 #[default]
309 Parallel,
310 Sequential,
313}
314
315#[derive(Debug, Clone, Serialize, Deserialize)]
333pub struct TaskNode {
334 pub id: TaskId,
336 pub title: String,
338 pub description: String,
340 pub agent_hint: Option<String>,
342 pub status: TaskStatus,
344 pub depends_on: Vec<TaskId>,
346 pub result: Option<TaskResult>,
348 pub assigned_agent: Option<String>,
350 pub retry_count: u32,
352 pub failure_strategy: Option<FailureStrategy>,
354 pub max_retries: Option<u32>,
356 #[serde(default)]
359 pub execution_mode: ExecutionMode,
360}
361
362impl TaskNode {
363 #[must_use]
365 pub fn new(id: u32, title: impl Into<String>, description: impl Into<String>) -> Self {
366 Self {
367 id: TaskId(id),
368 title: title.into(),
369 description: description.into(),
370 agent_hint: None,
371 status: TaskStatus::Pending,
372 depends_on: Vec::new(),
373 result: None,
374 assigned_agent: None,
375 retry_count: 0,
376 failure_strategy: None,
377 max_retries: None,
378 execution_mode: ExecutionMode::default(),
379 }
380 }
381}
382
383#[derive(Debug, Clone, Serialize, Deserialize)]
403pub struct TaskGraph {
404 pub id: GraphId,
406 pub goal: String,
408 pub tasks: Vec<TaskNode>,
410 pub status: GraphStatus,
412 pub default_failure_strategy: FailureStrategy,
414 pub default_max_retries: u32,
416 pub created_at: String,
418 pub finished_at: Option<String>,
420}
421
422impl TaskGraph {
423 #[must_use]
425 pub fn new(goal: impl Into<String>) -> Self {
426 Self {
427 id: GraphId::new(),
428 goal: goal.into(),
429 tasks: Vec::new(),
430 status: GraphStatus::Created,
431 default_failure_strategy: FailureStrategy::default(),
432 default_max_retries: 3,
433 created_at: chrono_now(),
434 finished_at: None,
435 }
436 }
437}
438
439pub(crate) fn chrono_now() -> String {
440 let secs = std::time::SystemTime::now()
443 .duration_since(std::time::UNIX_EPOCH)
444 .map_or(0, |d| d.as_secs());
445 let (y, mo, d, h, mi, s) = epoch_secs_to_datetime(secs);
448 format!("{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}Z")
449}
450
451fn epoch_secs_to_datetime(secs: u64) -> (u64, u8, u8, u8, u8, u8) {
453 let s = (secs % 60) as u8;
454 let mins = secs / 60;
455 let mi = (mins % 60) as u8;
456 let hours = mins / 60;
457 let h = (hours % 24) as u8;
458 let days = hours / 24; let (mut year, mut remaining_days) = {
463 let cycles = days / 146_097;
464 let rem = days % 146_097;
465 (1970 + cycles * 400, rem)
466 };
467 let centuries = (remaining_days / 36_524).min(3);
469 year += centuries * 100;
470 remaining_days -= centuries * 36_524;
471 let quads = remaining_days / 1_461;
473 year += quads * 4;
474 remaining_days -= quads * 1_461;
475 let extra_years = (remaining_days / 365).min(3);
477 year += extra_years;
478 remaining_days -= extra_years * 365;
479
480 let is_leap = (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0);
481 let days_in_month: [u64; 12] = if is_leap {
482 [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
483 } else {
484 [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
485 };
486
487 let mut month = 0u8;
488 for (i, &dim) in days_in_month.iter().enumerate() {
489 if remaining_days < dim {
490 month = u8::try_from(i + 1).unwrap_or(1);
492 break;
493 }
494 remaining_days -= dim;
495 }
496 let day = u8::try_from(remaining_days + 1).unwrap_or(1);
498
499 (year, month, day, h, mi, s)
500}
501
502const MAX_GOAL_LEN: usize = 1024;
504
505pub struct GraphPersistence<S: RawGraphStore> {
518 store: S,
519}
520
521impl<S: RawGraphStore> GraphPersistence<S> {
522 pub fn new(store: S) -> Self {
524 Self { store }
525 }
526
527 pub async fn save(&self, graph: &TaskGraph) -> Result<(), OrchestrationError> {
536 if graph.goal.len() > MAX_GOAL_LEN {
537 return Err(OrchestrationError::InvalidGraph(format!(
538 "goal exceeds {MAX_GOAL_LEN} character limit ({} chars)",
539 graph.goal.len()
540 )));
541 }
542 let json = serde_json::to_string(graph)
543 .map_err(|e| OrchestrationError::Persistence(e.to_string()))?;
544 self.store
545 .save_graph(
546 &graph.id.to_string(),
547 &graph.goal,
548 &graph.status.to_string(),
549 &json,
550 &graph.created_at,
551 graph.finished_at.as_deref(),
552 )
553 .await
554 .map_err(|e| OrchestrationError::Persistence(e.to_string()))
555 }
556
557 pub async fn load(&self, id: &GraphId) -> Result<Option<TaskGraph>, OrchestrationError> {
565 match self
566 .store
567 .load_graph(&id.to_string())
568 .await
569 .map_err(|e| OrchestrationError::Persistence(e.to_string()))?
570 {
571 Some(json) => {
572 let graph = serde_json::from_str(&json)
573 .map_err(|e| OrchestrationError::Persistence(e.to_string()))?;
574 Ok(Some(graph))
575 }
576 None => Ok(None),
577 }
578 }
579
580 pub async fn list(&self, limit: u32) -> Result<Vec<GraphSummary>, OrchestrationError> {
586 self.store
587 .list_graphs(limit)
588 .await
589 .map_err(|e| OrchestrationError::Persistence(e.to_string()))
590 }
591
592 pub async fn delete(&self, id: &GraphId) -> Result<bool, OrchestrationError> {
600 self.store
601 .delete_graph(&id.to_string())
602 .await
603 .map_err(|e| OrchestrationError::Persistence(e.to_string()))
604 }
605}
606
607#[cfg(test)]
608mod tests {
609 use super::*;
610
611 #[test]
612 fn test_taskid_display() {
613 assert_eq!(TaskId(3).to_string(), "3");
614 }
615
616 #[test]
617 fn test_graphid_display_and_new() {
618 let id = GraphId::new();
619 let s = id.to_string();
620 assert_eq!(s.len(), 36, "UUID string should be 36 chars");
621 let parsed: GraphId = s.parse().expect("should parse back");
622 assert_eq!(id, parsed);
623 }
624
625 #[test]
626 fn test_graphid_from_str_invalid() {
627 let err = "not-a-uuid".parse::<GraphId>();
628 assert!(err.is_err());
629 }
630
631 #[test]
632 fn test_task_status_is_terminal() {
633 assert!(TaskStatus::Completed.is_terminal());
634 assert!(TaskStatus::Failed.is_terminal());
635 assert!(TaskStatus::Skipped.is_terminal());
636 assert!(TaskStatus::Canceled.is_terminal());
637
638 assert!(!TaskStatus::Pending.is_terminal());
639 assert!(!TaskStatus::Ready.is_terminal());
640 assert!(!TaskStatus::Running.is_terminal());
641 }
642
643 #[test]
644 fn test_task_status_display() {
645 assert_eq!(TaskStatus::Pending.to_string(), "pending");
646 assert_eq!(TaskStatus::Ready.to_string(), "ready");
647 assert_eq!(TaskStatus::Running.to_string(), "running");
648 assert_eq!(TaskStatus::Completed.to_string(), "completed");
649 assert_eq!(TaskStatus::Failed.to_string(), "failed");
650 assert_eq!(TaskStatus::Skipped.to_string(), "skipped");
651 assert_eq!(TaskStatus::Canceled.to_string(), "canceled");
652 }
653
654 #[test]
655 fn test_failure_strategy_default() {
656 assert_eq!(FailureStrategy::default(), FailureStrategy::Abort);
657 }
658
659 #[test]
660 fn test_failure_strategy_display() {
661 assert_eq!(FailureStrategy::Abort.to_string(), "abort");
662 assert_eq!(FailureStrategy::Retry.to_string(), "retry");
663 assert_eq!(FailureStrategy::Skip.to_string(), "skip");
664 assert_eq!(FailureStrategy::Ask.to_string(), "ask");
665 }
666
667 #[test]
668 fn test_graph_status_display() {
669 assert_eq!(GraphStatus::Created.to_string(), "created");
670 assert_eq!(GraphStatus::Running.to_string(), "running");
671 assert_eq!(GraphStatus::Completed.to_string(), "completed");
672 assert_eq!(GraphStatus::Failed.to_string(), "failed");
673 assert_eq!(GraphStatus::Canceled.to_string(), "canceled");
674 assert_eq!(GraphStatus::Paused.to_string(), "paused");
675 }
676
677 #[test]
678 fn test_task_graph_serde_roundtrip() {
679 let mut graph = TaskGraph::new("test goal");
680 graph.tasks.push(TaskNode::new(0, "task 0", "do something"));
681 let json = serde_json::to_string(&graph).expect("serialize");
682 let restored: TaskGraph = serde_json::from_str(&json).expect("deserialize");
683 assert_eq!(graph.id, restored.id);
684 assert_eq!(graph.goal, restored.goal);
685 assert_eq!(graph.tasks.len(), restored.tasks.len());
686 }
687
688 #[test]
689 fn test_task_node_serde_roundtrip() {
690 let mut node = TaskNode::new(1, "compile", "run cargo build");
691 node.agent_hint = Some("rust-dev".to_string());
692 node.depends_on = vec![TaskId(0)];
693 let json = serde_json::to_string(&node).expect("serialize");
694 let restored: TaskNode = serde_json::from_str(&json).expect("deserialize");
695 assert_eq!(node.id, restored.id);
696 assert_eq!(node.title, restored.title);
697 assert_eq!(node.depends_on, restored.depends_on);
698 }
699
700 #[test]
701 fn test_task_result_serde_roundtrip() {
702 let result = TaskResult {
703 output: "ok".to_string(),
704 artifacts: vec![PathBuf::from("/tmp/out.bin")],
705 duration_ms: 500,
706 agent_id: Some("agent-1".to_string()),
707 agent_def: None,
708 };
709 let json = serde_json::to_string(&result).expect("serialize");
710 let restored: TaskResult = serde_json::from_str(&json).expect("deserialize");
711 assert_eq!(result.output, restored.output);
712 assert_eq!(result.duration_ms, restored.duration_ms);
713 assert_eq!(result.artifacts, restored.artifacts);
714 }
715
716 #[test]
717 fn test_failure_strategy_from_str() {
718 assert_eq!(
719 "abort".parse::<FailureStrategy>().unwrap(),
720 FailureStrategy::Abort
721 );
722 assert_eq!(
723 "retry".parse::<FailureStrategy>().unwrap(),
724 FailureStrategy::Retry
725 );
726 assert_eq!(
727 "skip".parse::<FailureStrategy>().unwrap(),
728 FailureStrategy::Skip
729 );
730 assert_eq!(
731 "ask".parse::<FailureStrategy>().unwrap(),
732 FailureStrategy::Ask
733 );
734 assert!("abort_all".parse::<FailureStrategy>().is_err());
735 assert!("".parse::<FailureStrategy>().is_err());
736 }
737
738 #[test]
739 fn test_chrono_now_iso8601_format() {
740 let ts = chrono_now();
741 assert_eq!(ts.len(), 20, "timestamp should be 20 chars: {ts}");
743 assert!(ts.ends_with('Z'), "should end with Z: {ts}");
744 assert!(ts.contains('T'), "should contain T: {ts}");
745 let year: u32 = ts[..4].parse().expect("year should be numeric");
747 assert!(year >= 2024, "year should be >= 2024: {year}");
748 }
749
750 #[test]
751 fn test_failure_strategy_serde_snake_case() {
752 assert_eq!(
753 serde_json::to_string(&FailureStrategy::Abort).unwrap(),
754 "\"abort\""
755 );
756 assert_eq!(
757 serde_json::to_string(&FailureStrategy::Retry).unwrap(),
758 "\"retry\""
759 );
760 assert_eq!(
761 serde_json::to_string(&FailureStrategy::Skip).unwrap(),
762 "\"skip\""
763 );
764 assert_eq!(
765 serde_json::to_string(&FailureStrategy::Ask).unwrap(),
766 "\"ask\""
767 );
768 }
769
770 #[test]
771 fn test_graph_persistence_save_rejects_long_goal() {
772 let long_goal = "x".repeat(MAX_GOAL_LEN + 1);
775 let mut graph = TaskGraph::new(long_goal);
776 graph.goal = "x".repeat(MAX_GOAL_LEN + 1);
777 assert!(
778 graph.goal.len() > MAX_GOAL_LEN,
779 "test setup: goal must exceed limit"
780 );
781 assert_eq!(MAX_GOAL_LEN, 1024);
784 }
785
786 #[test]
787 fn test_task_node_missing_execution_mode_deserializes_as_parallel() {
788 let json = r#"{
791 "id": 0,
792 "title": "t",
793 "description": "d",
794 "agent_hint": null,
795 "status": "pending",
796 "depends_on": [],
797 "result": null,
798 "assigned_agent": null,
799 "retry_count": 0,
800 "failure_strategy": null,
801 "max_retries": null
802 }"#;
803 let node: TaskNode = serde_json::from_str(json).expect("should deserialize old JSON");
804 assert_eq!(node.execution_mode, ExecutionMode::Parallel);
805 }
806
807 #[test]
808 fn test_execution_mode_serde_snake_case() {
809 assert_eq!(
810 serde_json::to_string(&ExecutionMode::Parallel).unwrap(),
811 "\"parallel\""
812 );
813 assert_eq!(
814 serde_json::to_string(&ExecutionMode::Sequential).unwrap(),
815 "\"sequential\""
816 );
817 let p: ExecutionMode = serde_json::from_str("\"parallel\"").unwrap();
818 assert_eq!(p, ExecutionMode::Parallel);
819 let s: ExecutionMode = serde_json::from_str("\"sequential\"").unwrap();
820 assert_eq!(s, ExecutionMode::Sequential);
821 }
822}