Skip to main content

mofa_foundation/workflow/dsl/
parser.rs

//! Workflow DSL Parser
//!
//! Parses YAML/TOML workflow definitions and builds executable workflows.
4
use super::env::substitute_env_recursive;
use super::schema::*;
use super::{DslError, DslResult};
use crate::llm::LLMAgent;
use crate::workflow::builder::WorkflowBuilder;
use crate::workflow::state::WorkflowValue;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fs;
use std::path::Path;
use std::sync::Arc;
15
16/// Workflow DSL parser
17pub struct WorkflowDslParser;
18
19impl WorkflowDslParser {
20    /// Parse workflow definition from YAML string
21    pub fn from_yaml(content: &str) -> DslResult<WorkflowDefinition> {
22        let value: serde_yaml::Value = serde_yaml::from_str(content)?;
23        // Apply environment variable substitution
24        let json_value: serde_json::Value = serde_json::to_value(&value)?;
25        let substituted = substitute_env_recursive(&json_value);
26        let def: WorkflowDefinition = serde_json::from_value(substituted)?;
27        Ok(def)
28    }
29
30    /// Parse workflow definition from TOML string
31    pub fn from_toml(content: &str) -> DslResult<WorkflowDefinition> {
32        let value: toml::Value = toml::from_str(content)?;
33        // Convert to JSON for env substitution
34        let json_value: serde_json::Value = serde_json::to_value(&value)
35            .map_err(|e| DslError::Validation(format!("TOML to JSON conversion error: {}", e)))?;
36        let substituted = substitute_env_recursive(&json_value);
37        let def: WorkflowDefinition = serde_json::from_value(substituted)?;
38        Ok(def)
39    }
40
41    /// Parse workflow definition from file (auto-detect format)
42    pub fn from_file(path: impl AsRef<Path>) -> DslResult<WorkflowDefinition> {
43        let path = path.as_ref();
44        let content = fs::read_to_string(path)?;
45
46        let extension = path
47            .extension()
48            .and_then(|e| e.to_str())
49            .ok_or_else(|| DslError::Validation("No file extension".to_string()))?;
50
51        match extension.to_lowercase().as_str() {
52            "yaml" | "yml" => Self::from_yaml(&content),
53            "toml" => Self::from_toml(&content),
54            _ => Err(DslError::Validation(format!(
55                "Unsupported file extension: {}",
56                extension
57            ))),
58        }
59    }
60
61    /// Build a workflow graph from definition
62    ///
63    /// This method requires a registry of pre-built LLMAgent instances.
64    /// Agents referenced in the workflow definition must be available in the registry.
65    pub async fn build_with_agents(
66        definition: WorkflowDefinition,
67        agent_registry: &HashMap<String, Arc<LLMAgent>>,
68    ) -> DslResult<crate::workflow::WorkflowGraph> {
69        // Validate definition
70        Self::validate(&definition)?;
71
72        // Build workflow
73        let mut builder = WorkflowBuilder::new(&definition.metadata.id, &definition.metadata.name)
74            .description(&definition.metadata.description);
75
76        // Add nodes
77        for node_def in definition.nodes {
78            builder = Self::add_node(builder, node_def, agent_registry).await?;
79        }
80
81        // Add edges
82        for edge in definition.edges {
83            if let Some(condition) = &edge.condition {
84                builder = builder.conditional_edge(&edge.from, &edge.to, condition);
85            } else {
86                builder = builder.edge(&edge.from, &edge.to);
87            }
88        }
89
90        Ok(builder.build())
91    }
92
93    /// Validate workflow definition
94    fn validate(definition: &WorkflowDefinition) -> DslResult<()> {
95        // Check for required nodes
96        let node_ids: Vec<&str> = definition.nodes.iter().map(|n| n.id()).collect();
97
98        // Verify start node exists
99        if !node_ids.iter().any(|&id| {
100            definition
101                .nodes
102                .iter()
103                .any(|n| matches!(n, NodeDefinition::Start { id: start_id, .. } if start_id == id))
104        }) {
105            return Err(DslError::Validation(
106                "Workflow must have a start node".to_string(),
107            ));
108        }
109
110        // Verify end node exists
111        if !node_ids.iter().any(|&id| {
112            definition
113                .nodes
114                .iter()
115                .any(|n| matches!(n, NodeDefinition::End { id: end_id, .. } if end_id == id))
116        }) {
117            return Err(DslError::Validation(
118                "Workflow must have an end node".to_string(),
119            ));
120        }
121
122        // Verify all edge references are valid
123        for edge in &definition.edges {
124            if !node_ids.contains(&edge.from.as_str()) {
125                return Err(DslError::InvalidEdge {
126                    from: edge.from.clone(),
127                    to: edge.to.clone(),
128                });
129            }
130            if !node_ids.contains(&edge.to.as_str()) {
131                return Err(DslError::InvalidEdge {
132                    from: edge.from.clone(),
133                    to: edge.to.clone(),
134                });
135            }
136        }
137
138        // Verify agent references
139        for node in &definition.nodes {
140            if let NodeDefinition::LLM_AGENT { agent, .. } = node {
141                match agent {
142                    AgentRef::Registry { agent_id } => {
143                        if !definition.agents.contains_key(agent_id) {
144                            return Err(DslError::AgentNotFound(agent_id.clone()));
145                        }
146                    }
147                    AgentRef::Inline(_) => {
148                        // Inline agents are self-contained
149                    }
150                }
151            }
152        }
153
154        Ok(())
155    }
156
157    /// Add a node to the workflow builder
158    async fn add_node(
159        mut builder: WorkflowBuilder,
160        node_def: NodeDefinition,
161        agent_registry: &HashMap<String, Arc<LLMAgent>>,
162    ) -> DslResult<WorkflowBuilder> {
163        match node_def {
164            NodeDefinition::Start { id, .. } => {
165                builder = builder.start_with_id(&id);
166            }
167            NodeDefinition::End { id, .. } => {
168                builder = builder.end_with_id(&id);
169            }
170            NodeDefinition::Task {
171                id, name, executor, ..
172            } => {
173                // For now, tasks are limited to simple operations
174                // More complex task execution will be added later
175                match executor {
176                    TaskExecutorDef::None => {
177                        builder = builder.task(&id, &name, |_ctx, input| async move { Ok(input) });
178                    }
179                    _ => {
180                        return Err(DslError::Validation(
181                            "Only 'none' executor type is currently supported for task nodes"
182                                .to_string(),
183                        ));
184                    }
185                }
186            }
187            NodeDefinition::LLM_AGENT {
188                id,
189                name,
190                agent,
191                prompt_template,
192                ..
193            } => {
194                let llm_agent = match agent {
195                    AgentRef::Registry { agent_id } => agent_registry
196                        .get(agent_id.as_str())
197                        .ok_or_else(|| DslError::AgentNotFound(agent_id.clone()))?
198                        .clone(),
199                    AgentRef::Inline(_) => {
200                        // Build agent from inline config
201                        // Note: This requires a provider to be available
202                        // For now, we'll return an error
203                        return Err(DslError::Build(
204                            "Inline agent configuration requires a provider. Use agent registry instead.".to_string(),
205                        ));
206                    }
207                };
208
209                if let Some(template) = prompt_template {
210                    builder = builder.llm_agent_with_template(&id, &name, llm_agent, template);
211                } else {
212                    builder = builder.llm_agent(&id, &name, llm_agent);
213                }
214            }
215            NodeDefinition::Condition { id, name, .. } => {
216                // Condition nodes need special handling - use the agent node type
217                // with a custom executor that evaluates to true/false
218                builder = builder.task(&id, &name, |_ctx, _input| async move {
219                    Ok(WorkflowValue::Bool(true))
220                });
221            }
222            NodeDefinition::Parallel { id, name, .. } => {
223                // Parallel node - just mark it, actual parallelism handled by edges
224                builder = builder.task(&id, &name, |_ctx, input| async move { Ok(input) });
225            }
226            NodeDefinition::Join {
227                id, name, wait_for, ..
228            } => {
229                let wait_for_refs: Vec<&str> = wait_for.iter().map(|s| s.as_str()).collect();
230                builder = builder.goto(&id);
231                // Note: The join node will be connected later
232                let _ = (id, name, wait_for_refs);
233            }
234            NodeDefinition::Loop { id, name, body, .. } => match body {
235                TaskExecutorDef::None => {
236                    builder = builder.loop_node(
237                        &id,
238                        &name,
239                        |_ctx, input| async move { Ok(input) },
240                        |_ctx, _input| async move { false },
241                        10,
242                    );
243                }
244                _ => {
245                    return Err(DslError::Validation(
246                        "Loop body executor not supported yet".to_string(),
247                    ));
248                }
249            },
250            NodeDefinition::Transform { id, name, .. } => {
251                builder = builder.transform(&id, &name, |inputs| async move {
252                    inputs.get("input").cloned().unwrap_or(WorkflowValue::Null)
253                });
254            }
255            NodeDefinition::SubWorkflow {
256                id,
257                name,
258                workflow_id,
259                ..
260            } => {
261                builder = builder.sub_workflow(&id, &name, &workflow_id);
262            }
263            NodeDefinition::Wait {
264                id,
265                name,
266                event_type,
267                ..
268            } => {
269                builder = builder.wait(&id, &name, &event_type);
270            }
271        }
272
273        Ok(builder)
274    }
275}