1use std::fmt;
5use std::path::PathBuf;
6use std::str::FromStr;
7
8use serde::{Deserialize, Serialize};
9use uuid::Uuid;
10pub use zeph_config::FailureStrategy;
11use zeph_memory::store::graph_store::{GraphSummary, RawGraphStore};
12
13use super::error::OrchestrationError;
14use super::verify_predicate::{PredicateOutcome, VerifyPredicate};
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
32pub struct TaskId(pub u32);
33
34impl TaskId {
35 #[must_use]
37 pub fn index(self) -> usize {
38 self.0 as usize
39 }
40
41 #[must_use]
43 pub fn as_u32(self) -> u32 {
44 self.0
45 }
46}
47
48impl fmt::Display for TaskId {
49 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50 write!(f, "{}", self.0)
51 }
52}
53
54#[derive(
72 Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize, schemars::JsonSchema,
73)]
74#[schemars(transparent)]
75pub struct PlanSlug(pub String);
76
77impl PlanSlug {
78 #[must_use]
80 pub fn as_str(&self) -> &str {
81 &self.0
82 }
83}
84
85impl From<String> for PlanSlug {
86 fn from(s: String) -> Self {
87 Self(s)
88 }
89}
90
91impl From<&str> for PlanSlug {
92 fn from(s: &str) -> Self {
93 Self(s.to_owned())
94 }
95}
96
97impl fmt::Display for PlanSlug {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 f.write_str(&self.0)
100 }
101}
102
103#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
121pub struct GraphId(Uuid);
122
123impl GraphId {
124 #[must_use]
126 pub fn new() -> Self {
127 Self(Uuid::new_v4())
128 }
129}
130
131impl Default for GraphId {
132 fn default() -> Self {
133 Self::new()
134 }
135}
136
137impl fmt::Display for GraphId {
138 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139 write!(f, "{}", self.0)
140 }
141}
142
143impl FromStr for GraphId {
144 type Err = OrchestrationError;
145
146 fn from_str(s: &str) -> Result<Self, Self::Err> {
147 Uuid::parse_str(s)
148 .map(GraphId)
149 .map_err(|e| OrchestrationError::InvalidGraph(format!("invalid graph id '{s}': {e}")))
150 }
151}
152
153#[non_exhaustive]
177#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
178#[serde(rename_all = "snake_case")]
179pub enum TaskStatus {
180 Pending,
182 Ready,
184 Running,
186 Completed,
188 Failed,
190 Skipped,
192 Canceled,
194}
195
196impl TaskStatus {
197 #[must_use]
199 pub fn is_terminal(self) -> bool {
200 matches!(
201 self,
202 TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Skipped | TaskStatus::Canceled
203 )
204 }
205}
206
207impl fmt::Display for TaskStatus {
208 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209 match self {
210 TaskStatus::Pending => write!(f, "pending"),
211 TaskStatus::Ready => write!(f, "ready"),
212 TaskStatus::Running => write!(f, "running"),
213 TaskStatus::Completed => write!(f, "completed"),
214 TaskStatus::Failed => write!(f, "failed"),
215 TaskStatus::Skipped => write!(f, "skipped"),
216 TaskStatus::Canceled => write!(f, "canceled"),
217 }
218 }
219}
220
221#[non_exhaustive]
232#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
233#[serde(rename_all = "snake_case")]
234pub enum GraphStatus {
235 Created,
237 Running,
239 Completed,
241 Failed,
243 Canceled,
245 Paused,
247}
248
249impl fmt::Display for GraphStatus {
250 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
251 match self {
252 GraphStatus::Created => write!(f, "created"),
253 GraphStatus::Running => write!(f, "running"),
254 GraphStatus::Completed => write!(f, "completed"),
255 GraphStatus::Failed => write!(f, "failed"),
256 GraphStatus::Canceled => write!(f, "canceled"),
257 GraphStatus::Paused => write!(f, "paused"),
258 }
259 }
260}
261
262#[derive(Debug, Clone, Serialize, Deserialize)]
269pub struct TaskResult {
270 pub output: String,
272 pub artifacts: Vec<PathBuf>,
274 pub duration_ms: u64,
276 pub agent_id: Option<String>,
278 pub agent_def: Option<String>,
280}
281
282#[non_exhaustive]
300#[derive(
301 Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize, schemars::JsonSchema,
302)]
303#[serde(rename_all = "snake_case")]
304pub enum ExecutionMode {
305 #[default]
307 Parallel,
308 Sequential,
311}
312
313#[derive(Debug, Clone, Serialize, Deserialize)]
331pub struct TaskNode {
332 pub id: TaskId,
334 pub title: String,
336 pub description: String,
338 pub agent_hint: Option<String>,
340 pub status: TaskStatus,
342 pub depends_on: Vec<TaskId>,
344 pub result: Option<TaskResult>,
346 pub assigned_agent: Option<String>,
348 pub retry_count: u32,
350 #[serde(default)]
352 pub predicate_rerun_count: u32,
353 pub failure_strategy: Option<FailureStrategy>,
355 pub max_retries: Option<u32>,
357 #[serde(default)]
360 pub execution_mode: ExecutionMode,
361 #[serde(default)]
368 pub verify_predicate: Option<VerifyPredicate>,
369 #[serde(default)]
375 pub predicate_outcome: Option<PredicateOutcome>,
376 #[serde(default, skip_serializing_if = "Option::is_none")]
379 pub execution_environment: Option<String>,
380
381 #[serde(default, skip_serializing_if = "Option::is_none")]
386 pub token_budget_cents: Option<f64>,
387}
388
389impl TaskNode {
390 #[must_use]
392 pub fn new(id: u32, title: impl Into<String>, description: impl Into<String>) -> Self {
393 Self {
394 id: TaskId(id),
395 title: title.into(),
396 description: description.into(),
397 agent_hint: None,
398 status: TaskStatus::Pending,
399 depends_on: Vec::new(),
400 result: None,
401 assigned_agent: None,
402 retry_count: 0,
403 predicate_rerun_count: 0,
404 failure_strategy: None,
405 max_retries: None,
406 execution_mode: ExecutionMode::default(),
407 verify_predicate: None,
408 predicate_outcome: None,
409 execution_environment: None,
410 token_budget_cents: None,
411 }
412 }
413}
414
415#[derive(Debug, Clone, Serialize, Deserialize)]
435pub struct TaskGraph {
436 pub id: GraphId,
438 pub goal: String,
440 pub tasks: Vec<TaskNode>,
442 pub status: GraphStatus,
444 pub default_failure_strategy: FailureStrategy,
446 pub default_max_retries: u32,
448 pub created_at: String,
450 pub finished_at: Option<String>,
452}
453
454impl TaskGraph {
455 #[must_use]
457 pub fn new(goal: impl Into<String>) -> Self {
458 Self {
459 id: GraphId::new(),
460 goal: goal.into(),
461 tasks: Vec::new(),
462 status: GraphStatus::Created,
463 default_failure_strategy: FailureStrategy::default(),
464 default_max_retries: 3,
465 created_at: chrono_now(),
466 finished_at: None,
467 }
468 }
469}
470
471pub(crate) fn chrono_now() -> String {
472 let secs = std::time::SystemTime::now()
475 .duration_since(std::time::UNIX_EPOCH)
476 .map_or(0, |d| d.as_secs());
477 let (y, mo, d, h, mi, s) = epoch_secs_to_datetime(secs);
480 format!("{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}Z")
481}
482
483fn epoch_secs_to_datetime(secs: u64) -> (u64, u8, u8, u8, u8, u8) {
485 let s = (secs % 60) as u8;
486 let mins = secs / 60;
487 let mi = (mins % 60) as u8;
488 let hours = mins / 60;
489 let h = (hours % 24) as u8;
490 let days = hours / 24; let (mut year, mut remaining_days) = {
495 let cycles = days / 146_097;
496 let rem = days % 146_097;
497 (1970 + cycles * 400, rem)
498 };
499 let centuries = (remaining_days / 36_524).min(3);
501 year += centuries * 100;
502 remaining_days -= centuries * 36_524;
503 let quads = remaining_days / 1_461;
505 year += quads * 4;
506 remaining_days -= quads * 1_461;
507 let extra_years = (remaining_days / 365).min(3);
509 year += extra_years;
510 remaining_days -= extra_years * 365;
511
512 let is_leap = (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0);
513 let days_in_month: [u64; 12] = if is_leap {
514 [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
515 } else {
516 [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
517 };
518
519 let mut month = 0u8;
520 for (i, &dim) in days_in_month.iter().enumerate() {
521 if remaining_days < dim {
522 month = u8::try_from(i + 1).unwrap_or(1);
524 break;
525 }
526 remaining_days -= dim;
527 }
528 let day = u8::try_from(remaining_days + 1).unwrap_or(1);
530
531 (year, month, day, h, mi, s)
532}
533
534const MAX_GOAL_LEN: usize = 1024;
536
537pub struct GraphPersistence<S: RawGraphStore> {
550 store: S,
551}
552
553impl<S: RawGraphStore> GraphPersistence<S> {
554 pub fn new(store: S) -> Self {
556 Self { store }
557 }
558
559 #[tracing::instrument(name = "orchestration.graph_store.save", skip(self, graph), fields(graph.id = %graph.id))]
568 pub async fn save(&self, graph: &TaskGraph) -> Result<(), OrchestrationError> {
569 if graph.goal.len() > MAX_GOAL_LEN {
570 return Err(OrchestrationError::InvalidGraph(format!(
571 "goal exceeds {MAX_GOAL_LEN} character limit ({} chars)",
572 graph.goal.len()
573 )));
574 }
575 let json = serde_json::to_string(graph)
576 .map_err(|e| OrchestrationError::Persistence(e.to_string()))?;
577 self.store
578 .save_graph(
579 &graph.id.to_string(),
580 &graph.goal,
581 &graph.status.to_string(),
582 &json,
583 &graph.created_at,
584 graph.finished_at.as_deref(),
585 )
586 .await
587 .map_err(|e| OrchestrationError::Persistence(e.to_string()))
588 }
589
590 #[tracing::instrument(name = "orchestration.graph_store.load", skip(self), fields(graph.id = %id))]
598 pub async fn load(&self, id: &GraphId) -> Result<Option<TaskGraph>, OrchestrationError> {
599 match self
600 .store
601 .load_graph(&id.to_string())
602 .await
603 .map_err(|e| OrchestrationError::Persistence(e.to_string()))?
604 {
605 Some(json) => {
606 let graph = serde_json::from_str(&json)
607 .map_err(|e| OrchestrationError::Persistence(e.to_string()))?;
608 Ok(Some(graph))
609 }
610 None => Ok(None),
611 }
612 }
613
614 #[tracing::instrument(name = "orchestration.graph_store.list", skip(self), fields(limit))]
620 pub async fn list(&self, limit: u32) -> Result<Vec<GraphSummary>, OrchestrationError> {
621 self.store
622 .list_graphs(limit)
623 .await
624 .map_err(|e| OrchestrationError::Persistence(e.to_string()))
625 }
626
627 #[tracing::instrument(name = "orchestration.graph_store.delete", skip(self), fields(graph.id = %id))]
635 pub async fn delete(&self, id: &GraphId) -> Result<bool, OrchestrationError> {
636 self.store
637 .delete_graph(&id.to_string())
638 .await
639 .map_err(|e| OrchestrationError::Persistence(e.to_string()))
640 }
641}
642
643#[cfg(test)]
644mod tests {
645 use super::*;
646
647 #[test]
648 fn test_taskid_display() {
649 assert_eq!(TaskId(3).to_string(), "3");
650 }
651
652 #[test]
653 fn test_graphid_display_and_new() {
654 let id = GraphId::new();
655 let s = id.to_string();
656 assert_eq!(s.len(), 36, "UUID string should be 36 chars");
657 let parsed: GraphId = s.parse().expect("should parse back");
658 assert_eq!(id, parsed);
659 }
660
661 #[test]
662 fn test_graphid_from_str_invalid() {
663 let err = "not-a-uuid".parse::<GraphId>();
664 assert!(err.is_err());
665 }
666
667 #[test]
668 fn test_task_status_is_terminal() {
669 assert!(TaskStatus::Completed.is_terminal());
670 assert!(TaskStatus::Failed.is_terminal());
671 assert!(TaskStatus::Skipped.is_terminal());
672 assert!(TaskStatus::Canceled.is_terminal());
673
674 assert!(!TaskStatus::Pending.is_terminal());
675 assert!(!TaskStatus::Ready.is_terminal());
676 assert!(!TaskStatus::Running.is_terminal());
677 }
678
679 #[test]
680 fn test_task_status_display() {
681 assert_eq!(TaskStatus::Pending.to_string(), "pending");
682 assert_eq!(TaskStatus::Ready.to_string(), "ready");
683 assert_eq!(TaskStatus::Running.to_string(), "running");
684 assert_eq!(TaskStatus::Completed.to_string(), "completed");
685 assert_eq!(TaskStatus::Failed.to_string(), "failed");
686 assert_eq!(TaskStatus::Skipped.to_string(), "skipped");
687 assert_eq!(TaskStatus::Canceled.to_string(), "canceled");
688 }
689
690 #[test]
691 fn test_failure_strategy_default() {
692 assert_eq!(FailureStrategy::default(), FailureStrategy::Abort);
693 }
694
695 #[test]
696 fn test_failure_strategy_display() {
697 assert_eq!(FailureStrategy::Abort.to_string(), "abort");
698 assert_eq!(FailureStrategy::Retry.to_string(), "retry");
699 assert_eq!(FailureStrategy::Skip.to_string(), "skip");
700 assert_eq!(FailureStrategy::Ask.to_string(), "ask");
701 }
702
703 #[test]
704 fn test_graph_status_display() {
705 assert_eq!(GraphStatus::Created.to_string(), "created");
706 assert_eq!(GraphStatus::Running.to_string(), "running");
707 assert_eq!(GraphStatus::Completed.to_string(), "completed");
708 assert_eq!(GraphStatus::Failed.to_string(), "failed");
709 assert_eq!(GraphStatus::Canceled.to_string(), "canceled");
710 assert_eq!(GraphStatus::Paused.to_string(), "paused");
711 }
712
713 #[test]
714 fn test_task_graph_serde_roundtrip() {
715 let mut graph = TaskGraph::new("test goal");
716 graph.tasks.push(TaskNode::new(0, "task 0", "do something"));
717 let json = serde_json::to_string(&graph).expect("serialize");
718 let restored: TaskGraph = serde_json::from_str(&json).expect("deserialize");
719 assert_eq!(graph.id, restored.id);
720 assert_eq!(graph.goal, restored.goal);
721 assert_eq!(graph.tasks.len(), restored.tasks.len());
722 }
723
724 #[test]
725 fn test_task_node_serde_roundtrip() {
726 let mut node = TaskNode::new(1, "compile", "run cargo build");
727 node.agent_hint = Some("rust-dev".to_string());
728 node.depends_on = vec![TaskId(0)];
729 let json = serde_json::to_string(&node).expect("serialize");
730 let restored: TaskNode = serde_json::from_str(&json).expect("deserialize");
731 assert_eq!(node.id, restored.id);
732 assert_eq!(node.title, restored.title);
733 assert_eq!(node.depends_on, restored.depends_on);
734 }
735
736 #[test]
737 fn test_task_result_serde_roundtrip() {
738 let result = TaskResult {
739 output: "ok".to_string(),
740 artifacts: vec![PathBuf::from("/tmp/out.bin")],
741 duration_ms: 500,
742 agent_id: Some("agent-1".to_string()),
743 agent_def: None,
744 };
745 let json = serde_json::to_string(&result).expect("serialize");
746 let restored: TaskResult = serde_json::from_str(&json).expect("deserialize");
747 assert_eq!(result.output, restored.output);
748 assert_eq!(result.duration_ms, restored.duration_ms);
749 assert_eq!(result.artifacts, restored.artifacts);
750 }
751
752 #[test]
753 fn test_failure_strategy_from_str() {
754 assert_eq!(
755 "abort".parse::<FailureStrategy>().unwrap(),
756 FailureStrategy::Abort
757 );
758 assert_eq!(
759 "retry".parse::<FailureStrategy>().unwrap(),
760 FailureStrategy::Retry
761 );
762 assert_eq!(
763 "skip".parse::<FailureStrategy>().unwrap(),
764 FailureStrategy::Skip
765 );
766 assert_eq!(
767 "ask".parse::<FailureStrategy>().unwrap(),
768 FailureStrategy::Ask
769 );
770 assert!("abort_all".parse::<FailureStrategy>().is_err());
771 assert!("".parse::<FailureStrategy>().is_err());
772 }
773
774 #[test]
775 fn test_chrono_now_iso8601_format() {
776 let ts = chrono_now();
777 assert_eq!(ts.len(), 20, "timestamp should be 20 chars: {ts}");
779 assert!(ts.ends_with('Z'), "should end with Z: {ts}");
780 assert!(ts.contains('T'), "should contain T: {ts}");
781 let year: u32 = ts[..4].parse().expect("year should be numeric");
783 assert!(year >= 2024, "year should be >= 2024: {year}");
784 }
785
786 #[test]
787 fn test_failure_strategy_serde_snake_case() {
788 assert_eq!(
789 serde_json::to_string(&FailureStrategy::Abort).unwrap(),
790 "\"abort\""
791 );
792 assert_eq!(
793 serde_json::to_string(&FailureStrategy::Retry).unwrap(),
794 "\"retry\""
795 );
796 assert_eq!(
797 serde_json::to_string(&FailureStrategy::Skip).unwrap(),
798 "\"skip\""
799 );
800 assert_eq!(
801 serde_json::to_string(&FailureStrategy::Ask).unwrap(),
802 "\"ask\""
803 );
804 }
805
806 #[test]
807 fn test_graph_persistence_save_rejects_long_goal() {
808 let long_goal = "x".repeat(MAX_GOAL_LEN + 1);
811 let mut graph = TaskGraph::new(long_goal);
812 graph.goal = "x".repeat(MAX_GOAL_LEN + 1);
813 assert!(
814 graph.goal.len() > MAX_GOAL_LEN,
815 "test setup: goal must exceed limit"
816 );
817 assert_eq!(MAX_GOAL_LEN, 1024);
820 }
821
822 #[test]
823 fn test_task_node_predicate_fields_default_to_none() {
824 let json = r#"{
827 "id": 0,
828 "title": "t",
829 "description": "d",
830 "agent_hint": null,
831 "status": "pending",
832 "depends_on": [],
833 "result": null,
834 "assigned_agent": null,
835 "retry_count": 0,
836 "failure_strategy": null,
837 "max_retries": null
838 }"#;
839 let node: TaskNode = serde_json::from_str(json).expect("should deserialize old JSON");
840 assert!(node.verify_predicate.is_none());
841 assert!(node.predicate_outcome.is_none());
842 }
843
844 #[test]
845 fn test_task_node_missing_execution_mode_deserializes_as_parallel() {
846 let json = r#"{
849 "id": 0,
850 "title": "t",
851 "description": "d",
852 "agent_hint": null,
853 "status": "pending",
854 "depends_on": [],
855 "result": null,
856 "assigned_agent": null,
857 "retry_count": 0,
858 "failure_strategy": null,
859 "max_retries": null
860 }"#;
861 let node: TaskNode = serde_json::from_str(json).expect("should deserialize old JSON");
862 assert_eq!(node.execution_mode, ExecutionMode::Parallel);
863 }
864
865 #[test]
866 fn test_execution_mode_serde_snake_case() {
867 assert_eq!(
868 serde_json::to_string(&ExecutionMode::Parallel).unwrap(),
869 "\"parallel\""
870 );
871 assert_eq!(
872 serde_json::to_string(&ExecutionMode::Sequential).unwrap(),
873 "\"sequential\""
874 );
875 let p: ExecutionMode = serde_json::from_str("\"parallel\"").unwrap();
876 assert_eq!(p, ExecutionMode::Parallel);
877 let s: ExecutionMode = serde_json::from_str("\"sequential\"").unwrap();
878 assert_eq!(s, ExecutionMode::Sequential);
879 }
880}