Skip to main content

agentic_workflow/template/
composer.rs

1use std::collections::HashMap;
2
3use chrono::Utc;
4use uuid::Uuid;
5
6use crate::types::{
7    CompositionOperator, DataBridge, MetaWorkflow,
8    WorkflowError, WorkflowResult,
9};
10
11/// Workflow composition algebra engine.
12pub struct CompositionEngine {
13    meta_workflows: HashMap<String, MetaWorkflow>,
14}
15
16impl CompositionEngine {
17    pub fn new() -> Self {
18        Self {
19            meta_workflows: HashMap::new(),
20        }
21    }
22
23    /// Create a sequence composition: A → B → C.
24    pub fn sequence(&mut self, name: &str, workflow_ids: Vec<String>) -> WorkflowResult<String> {
25        let id = Uuid::new_v4().to_string();
26        let meta = MetaWorkflow {
27            id: id.clone(),
28            name: name.to_string(),
29            operators: vec![CompositionOperator::Sequence(workflow_ids)],
30            data_bridges: Vec::new(),
31            created_at: Utc::now(),
32        };
33
34        self.meta_workflows.insert(id.clone(), meta);
35        Ok(id)
36    }
37
38    /// Create a parallel composition: A || B || C.
39    pub fn parallel(&mut self, name: &str, workflow_ids: Vec<String>) -> WorkflowResult<String> {
40        let id = Uuid::new_v4().to_string();
41        let meta = MetaWorkflow {
42            id: id.clone(),
43            name: name.to_string(),
44            operators: vec![CompositionOperator::Parallel(workflow_ids)],
45            data_bridges: Vec::new(),
46            created_at: Utc::now(),
47        };
48
49        self.meta_workflows.insert(id.clone(), meta);
50        Ok(id)
51    }
52
53    /// Create a conditional composition: if pred then A else B.
54    pub fn conditional(
55        &mut self,
56        name: &str,
57        predicate: &str,
58        if_true: &str,
59        if_false: &str,
60    ) -> WorkflowResult<String> {
61        let id = Uuid::new_v4().to_string();
62        let meta = MetaWorkflow {
63            id: id.clone(),
64            name: name.to_string(),
65            operators: vec![CompositionOperator::Conditional {
66                predicate: predicate.to_string(),
67                if_true: if_true.to_string(),
68                if_false: if_false.to_string(),
69            }],
70            data_bridges: Vec::new(),
71            created_at: Utc::now(),
72        };
73
74        self.meta_workflows.insert(id.clone(), meta);
75        Ok(id)
76    }
77
78    /// Add a data bridge between composed workflows.
79    pub fn add_bridge(
80        &mut self,
81        meta_id: &str,
82        from_workflow_id: &str,
83        from_output: &str,
84        to_workflow_id: &str,
85        to_input: &str,
86        transform: Option<String>,
87    ) -> WorkflowResult<()> {
88        let meta = self
89            .meta_workflows
90            .get_mut(meta_id)
91            .ok_or_else(|| WorkflowError::Internal(format!("Meta-workflow not found: {}", meta_id)))?;
92
93        meta.data_bridges.push(DataBridge {
94            from_workflow_id: from_workflow_id.to_string(),
95            from_output: from_output.to_string(),
96            to_workflow_id: to_workflow_id.to_string(),
97            to_input: to_input.to_string(),
98            transform,
99        });
100
101        Ok(())
102    }
103
104    /// Validate a composed meta-workflow.
105    pub fn validate(&self, meta_id: &str) -> WorkflowResult<Vec<String>> {
106        let meta = self
107            .meta_workflows
108            .get(meta_id)
109            .ok_or_else(|| WorkflowError::Internal(format!("Meta-workflow not found: {}", meta_id)))?;
110
111        let mut warnings = Vec::new();
112
113        // Check for empty compositions
114        for op in &meta.operators {
115            match op {
116                CompositionOperator::Sequence(ids) if ids.is_empty() => {
117                    warnings.push("Empty sequence composition".to_string());
118                }
119                CompositionOperator::Parallel(ids) if ids.is_empty() => {
120                    warnings.push("Empty parallel composition".to_string());
121                }
122                _ => {}
123            }
124        }
125
126        Ok(warnings)
127    }
128
129    /// Get a meta-workflow.
130    pub fn get_meta(&self, meta_id: &str) -> WorkflowResult<&MetaWorkflow> {
131        self.meta_workflows
132            .get(meta_id)
133            .ok_or_else(|| WorkflowError::Internal(format!("Meta-workflow not found: {}", meta_id)))
134    }
135
136    /// List all meta-workflows.
137    pub fn list_meta(&self) -> Vec<&MetaWorkflow> {
138        self.meta_workflows.values().collect()
139    }
140}
141
142impl Default for CompositionEngine {
143    fn default() -> Self {
144        Self::new()
145    }
146}
147
148#[cfg(test)]
149mod tests {
150    use super::*;
151
152    #[test]
153    fn test_sequence_composition() {
154        let mut engine = CompositionEngine::new();
155        let mid = engine
156            .sequence(
157                "deploy-pipeline",
158                vec!["build".into(), "test".into(), "deploy".into()],
159            )
160            .unwrap();
161
162        let meta = engine.get_meta(&mid).unwrap();
163        assert_eq!(meta.name, "deploy-pipeline");
164
165        let warnings = engine.validate(&mid).unwrap();
166        assert!(warnings.is_empty());
167    }
168}