Skip to main content

libbrat_workflow/
executor.rs

1//! Workflow executor - creates convoys and tasks from workflow templates.
2
3use std::collections::HashMap;
4
5use chrono::Utc;
6use uuid::Uuid;
7
8use libbrat_grite::{DependencyType, GriteClient};
9
10use crate::error::WorkflowError;
11use crate::parser::WorkflowParser;
12use crate::schema::{WorkflowTemplate, WorkflowType};
13
14/// Result of executing a workflow.
15#[derive(Debug, Clone, serde::Serialize)]
16pub struct WorkflowInstance {
17    /// Unique instance ID.
18    pub instance_id: String,
19    /// Workflow name.
20    pub workflow_name: String,
21    /// Convoy ID created for this instance.
22    pub convoy_id: String,
23    /// Task IDs created (in execution order).
24    pub task_ids: Vec<String>,
25    /// Input variables used.
26    pub variables: HashMap<String, String>,
27    /// Timestamp when executed.
28    pub executed_at: String,
29}
30
31/// Executor for running workflow templates.
32pub struct WorkflowExecutor {
33    /// Grite client for creating convoys/tasks.
34    grite: GriteClient,
35}
36
37impl WorkflowExecutor {
38    /// Create a new executor with the given Grite client.
39    pub fn new(grite: GriteClient) -> Self {
40        Self { grite }
41    }
42
43    /// Execute a workflow template with the given variables.
44    pub fn execute(
45        &self,
46        template: &WorkflowTemplate,
47        vars: HashMap<String, String>,
48    ) -> Result<WorkflowInstance, WorkflowError> {
49        // Validate required inputs
50        for (name, spec) in &template.inputs {
51            if spec.required && !vars.contains_key(name) {
52                if spec.default.is_none() {
53                    return Err(WorkflowError::MissingInput(name.clone()));
54                }
55            }
56        }
57
58        // Build complete variables map with defaults
59        let mut complete_vars = HashMap::new();
60        for (name, spec) in &template.inputs {
61            if let Some(value) = vars.get(name) {
62                complete_vars.insert(name.clone(), value.clone());
63            } else if let Some(ref default) = spec.default {
64                complete_vars.insert(name.clone(), default.clone());
65            }
66        }
67
68        // Generate instance ID
69        let instance_id = format!("wf-{}", Uuid::new_v4().to_string().split('-').next().unwrap());
70
71        // Create convoy
72        let convoy_title = WorkflowParser::substitute_vars(
73            &format!("[{}] {}", template.name, template.description.as_deref().unwrap_or(&template.name)),
74            &complete_vars,
75        );
76        let convoy_body = format!(
77            "Workflow instance: {}\nWorkflow: {}\nVariables: {:?}",
78            instance_id, template.name, complete_vars
79        );
80
81        let convoy = self.grite.convoy_create(&convoy_title, Some(&convoy_body))?;
82
83        // Create tasks based on workflow type
84        let task_ids = match template.workflow_type {
85            WorkflowType::Workflow => self.create_sequential_tasks(template, &convoy.convoy_id, &complete_vars, &instance_id)?,
86            WorkflowType::Convoy => self.create_parallel_tasks(template, &convoy.convoy_id, &complete_vars, &instance_id)?,
87        };
88
89        Ok(WorkflowInstance {
90            instance_id,
91            workflow_name: template.name.clone(),
92            convoy_id: convoy.convoy_id,
93            task_ids,
94            variables: complete_vars,
95            executed_at: Utc::now().to_rfc3339(),
96        })
97    }
98
99    /// Create tasks for a sequential workflow.
100    fn create_sequential_tasks(
101        &self,
102        template: &WorkflowTemplate,
103        convoy_id: &str,
104        vars: &HashMap<String, String>,
105        instance_id: &str,
106    ) -> Result<Vec<String>, WorkflowError> {
107        let mut task_ids = Vec::new();
108        let mut step_to_task: HashMap<String, (String, String)> = HashMap::new(); // step_id -> (task_id, grite_issue_id)
109
110        // Topological sort for dependency ordering
111        let ordered_steps = self.topological_sort_steps(template)?;
112
113        for step in ordered_steps {
114            let title = WorkflowParser::substitute_vars(&step.title, vars);
115            let mut body = WorkflowParser::substitute_vars(&step.body, vars);
116
117            // Add workflow metadata to body
118            body = format!(
119                "{}\n\n---\nWorkflow: {}\nInstance: {}\nStep: {}",
120                body, template.name, instance_id, step.id
121            );
122
123            let task = self.grite.task_create(convoy_id, &title, Some(&body))?;
124
125            // Add dependencies using grite DAG
126            for dep_step_id in &step.needs {
127                if let Some((_, dep_grite_issue_id)) = step_to_task.get(dep_step_id) {
128                    // This task depends on the dependency task
129                    if let Err(e) = self.grite.task_dep_add(
130                        &task.grite_issue_id,
131                        dep_grite_issue_id,
132                        DependencyType::DependsOn,
133                    ) {
134                        // Log but don't fail - dependency tracking is optional
135                        eprintln!(
136                            "Warning: Failed to add dependency {} -> {}: {}",
137                            task.task_id, dep_step_id, e
138                        );
139                    }
140                }
141            }
142
143            step_to_task.insert(step.id.clone(), (task.task_id.clone(), task.grite_issue_id.clone()));
144            task_ids.push(task.task_id);
145        }
146
147        Ok(task_ids)
148    }
149
150    /// Create tasks for a parallel convoy.
151    fn create_parallel_tasks(
152        &self,
153        template: &WorkflowTemplate,
154        convoy_id: &str,
155        vars: &HashMap<String, String>,
156        instance_id: &str,
157    ) -> Result<Vec<String>, WorkflowError> {
158        let mut task_ids = Vec::new();
159        let mut leg_grite_issue_ids = Vec::new();
160
161        // Create a task for each leg (all start as queued - parallel execution)
162        for leg in &template.legs {
163            let title = WorkflowParser::substitute_vars(&leg.title, vars);
164            let mut body = WorkflowParser::substitute_vars(&leg.body, vars);
165
166            // Add workflow metadata
167            body = format!(
168                "{}\n\n---\nWorkflow: {}\nInstance: {}\nLeg: {}",
169                body, template.name, instance_id, leg.id
170            );
171
172            let task = self.grite.task_create(convoy_id, &title, Some(&body))?;
173            leg_grite_issue_ids.push(task.grite_issue_id.clone());
174            task_ids.push(task.task_id);
175        }
176
177        // Create synthesis task if defined
178        if let Some(ref synthesis) = template.synthesis {
179            let title = WorkflowParser::substitute_vars(&synthesis.title, vars);
180            let mut body = WorkflowParser::substitute_vars(&synthesis.body, vars);
181
182            // Add workflow metadata
183            body = format!(
184                "{}\n\n---\nWorkflow: {}\nInstance: {}\nSynthesis: true",
185                body, template.name, instance_id
186            );
187
188            let task = self.grite.task_create(convoy_id, &title, Some(&body))?;
189
190            // Add dependencies from synthesis to all legs using grite DAG
191            for leg_issue_id in &leg_grite_issue_ids {
192                if let Err(e) = self.grite.task_dep_add(
193                    &task.grite_issue_id,
194                    leg_issue_id,
195                    DependencyType::DependsOn,
196                ) {
197                    // Log but don't fail - dependency tracking is optional
198                    eprintln!(
199                        "Warning: Failed to add synthesis dependency: {}",
200                        e
201                    );
202                }
203            }
204
205            task_ids.push(task.task_id);
206        }
207
208        Ok(task_ids)
209    }
210
211    /// Topological sort of workflow steps based on dependencies.
212    fn topological_sort_steps<'a>(
213        &self,
214        template: &'a WorkflowTemplate,
215    ) -> Result<Vec<&'a crate::schema::StepSpec>, WorkflowError> {
216        let mut result = Vec::new();
217        let mut visited = std::collections::HashSet::new();
218        let mut temp_visited = std::collections::HashSet::new();
219
220        // Build step map for quick lookup
221        let step_map: HashMap<&str, &crate::schema::StepSpec> = template
222            .steps
223            .iter()
224            .map(|s| (s.id.as_str(), s))
225            .collect();
226
227        // Visit each step
228        for step in &template.steps {
229            if !visited.contains(&step.id) {
230                self.visit_step(
231                    &step.id,
232                    &step_map,
233                    &mut visited,
234                    &mut temp_visited,
235                    &mut result,
236                )?;
237            }
238        }
239
240        Ok(result)
241    }
242
243    /// Helper for topological sort - visits a step and its dependencies.
244    fn visit_step<'a>(
245        &self,
246        step_id: &str,
247        step_map: &HashMap<&str, &'a crate::schema::StepSpec>,
248        visited: &mut std::collections::HashSet<String>,
249        temp_visited: &mut std::collections::HashSet<String>,
250        result: &mut Vec<&'a crate::schema::StepSpec>,
251    ) -> Result<(), WorkflowError> {
252        if temp_visited.contains(step_id) {
253            return Err(WorkflowError::CircularDependency);
254        }
255        if visited.contains(step_id) {
256            return Ok(());
257        }
258
259        temp_visited.insert(step_id.to_string());
260
261        let step = step_map
262            .get(step_id)
263            .ok_or_else(|| WorkflowError::UnknownStep(step_id.to_string()))?;
264
265        // Visit dependencies first
266        for dep in &step.needs {
267            self.visit_step(dep, step_map, visited, temp_visited, result)?;
268        }
269
270        temp_visited.remove(step_id);
271        visited.insert(step_id.to_string());
272        result.push(step);
273
274        Ok(())
275    }
276}