1use std::collections::HashMap;
4
5use chrono::Utc;
6use uuid::Uuid;
7
8use libbrat_grite::{DependencyType, GriteClient};
9
10use crate::error::WorkflowError;
11use crate::parser::WorkflowParser;
12use crate::schema::{WorkflowTemplate, WorkflowType};
13
/// Record describing one execution of a workflow template.
///
/// Built by `WorkflowExecutor::execute` once the convoy and all of its tasks
/// have been created.
#[derive(Debug, Clone, serde::Serialize)]
pub struct WorkflowInstance {
    /// Identifier generated for this run: `wf-` followed by the first
    /// hyphen-separated group of a freshly generated v4 UUID.
    pub instance_id: String,
    /// Name of the template that was executed (copied from the template).
    pub workflow_name: String,
    /// Identifier of the convoy created to hold this run's tasks.
    pub convoy_id: String,
    /// Identifiers of the tasks created for this run, in creation order.
    pub task_ids: Vec<String>,
    /// Resolved values for the template's declared inputs: the caller-supplied
    /// value when present, otherwise the input's default.
    pub variables: HashMap<String, String>,
    /// RFC 3339 UTC timestamp taken when this record was assembled, i.e. after
    /// the convoy and tasks were created.
    pub executed_at: String,
}
30
/// Turns workflow templates into a convoy plus tasks by issuing calls
/// through a `GriteClient`.
pub struct WorkflowExecutor {
    /// Client used for every convoy, task, and dependency operation.
    grite: GriteClient,
}
36
37impl WorkflowExecutor {
38 pub fn new(grite: GriteClient) -> Self {
40 Self { grite }
41 }
42
43 pub fn execute(
45 &self,
46 template: &WorkflowTemplate,
47 vars: HashMap<String, String>,
48 ) -> Result<WorkflowInstance, WorkflowError> {
49 for (name, spec) in &template.inputs {
51 if spec.required && !vars.contains_key(name) {
52 if spec.default.is_none() {
53 return Err(WorkflowError::MissingInput(name.clone()));
54 }
55 }
56 }
57
58 let mut complete_vars = HashMap::new();
60 for (name, spec) in &template.inputs {
61 if let Some(value) = vars.get(name) {
62 complete_vars.insert(name.clone(), value.clone());
63 } else if let Some(ref default) = spec.default {
64 complete_vars.insert(name.clone(), default.clone());
65 }
66 }
67
68 let instance_id = format!("wf-{}", Uuid::new_v4().to_string().split('-').next().unwrap());
70
71 let convoy_title = WorkflowParser::substitute_vars(
73 &format!("[{}] {}", template.name, template.description.as_deref().unwrap_or(&template.name)),
74 &complete_vars,
75 );
76 let convoy_body = format!(
77 "Workflow instance: {}\nWorkflow: {}\nVariables: {:?}",
78 instance_id, template.name, complete_vars
79 );
80
81 let convoy = self.grite.convoy_create(&convoy_title, Some(&convoy_body))?;
82
83 let task_ids = match template.workflow_type {
85 WorkflowType::Workflow => self.create_sequential_tasks(template, &convoy.convoy_id, &complete_vars, &instance_id)?,
86 WorkflowType::Convoy => self.create_parallel_tasks(template, &convoy.convoy_id, &complete_vars, &instance_id)?,
87 };
88
89 Ok(WorkflowInstance {
90 instance_id,
91 workflow_name: template.name.clone(),
92 convoy_id: convoy.convoy_id,
93 task_ids,
94 variables: complete_vars,
95 executed_at: Utc::now().to_rfc3339(),
96 })
97 }
98
99 fn create_sequential_tasks(
101 &self,
102 template: &WorkflowTemplate,
103 convoy_id: &str,
104 vars: &HashMap<String, String>,
105 instance_id: &str,
106 ) -> Result<Vec<String>, WorkflowError> {
107 let mut task_ids = Vec::new();
108 let mut step_to_task: HashMap<String, (String, String)> = HashMap::new(); let ordered_steps = self.topological_sort_steps(template)?;
112
113 for step in ordered_steps {
114 let title = WorkflowParser::substitute_vars(&step.title, vars);
115 let mut body = WorkflowParser::substitute_vars(&step.body, vars);
116
117 body = format!(
119 "{}\n\n---\nWorkflow: {}\nInstance: {}\nStep: {}",
120 body, template.name, instance_id, step.id
121 );
122
123 let task = self.grite.task_create(convoy_id, &title, Some(&body))?;
124
125 for dep_step_id in &step.needs {
127 if let Some((_, dep_grite_issue_id)) = step_to_task.get(dep_step_id) {
128 if let Err(e) = self.grite.task_dep_add(
130 &task.grite_issue_id,
131 dep_grite_issue_id,
132 DependencyType::DependsOn,
133 ) {
134 eprintln!(
136 "Warning: Failed to add dependency {} -> {}: {}",
137 task.task_id, dep_step_id, e
138 );
139 }
140 }
141 }
142
143 step_to_task.insert(step.id.clone(), (task.task_id.clone(), task.grite_issue_id.clone()));
144 task_ids.push(task.task_id);
145 }
146
147 Ok(task_ids)
148 }
149
150 fn create_parallel_tasks(
152 &self,
153 template: &WorkflowTemplate,
154 convoy_id: &str,
155 vars: &HashMap<String, String>,
156 instance_id: &str,
157 ) -> Result<Vec<String>, WorkflowError> {
158 let mut task_ids = Vec::new();
159 let mut leg_grite_issue_ids = Vec::new();
160
161 for leg in &template.legs {
163 let title = WorkflowParser::substitute_vars(&leg.title, vars);
164 let mut body = WorkflowParser::substitute_vars(&leg.body, vars);
165
166 body = format!(
168 "{}\n\n---\nWorkflow: {}\nInstance: {}\nLeg: {}",
169 body, template.name, instance_id, leg.id
170 );
171
172 let task = self.grite.task_create(convoy_id, &title, Some(&body))?;
173 leg_grite_issue_ids.push(task.grite_issue_id.clone());
174 task_ids.push(task.task_id);
175 }
176
177 if let Some(ref synthesis) = template.synthesis {
179 let title = WorkflowParser::substitute_vars(&synthesis.title, vars);
180 let mut body = WorkflowParser::substitute_vars(&synthesis.body, vars);
181
182 body = format!(
184 "{}\n\n---\nWorkflow: {}\nInstance: {}\nSynthesis: true",
185 body, template.name, instance_id
186 );
187
188 let task = self.grite.task_create(convoy_id, &title, Some(&body))?;
189
190 for leg_issue_id in &leg_grite_issue_ids {
192 if let Err(e) = self.grite.task_dep_add(
193 &task.grite_issue_id,
194 leg_issue_id,
195 DependencyType::DependsOn,
196 ) {
197 eprintln!(
199 "Warning: Failed to add synthesis dependency: {}",
200 e
201 );
202 }
203 }
204
205 task_ids.push(task.task_id);
206 }
207
208 Ok(task_ids)
209 }
210
211 fn topological_sort_steps<'a>(
213 &self,
214 template: &'a WorkflowTemplate,
215 ) -> Result<Vec<&'a crate::schema::StepSpec>, WorkflowError> {
216 let mut result = Vec::new();
217 let mut visited = std::collections::HashSet::new();
218 let mut temp_visited = std::collections::HashSet::new();
219
220 let step_map: HashMap<&str, &crate::schema::StepSpec> = template
222 .steps
223 .iter()
224 .map(|s| (s.id.as_str(), s))
225 .collect();
226
227 for step in &template.steps {
229 if !visited.contains(&step.id) {
230 self.visit_step(
231 &step.id,
232 &step_map,
233 &mut visited,
234 &mut temp_visited,
235 &mut result,
236 )?;
237 }
238 }
239
240 Ok(result)
241 }
242
243 fn visit_step<'a>(
245 &self,
246 step_id: &str,
247 step_map: &HashMap<&str, &'a crate::schema::StepSpec>,
248 visited: &mut std::collections::HashSet<String>,
249 temp_visited: &mut std::collections::HashSet<String>,
250 result: &mut Vec<&'a crate::schema::StepSpec>,
251 ) -> Result<(), WorkflowError> {
252 if temp_visited.contains(step_id) {
253 return Err(WorkflowError::CircularDependency);
254 }
255 if visited.contains(step_id) {
256 return Ok(());
257 }
258
259 temp_visited.insert(step_id.to_string());
260
261 let step = step_map
262 .get(step_id)
263 .ok_or_else(|| WorkflowError::UnknownStep(step_id.to_string()))?;
264
265 for dep in &step.needs {
267 self.visit_step(dep, step_map, visited, temp_visited, result)?;
268 }
269
270 temp_visited.remove(step_id);
271 visited.insert(step_id.to_string());
272 result.push(step);
273
274 Ok(())
275 }
276}