dataflow_rs/engine/
workflow.rs

1use crate::engine::error::{DataflowError, Result};
2use crate::engine::task::Task;
3use serde::Deserialize;
4use serde_json::Value;
5use std::fs;
6use std::path::Path;
7
8#[derive(Deserialize, Clone, Debug)]
9pub struct Workflow {
10    pub id: String,
11    pub name: String,
12    pub description: Option<String>,
13    pub condition: Option<Value>,
14    pub tasks: Vec<Task>,
15}
16
17impl Default for Workflow {
18    fn default() -> Self {
19        Self::new()
20    }
21}
22
23impl Workflow {
24    pub fn new() -> Self {
25        Workflow {
26            id: String::new(),
27            name: String::new(),
28            description: None,
29            condition: None,
30            tasks: Vec::new(),
31        }
32    }
33
34    /// Load workflow from JSON string
35    pub fn from_json(json_str: &str) -> Result<Self> {
36        serde_json::from_str(json_str).map_err(DataflowError::from_serde)
37    }
38
39    /// Load workflow from JSON file
40    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
41        let json_str = fs::read_to_string(path).map_err(DataflowError::from_io)?;
42
43        Self::from_json(&json_str)
44    }
45
46    /// Validate the workflow structure
47    pub fn validate(&self) -> Result<()> {
48        // Check required fields
49        if self.id.is_empty() {
50            return Err(DataflowError::Workflow(
51                "Workflow id cannot be empty".to_string(),
52            ));
53        }
54
55        if self.name.is_empty() {
56            return Err(DataflowError::Workflow(
57                "Workflow name cannot be empty".to_string(),
58            ));
59        }
60
61        // Check tasks
62        if self.tasks.is_empty() {
63            return Err(DataflowError::Workflow(
64                "Workflow must have at least one task".to_string(),
65            ));
66        }
67
68        // Validate that task IDs are unique
69        let mut task_ids = std::collections::HashSet::new();
70        for task in &self.tasks {
71            if !task_ids.insert(&task.id) {
72                return Err(DataflowError::Workflow(format!(
73                    "Duplicate task ID '{}' in workflow",
74                    task.id
75                )));
76            }
77        }
78
79        Ok(())
80    }
81}