// dataflow_rs/engine/workflow.rs

use crate::engine::error::{DataflowError, Result};
use crate::engine::task::Task;
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashSet;
use std::fs;
use std::path::Path;
8/// Workflow represents a collection of tasks that execute sequentially
9#[derive(Clone, Debug, Deserialize)]
10pub struct Workflow {
11    pub id: String,
12    pub name: String,
13    #[serde(default)]
14    pub priority: u32,
15    pub description: Option<String>,
16    #[serde(default = "default_condition")]
17    pub condition: Value,
18    #[serde(skip)]
19    pub condition_index: Option<usize>,
20    pub tasks: Vec<Task>,
21    #[serde(default)]
22    pub continue_on_error: bool,
23}
25fn default_condition() -> Value {
26    Value::Bool(true)
27}
29impl Default for Workflow {
30    fn default() -> Self {
31        Self::new()
32    }
33}
35impl Workflow {
36    pub fn new() -> Self {
37        Workflow {
38            id: String::new(),
39            name: String::new(),
40            priority: 0,
41            description: None,
42            condition: Value::Bool(true),
43            condition_index: None,
44            tasks: Vec::new(),
45            continue_on_error: false,
46        }
47    }
48
49    /// Load workflow from JSON string
50    pub fn from_json(json_str: &str) -> Result<Self> {
51        serde_json::from_str(json_str).map_err(DataflowError::from_serde)
52    }
53
54    /// Load workflow from JSON file
55    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
56        let json_str = fs::read_to_string(path).map_err(DataflowError::from_io)?;
57
58        Self::from_json(&json_str)
59    }
60
61    /// Validate the workflow structure
62    pub fn validate(&self) -> Result<()> {
63        // Check required fields
64        if self.id.is_empty() {
65            return Err(DataflowError::Workflow(
66                "Workflow id cannot be empty".to_string(),
67            ));
68        }
69
70        if self.name.is_empty() {
71            return Err(DataflowError::Workflow(
72                "Workflow name cannot be empty".to_string(),
73            ));
74        }
75
76        // Check tasks
77        if self.tasks.is_empty() {
78            return Err(DataflowError::Workflow(
79                "Workflow must have at least one task".to_string(),
80            ));
81        }
82
83        // Validate that task IDs are unique
84        let mut task_ids = std::collections::HashSet::new();
85        for task in &self.tasks {
86            if !task_ids.insert(&task.id) {
87                return Err(DataflowError::Workflow(format!(
88                    "Duplicate task ID '{}' in workflow",
89                    task.id
90                )));
91            }
92        }
93
94        Ok(())
95    }
96}