mofa_foundation/workflow/dsl/
parser.rs1use super::env::substitute_env_recursive;
6use super::schema::*;
7use super::{DslError, DslResult};
8use crate::llm::LLMAgent;
9use crate::workflow::builder::WorkflowBuilder;
10use crate::workflow::state::WorkflowValue;
11use std::collections::HashMap;
12use std::fs;
13use std::path::Path;
14use std::sync::Arc;
15
/// Stateless entry point for the workflow DSL.
///
/// The type carries no data; all functionality is exposed through associated
/// functions that parse a workflow definition from YAML/TOML text or from a
/// file, and that turn a parsed definition into a workflow graph.
#[derive(Debug, Clone, Copy, Default)]
pub struct WorkflowDslParser;
18
19impl WorkflowDslParser {
20 pub fn from_yaml(content: &str) -> DslResult<WorkflowDefinition> {
22 let value: serde_yaml::Value = serde_yaml::from_str(content)?;
23 let json_value: serde_json::Value = serde_json::to_value(&value)?;
25 let substituted = substitute_env_recursive(&json_value);
26 let def: WorkflowDefinition = serde_json::from_value(substituted)?;
27 Ok(def)
28 }
29
30 pub fn from_toml(content: &str) -> DslResult<WorkflowDefinition> {
32 let value: toml::Value = toml::from_str(content)?;
33 let json_value: serde_json::Value = serde_json::to_value(&value)
35 .map_err(|e| DslError::Validation(format!("TOML to JSON conversion error: {}", e)))?;
36 let substituted = substitute_env_recursive(&json_value);
37 let def: WorkflowDefinition = serde_json::from_value(substituted)?;
38 Ok(def)
39 }
40
41 pub fn from_file(path: impl AsRef<Path>) -> DslResult<WorkflowDefinition> {
43 let path = path.as_ref();
44 let content = fs::read_to_string(path)?;
45
46 let extension = path
47 .extension()
48 .and_then(|e| e.to_str())
49 .ok_or_else(|| DslError::Validation("No file extension".to_string()))?;
50
51 match extension.to_lowercase().as_str() {
52 "yaml" | "yml" => Self::from_yaml(&content),
53 "toml" => Self::from_toml(&content),
54 _ => Err(DslError::Validation(format!(
55 "Unsupported file extension: {}",
56 extension
57 ))),
58 }
59 }
60
61 pub async fn build_with_agents(
66 definition: WorkflowDefinition,
67 agent_registry: &HashMap<String, Arc<LLMAgent>>,
68 ) -> DslResult<crate::workflow::WorkflowGraph> {
69 Self::validate(&definition)?;
71
72 let mut builder = WorkflowBuilder::new(&definition.metadata.id, &definition.metadata.name)
74 .description(&definition.metadata.description);
75
76 for node_def in definition.nodes {
78 builder = Self::add_node(builder, node_def, agent_registry).await?;
79 }
80
81 for edge in definition.edges {
83 if let Some(condition) = &edge.condition {
84 builder = builder.conditional_edge(&edge.from, &edge.to, condition);
85 } else {
86 builder = builder.edge(&edge.from, &edge.to);
87 }
88 }
89
90 Ok(builder.build())
91 }
92
93 fn validate(definition: &WorkflowDefinition) -> DslResult<()> {
95 let node_ids: Vec<&str> = definition.nodes.iter().map(|n| n.id()).collect();
97
98 if !node_ids.iter().any(|&id| {
100 definition
101 .nodes
102 .iter()
103 .any(|n| matches!(n, NodeDefinition::Start { id: start_id, .. } if start_id == id))
104 }) {
105 return Err(DslError::Validation(
106 "Workflow must have a start node".to_string(),
107 ));
108 }
109
110 if !node_ids.iter().any(|&id| {
112 definition
113 .nodes
114 .iter()
115 .any(|n| matches!(n, NodeDefinition::End { id: end_id, .. } if end_id == id))
116 }) {
117 return Err(DslError::Validation(
118 "Workflow must have an end node".to_string(),
119 ));
120 }
121
122 for edge in &definition.edges {
124 if !node_ids.contains(&edge.from.as_str()) {
125 return Err(DslError::InvalidEdge {
126 from: edge.from.clone(),
127 to: edge.to.clone(),
128 });
129 }
130 if !node_ids.contains(&edge.to.as_str()) {
131 return Err(DslError::InvalidEdge {
132 from: edge.from.clone(),
133 to: edge.to.clone(),
134 });
135 }
136 }
137
138 for node in &definition.nodes {
140 if let NodeDefinition::LLM_AGENT { agent, .. } = node {
141 match agent {
142 AgentRef::Registry { agent_id } => {
143 if !definition.agents.contains_key(agent_id) {
144 return Err(DslError::AgentNotFound(agent_id.clone()));
145 }
146 }
147 AgentRef::Inline(_) => {
148 }
150 }
151 }
152 }
153
154 Ok(())
155 }
156
157 async fn add_node(
159 mut builder: WorkflowBuilder,
160 node_def: NodeDefinition,
161 agent_registry: &HashMap<String, Arc<LLMAgent>>,
162 ) -> DslResult<WorkflowBuilder> {
163 match node_def {
164 NodeDefinition::Start { id, .. } => {
165 builder = builder.start_with_id(&id);
166 }
167 NodeDefinition::End { id, .. } => {
168 builder = builder.end_with_id(&id);
169 }
170 NodeDefinition::Task {
171 id, name, executor, ..
172 } => {
173 match executor {
176 TaskExecutorDef::None => {
177 builder = builder.task(&id, &name, |_ctx, input| async move { Ok(input) });
178 }
179 _ => {
180 return Err(DslError::Validation(
181 "Only 'none' executor type is currently supported for task nodes"
182 .to_string(),
183 ));
184 }
185 }
186 }
187 NodeDefinition::LLM_AGENT {
188 id,
189 name,
190 agent,
191 prompt_template,
192 ..
193 } => {
194 let llm_agent = match agent {
195 AgentRef::Registry { agent_id } => agent_registry
196 .get(agent_id.as_str())
197 .ok_or_else(|| DslError::AgentNotFound(agent_id.clone()))?
198 .clone(),
199 AgentRef::Inline(_) => {
200 return Err(DslError::Build(
204 "Inline agent configuration requires a provider. Use agent registry instead.".to_string(),
205 ));
206 }
207 };
208
209 if let Some(template) = prompt_template {
210 builder = builder.llm_agent_with_template(&id, &name, llm_agent, template);
211 } else {
212 builder = builder.llm_agent(&id, &name, llm_agent);
213 }
214 }
215 NodeDefinition::Condition { id, name, .. } => {
216 builder = builder.task(&id, &name, |_ctx, _input| async move {
219 Ok(WorkflowValue::Bool(true))
220 });
221 }
222 NodeDefinition::Parallel { id, name, .. } => {
223 builder = builder.task(&id, &name, |_ctx, input| async move { Ok(input) });
225 }
226 NodeDefinition::Join {
227 id, name, wait_for, ..
228 } => {
229 let wait_for_refs: Vec<&str> = wait_for.iter().map(|s| s.as_str()).collect();
230 builder = builder.goto(&id);
231 let _ = (id, name, wait_for_refs);
233 }
234 NodeDefinition::Loop { id, name, body, .. } => match body {
235 TaskExecutorDef::None => {
236 builder = builder.loop_node(
237 &id,
238 &name,
239 |_ctx, input| async move { Ok(input) },
240 |_ctx, _input| async move { false },
241 10,
242 );
243 }
244 _ => {
245 return Err(DslError::Validation(
246 "Loop body executor not supported yet".to_string(),
247 ));
248 }
249 },
250 NodeDefinition::Transform { id, name, .. } => {
251 builder = builder.transform(&id, &name, |inputs| async move {
252 inputs.get("input").cloned().unwrap_or(WorkflowValue::Null)
253 });
254 }
255 NodeDefinition::SubWorkflow {
256 id,
257 name,
258 workflow_id,
259 ..
260 } => {
261 builder = builder.sub_workflow(&id, &name, &workflow_id);
262 }
263 NodeDefinition::Wait {
264 id,
265 name,
266 event_type,
267 ..
268 } => {
269 builder = builder.wait(&id, &name, &event_type);
270 }
271 }
272
273 Ok(builder)
274 }
275}