Skip to main content

agent_runtime/
workflow.rs

1use crate::step::{Step, StepType};
2use crate::types::JsonValue;
3use serde::{Deserialize, Serialize};
4
5#[cfg(test)]
6#[path = "workflow_test.rs"]
7mod workflow_test;
8
/// Workflow execution state
///
/// Serialized through serde with lowercase variant names
/// (`rename_all = "lowercase"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum WorkflowState {
    /// State given to every workflow produced by `WorkflowBuilder::build`.
    Pending,
    /// Presumably "execution in progress" - this file never assigns it;
    /// confirm against the runtime that drives workflows.
    Running,
    /// Drawn with the success style by `WorkflowRun::to_mermaid_with_results`.
    Completed,
    /// Drawn with the failure style by `WorkflowRun::to_mermaid_with_results`.
    Failed,
}
18
/// Workflow definition
///
/// Holds an id, the boxed steps, the initial input and the current state.
/// Instances are assembled by `WorkflowBuilder::build`.
pub struct Workflow {
    /// Identifier: the builder's name, or a generated `wf_<uuid>` when no
    /// name was supplied to the builder.
    pub id: String,
    /// Steps in the order they were pushed onto the builder.
    pub steps: Vec<Box<dyn Step>>,
    /// Initial input; the builder substitutes an empty JSON object when unset.
    pub initial_input: JsonValue,
    /// Execution state; the builder initializes it to `WorkflowState::Pending`.
    pub state: WorkflowState,
}
26
27impl Workflow {
28    pub fn builder() -> WorkflowBuilder {
29        WorkflowBuilder::new()
30    }
31
32    /// Convenience method to create a new workflow with a name
33    /// This is primarily for testing - production code should use builder()
34    pub fn with_name(name: &str) -> WorkflowBuilder {
35        WorkflowBuilder::new().name(name.to_string())
36    }
37
38    /// Generate a Mermaid flowchart diagram of this workflow with full expansion
39    pub fn to_mermaid(&self) -> String {
40        let mut diagram = String::from("flowchart TD\n");
41        let mut node_counter = 0;
42
43        // Start node
44        diagram.push_str("    Start([Start])\n");
45
46        if self.steps.is_empty() {
47            diagram.push_str("    Start --> End\n");
48        } else {
49            // Connect start to first step
50            let first_node = format!("N{}", node_counter);
51            diagram.push_str(&format!("    Start --> {}\n", first_node));
52
53            // Generate recursive structure
54            let last_node =
55                self.generate_mermaid_steps(&mut diagram, &mut node_counter, &first_node, 0);
56
57            // Connect last step to end
58            diagram.push_str(&format!("    {} --> End\n", last_node));
59        }
60
61        diagram.push_str("    End([End])\n");
62
63        // Add styling
64        diagram.push('\n');
65        diagram.push_str("    classDef agentStyle fill:#e1f5ff,stroke:#01579b,stroke-width:2px\n");
66        diagram
67            .push_str("    classDef transformStyle fill:#f3e5f5,stroke:#4a148c,stroke-width:2px\n");
68        diagram.push_str(
69            "    classDef conditionalStyle fill:#fff3e0,stroke:#e65100,stroke-width:2px\n",
70        );
71        diagram.push_str(
72            "    classDef subworkflowStyle fill:#e8f5e9,stroke:#1b5e20,stroke-width:2px\n",
73        );
74        diagram
75            .push_str("    classDef convergeStyle fill:#f5f5f5,stroke:#757575,stroke-width:1px\n");
76
77        diagram
78    }
79
    /// Recursive helper to generate mermaid steps
    ///
    /// Appends the Mermaid lines for `self.steps[step_index..]` to `diagram`.
    ///
    /// * `node_counter` - number of the most recently minted `N<k>` id; it is
    ///   incremented before each new id is formatted.
    /// * `entry_node` - id the current step is drawn on (regular and
    ///   conditional steps) or linked from (expanded sub-workflows).
    /// * `step_index` - index of the step to render in this call.
    ///
    /// Returns the id the caller should connect onward from: the last regular
    /// step, a convergence node, or a sub-workflow's End node. When
    /// `step_index` is past the end of `steps`, `entry_node` is returned as is.
    fn generate_mermaid_steps(
        &self,
        diagram: &mut String,
        node_counter: &mut usize,
        entry_node: &str,
        step_index: usize,
    ) -> String {
        // Base case: nothing left to render.
        if step_index >= self.steps.len() {
            return entry_node.to_string();
        }

        let step = &self.steps[step_index];
        let step_type = step.step_type();

        match step_type {
            StepType::Conditional => {
                // Get branches
                // NOTE(review): when `get_branches()` yields `None` this arm emits
                // nothing and control reaches the final `entry_node.to_string()`,
                // so the remaining steps are never visited - confirm intended.
                if let Some((then_step, else_step)) = step.get_branches() {
                    let conditional_node = entry_node;

                    // Create the conditional node on the entry id.
                    // NOTE(review): this format string expands to double braces
                    // (`id{{"name"}}`), while the nested-subgraph and results
                    // renderers in this file emit single braces - verify which
                    // Mermaid shape is wanted.
                    let step_name = step.name();
                    diagram.push_str(&format!(
                        "    {}{{{{\"{}\"}}}}:::conditionalStyle\n",
                        conditional_node, step_name
                    ));

                    // Then branch: mint its node id first so numbering stays stable.
                    *node_counter += 1;
                    let then_node = format!("N{}", node_counter);
                    let then_exit_node;

                    // Check if then branch is a SubWorkflow
                    if then_step.step_type() == StepType::SubWorkflow {
                        if let Some(sub_wf) = then_step.get_sub_workflow() {
                            // Generate subworkflow and get (entry, exit)
                            let (_entry, exit) = self.generate_subworkflow_inline(
                                diagram,
                                node_counter,
                                &then_node,
                                sub_wf,
                                then_step.name(),
                            );
                            then_exit_node = exit;
                        } else {
                            // No expandable workflow: draw it as a single node.
                            self.generate_step_node(diagram, &then_node, then_step);
                            then_exit_node = then_node.clone();
                        }
                    } else {
                        self.generate_step_node(diagram, &then_node, then_step);
                        then_exit_node = then_node.clone();
                    }

                    // Labelled edge from the conditional to the branch entry.
                    diagram.push_str(&format!(
                        "    {} -->|\"✓ TRUE\"| {}\n",
                        conditional_node, then_node
                    ));

                    // Else branch: same procedure as the then branch.
                    *node_counter += 1;
                    let else_node = format!("N{}", node_counter);
                    let else_exit_node;

                    // Check if else branch is a SubWorkflow
                    if else_step.step_type() == StepType::SubWorkflow {
                        if let Some(sub_wf) = else_step.get_sub_workflow() {
                            let (_entry, exit) = self.generate_subworkflow_inline(
                                diagram,
                                node_counter,
                                &else_node,
                                sub_wf,
                                else_step.name(),
                            );
                            else_exit_node = exit;
                        } else {
                            self.generate_step_node(diagram, &else_node, else_step);
                            else_exit_node = else_node.clone();
                        }
                    } else {
                        self.generate_step_node(diagram, &else_node, else_step);
                        else_exit_node = else_node.clone();
                    }

                    diagram.push_str(&format!(
                        "    {} -->|\"✗ FALSE\"| {}\n",
                        conditional_node, else_node
                    ));

                    // Convergence point - use EXIT nodes from branches
                    // (for an expanded sub-workflow that is its End node, not
                    // the branch entry id).
                    *node_counter += 1;
                    let converge_node = format!("N{}", node_counter);
                    diagram.push_str(&format!("    {}(( )):::convergeStyle\n", converge_node));
                    diagram.push_str(&format!("    {} --> {}\n", then_exit_node, converge_node));
                    diagram.push_str(&format!("    {} --> {}\n", else_exit_node, converge_node));

                    // Continue with next step
                    if step_index + 1 < self.steps.len() {
                        // Check if next step is a subworkflow - if so, DON'T create intermediate node
                        let next_step = &self.steps[step_index + 1];
                        if next_step.step_type() == StepType::SubWorkflow {
                            // Pass converge_node directly as entry for subworkflow
                            return self.generate_mermaid_steps(
                                diagram,
                                node_counter,
                                &converge_node,
                                step_index + 1,
                            );
                        } else {
                            // Mint the next step's node and link it from the convergence node.
                            *node_counter += 1;
                            let next_node = format!("N{}", node_counter);
                            diagram.push_str(&format!("    {} --> {}\n", converge_node, next_node));
                            return self.generate_mermaid_steps(
                                diagram,
                                node_counter,
                                &next_node,
                                step_index + 1,
                            );
                        }
                    } else {
                        // Conditional was the last step: the caller links onward from here.
                        return converge_node;
                    }
                }
            }
            StepType::SubWorkflow => {
                // Get sub-workflow and expand it
                // NOTE(review): when `get_sub_workflow()` yields `None` nothing is
                // emitted and the remaining steps are skipped via the final
                // fall-through return - confirm intended.
                if let Some(sub_wf) = step.get_sub_workflow() {
                    let sub_start_node = entry_node;

                    // Generate subworkflow and get its exit node
                    let (_entry, sub_exit_node) = self.generate_subworkflow_inline(
                        diagram,
                        node_counter,
                        sub_start_node,
                        sub_wf,
                        step.name(),
                    );

                    // Continue with next step from subworkflow exit
                    // NOTE(review): the exit id is passed as `entry_node`, so a
                    // following regular or conditional step is drawn on the
                    // sub-workflow's End node id rather than on a fresh node -
                    // confirm intended.
                    return self.generate_mermaid_steps(
                        diagram,
                        node_counter,
                        &sub_exit_node,
                        step_index + 1,
                    );
                }
            }
            _ => {
                // Regular step (Agent, Transform, etc.)
                let current_node = entry_node;
                self.generate_step_node(diagram, current_node, step.as_ref());

                // Continue with next step if there is one
                if step_index + 1 < self.steps.len() {
                    // Check if next step is subworkflow
                    let next_step = &self.steps[step_index + 1];
                    if next_step.step_type() == StepType::SubWorkflow {
                        // Pass current_node directly as entry for subworkflow
                        return self.generate_mermaid_steps(
                            diagram,
                            node_counter,
                            current_node,
                            step_index + 1,
                        );
                    } else {
                        // Mint the next step's node and link it from this one.
                        *node_counter += 1;
                        let next_node = format!("N{}", node_counter);
                        diagram.push_str(&format!("    {} --> {}\n", current_node, next_node));
                        return self.generate_mermaid_steps(
                            diagram,
                            node_counter,
                            &next_node,
                            step_index + 1,
                        );
                    }
                } else {
                    // This is the last step
                    return current_node.to_string();
                }
            }
        }

        // Reached only when a Conditional / SubWorkflow step could not be expanded.
        entry_node.to_string()
    }
264
265    /// Generate a subworkflow inline as a subgraph
266    /// Returns (entry_node, exit_node) tuple
267    fn generate_subworkflow_inline(
268        &self,
269        diagram: &mut String,
270        node_counter: &mut usize,
271        entry_point: &str,
272        sub_wf: Workflow,
273        step_name: &str,
274    ) -> (String, String) {
275        let subgraph_id = *node_counter;
276
277        // Create sub-workflow container
278        diagram.push_str(&format!(
279            "    subgraph SUB{}[\"📦 {}\"]\n",
280            subgraph_id, step_name
281        ));
282
283        *node_counter += 1;
284        let sub_entry = format!("N{}", node_counter);
285        diagram.push_str(&format!("        {}([Start])\n", sub_entry));
286
287        // Generate sub-workflow steps recursively
288        let sub_exit =
289            sub_wf.generate_mermaid_steps_in_subgraph(diagram, node_counter, &sub_entry, 0);
290
291        *node_counter += 1;
292        let sub_end = format!("N{}", node_counter);
293        diagram.push_str(&format!("        {}([End])\n", sub_end));
294        diagram.push_str(&format!("        {} --> {}\n", sub_exit, sub_end));
295
296        diagram.push_str("    end\n");
297
298        // Connect entry point to subworkflow start
299        diagram.push_str(&format!("    {} --> {}\n", entry_point, sub_entry));
300
301        // Return both entry point (for conditional connection) and exit node (for continuation)
302        (entry_point.to_string(), sub_end)
303    }
304
    /// Generate steps within a subgraph (different indentation)
    ///
    /// Appends the Mermaid lines for `self.steps[step_index..]` to `diagram`,
    /// indented by eight spaces so they sit inside a `subgraph` block.
    ///
    /// * `node_counter` - number of the most recently minted `N<k>` id;
    ///   incremented before each new id is formatted.
    /// * `entry_node` - id the current step is drawn on.
    /// * `step_index` - index of the step to render in this call.
    ///
    /// Returns the id handed to the final recursion level (the base case
    /// returns its `entry_node`).
    ///
    /// NOTE(review): every arm mints a `next_node` and links to it
    /// unconditionally, even after the last step, so the returned id is a node
    /// for which this function emitted no definition - confirm intended.
    fn generate_mermaid_steps_in_subgraph(
        &self,
        diagram: &mut String,
        node_counter: &mut usize,
        entry_node: &str,
        step_index: usize,
    ) -> String {
        // Base case: nothing left to render.
        if step_index >= self.steps.len() {
            return entry_node.to_string();
        }

        let step = &self.steps[step_index];
        let step_type = step.step_type();

        match step_type {
            StepType::Conditional => {
                // NOTE(review): if `get_branches()` yields `None`, nothing is emitted
                // and the remaining steps are skipped via the final return below.
                if let Some((then_step, else_step)) = step.get_branches() {
                    let conditional_node = entry_node;

                    // Conditional node drawn on the entry id (double-brace shape).
                    diagram.push_str(&format!(
                        "        {}{{{{\"{}\"}}}}:::conditionalStyle\n",
                        conditional_node,
                        step.name()
                    ));

                    // Then branch: always a single node here; a sub-workflow
                    // branch is not expanded at this level.
                    *node_counter += 1;
                    let then_node = format!("N{}", node_counter);
                    self.generate_step_node_indented(diagram, &then_node, then_step);
                    diagram.push_str(&format!(
                        "        {} -->|\"✓\"| {}\n",
                        conditional_node, then_node
                    ));

                    // Else branch: same treatment.
                    *node_counter += 1;
                    let else_node = format!("N{}", node_counter);
                    self.generate_step_node_indented(diagram, &else_node, else_step);
                    diagram.push_str(&format!(
                        "        {} -->|\"✗\"| {}\n",
                        conditional_node, else_node
                    ));

                    // Both branches meet at a small convergence node.
                    *node_counter += 1;
                    let converge_node = format!("N{}", node_counter);
                    diagram.push_str(&format!("        {}(( )):::convergeStyle\n", converge_node));
                    diagram.push_str(&format!("        {} --> {}\n", then_node, converge_node));
                    diagram.push_str(&format!("        {} --> {}\n", else_node, converge_node));

                    // Mint the following node and continue from it.
                    *node_counter += 1;
                    let next_node = format!("N{}", node_counter);
                    diagram.push_str(&format!("        {} --> {}\n", converge_node, next_node));

                    return self.generate_mermaid_steps_in_subgraph(
                        diagram,
                        node_counter,
                        &next_node,
                        step_index + 1,
                    );
                }
            }
            StepType::SubWorkflow => {
                // Nested subworkflow within a subworkflow
                // NOTE(review): if `get_sub_workflow()` yields `None`, nothing is
                // emitted and the remaining steps are skipped via the final return.
                if let Some(nested_wf) = step.get_sub_workflow() {
                    let nested_entry = entry_node;

                    // The nested container id comes from the counter before any
                    // inner node is minted.
                    let nested_id = *node_counter;
                    diagram.push_str(&format!(
                        "        subgraph NESTED{}[\"📦 {}\"]\n",
                        nested_id,
                        step.name()
                    ));

                    // Inner Start terminal (twelve-space indentation).
                    *node_counter += 1;
                    let nested_start = format!("N{}", node_counter);
                    diagram.push_str(&format!("            {}([Start])\n", nested_start));

                    // Third-level steps are rendered flat by the nested helper.
                    let nested_exit = nested_wf.generate_mermaid_steps_in_nested_subgraph(
                        diagram,
                        node_counter,
                        &nested_start,
                        0,
                    );

                    // Inner End terminal, then close the nested subgraph.
                    *node_counter += 1;
                    let nested_end = format!("N{}", node_counter);
                    diagram.push_str(&format!("            {}([End])\n", nested_end));
                    diagram.push_str(&format!("            {} --> {}\n", nested_exit, nested_end));
                    diagram.push_str("        end\n");

                    // Link the enclosing entry node into the nested subgraph.
                    diagram.push_str(&format!("        {} --> {}\n", nested_entry, nested_start));

                    // Mint the following node and continue from it.
                    *node_counter += 1;
                    let next_node = format!("N{}", node_counter);
                    diagram.push_str(&format!("        {} --> {}\n", nested_end, next_node));

                    return self.generate_mermaid_steps_in_subgraph(
                        diagram,
                        node_counter,
                        &next_node,
                        step_index + 1,
                    );
                }
            }
            _ => {
                // Regular step: drawn on the entry id.
                let current_node = entry_node;
                self.generate_step_node_indented(diagram, current_node, step.as_ref());

                // Mint the following node and continue from it.
                *node_counter += 1;
                let next_node = format!("N{}", node_counter);
                diagram.push_str(&format!("        {} --> {}\n", current_node, next_node));

                return self.generate_mermaid_steps_in_subgraph(
                    diagram,
                    node_counter,
                    &next_node,
                    step_index + 1,
                );
            }
        }

        // Reached only when a Conditional / SubWorkflow step could not be expanded.
        entry_node.to_string()
    }
427
428    /// Generate steps within a nested subgraph (triple indentation)
429    fn generate_mermaid_steps_in_nested_subgraph(
430        &self,
431        diagram: &mut String,
432        node_counter: &mut usize,
433        entry_node: &str,
434        step_index: usize,
435    ) -> String {
436        if step_index >= self.steps.len() {
437            return entry_node.to_string();
438        }
439
440        let step = &self.steps[step_index];
441        let current_node = entry_node;
442
443        // For simplicity at 3rd level, just show step names without further nesting
444        let step_name = step.name();
445        let step_type = step.step_type();
446
447        let (node_def, style) = match step_type {
448            StepType::Agent => (
449                format!("            {}[\"{}\"]", current_node, step_name),
450                ":::agentStyle",
451            ),
452            StepType::Transform => (
453                format!("            {}[/\"{}\"/]", current_node, step_name),
454                ":::transformStyle",
455            ),
456            StepType::Conditional => (
457                format!("            {}{{\"{}\"}}", current_node, step_name),
458                ":::conditionalStyle",
459            ),
460            _ => (
461                format!("            {}[\"{}\"]", current_node, step_name),
462                "",
463            ),
464        };
465
466        diagram.push_str(&format!("{}{}\n", node_def, style));
467
468        *node_counter += 1;
469        let next_node = format!("N{}", node_counter);
470        diagram.push_str(&format!("            {} --> {}\n", current_node, next_node));
471
472        self.generate_mermaid_steps_in_nested_subgraph(
473            diagram,
474            node_counter,
475            &next_node,
476            step_index + 1,
477        )
478    }
479
480    /// Generate a single step node (normal indentation)
481    fn generate_step_node(&self, diagram: &mut String, node_id: &str, step: &dyn Step) {
482        let step_name = step.name();
483        let step_type = step.step_type();
484
485        let (node_def, style) = match step_type {
486            StepType::Agent => (
487                format!("    {}[\"{}\"]", node_id, step_name),
488                ":::agentStyle",
489            ),
490            StepType::Transform => (
491                format!("    {}[/\"{}\"/]", node_id, step_name),
492                ":::transformStyle",
493            ),
494            StepType::SubWorkflow => (
495                format!("    {}[[\"{}\"]", node_id, step_name),
496                ":::subworkflowStyle",
497            ),
498            _ => (format!("    {}[\"{}\"]", node_id, step_name), ""),
499        };
500
501        diagram.push_str(&format!("{}{}\n", node_def, style));
502    }
503
504    /// Generate a single step node (indented for subgraph)
505    fn generate_step_node_indented(&self, diagram: &mut String, node_id: &str, step: &dyn Step) {
506        let step_name = step.name();
507        let step_type = step.step_type();
508
509        let (node_def, style) = match step_type {
510            StepType::Agent => (
511                format!("        {}[\"{}\"]", node_id, step_name),
512                ":::agentStyle",
513            ),
514            StepType::Transform => (
515                format!("        {}[/\"{}\"/]", node_id, step_name),
516                ":::transformStyle",
517            ),
518            _ => (format!("        {}[\"{}\"]", node_id, step_name), ""),
519        };
520
521        diagram.push_str(&format!("{}{}\n", node_def, style));
522    }
523}
524
/// Builder for Workflow
///
/// Collects an optional name, the steps and an optional initial input, and
/// turns them into a `Workflow` in `build`.
pub struct WorkflowBuilder {
    /// Optional name; `build` uses it as the workflow id and falls back to a
    /// generated `wf_<uuid>` when it is `None`.
    name: Option<String>,
    /// Steps accumulated through `step` / `add_step`, in call order.
    steps: Vec<Box<dyn Step>>,
    /// Optional initial input; `build` substitutes an empty JSON object when
    /// it is `None`.
    initial_input: Option<JsonValue>,
}
531
532impl WorkflowBuilder {
533    pub fn new() -> Self {
534        Self {
535            name: None,
536            steps: Vec::new(),
537            initial_input: None,
538        }
539    }
540
541    /// Set the workflow name/ID
542    pub fn name(mut self, name: String) -> Self {
543        self.name = Some(name);
544        self
545    }
546
547    /// Add a step to the workflow
548    pub fn step(mut self, step: Box<dyn Step>) -> Self {
549        self.steps.push(step);
550        self
551    }
552
553    /// Add a step to the workflow (alias for better readability)
554    pub fn add_step(mut self, step: Box<dyn Step>) -> Self {
555        self.steps.push(step);
556        self
557    }
558
559    /// Set the initial input
560    pub fn initial_input(mut self, input: JsonValue) -> Self {
561        self.initial_input = Some(input);
562        self
563    }
564
565    pub fn build(self) -> Workflow {
566        Workflow {
567            id: self
568                .name
569                .unwrap_or_else(|| format!("wf_{}", uuid::Uuid::new_v4())),
570            steps: self.steps,
571            initial_input: self.initial_input.unwrap_or(serde_json::json!({})),
572            state: WorkflowState::Pending,
573        }
574    }
575}
576
577impl Default for WorkflowBuilder {
578    fn default() -> Self {
579        Self::new()
580    }
581}
582
/// A workflow execution run with complete history
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowRun {
    /// Id of the workflow this run belongs to - presumably `Workflow::id`;
    /// confirm against the code that creates runs.
    pub workflow_id: String,
    /// Overall state; `to_mermaid_with_results` styles the Start and End
    /// nodes from it.
    pub state: WorkflowState,
    /// Per-step records; `to_mermaid_with_results` draws them in vector order.
    pub steps: Vec<WorkflowStepRecord>,
    /// Final output of the run, if any (not read anywhere in this file).
    pub final_output: Option<JsonValue>,

    /// Parent workflow ID if this is a sub-workflow
    /// (left out of the serialized form when `None`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_workflow_id: Option<String>,
}
595
596impl WorkflowRun {
597    /// Generate a Mermaid flowchart with execution results
598    pub fn to_mermaid_with_results(&self) -> String {
599        let mut diagram = String::from("flowchart TD\n");
600
601        // Start node
602        let start_style = match self.state {
603            WorkflowState::Completed => ":::successStyle",
604            WorkflowState::Failed => ":::failureStyle",
605            _ => "",
606        };
607        diagram.push_str(&format!("    Start([Start]){}  \n", start_style));
608
609        // Connect start to first step
610        if !self.steps.is_empty() {
611            diagram.push_str("    Start --> Step0\n");
612        }
613
614        // Generate nodes for each step
615        for (i, step) in self.steps.iter().enumerate() {
616            let node_id = format!("Step{}", i);
617            let step_name = &step.step_name;
618            let step_type = &step.step_type;
619            let exec_time = step.execution_time_ms.unwrap_or(0);
620
621            // Determine style based on output
622            let has_output = step.output.is_some();
623            let style_class = if has_output {
624                ":::successStyle"
625            } else {
626                ":::failureStyle"
627            };
628
629            // Choose node shape based on step type
630            let node_def = if step_type.contains("Agent") {
631                format!(
632                    "    {}[\"{}<br/><i>{}</i><br/>{}ms\"]{}",
633                    node_id, step_name, step_type, exec_time, style_class
634                )
635            } else if step_type.contains("Transform") {
636                format!(
637                    "    {}[/\"{}<br/><i>{}</i><br/>{}ms\"/]{}",
638                    node_id, step_name, step_type, exec_time, style_class
639                )
640            } else if step_type.contains("Conditional") {
641                format!(
642                    "    {}{{\"{}<br/><i>{}</i><br/>{}ms\"}}{}",
643                    node_id, step_name, step_type, exec_time, style_class
644                )
645            } else if step_type.contains("SubWorkflow") {
646                format!(
647                    "    {}[[\"{}<br/><i>{}</i><br/>{}ms\"]]{}",
648                    node_id, step_name, step_type, exec_time, style_class
649                )
650            } else {
651                format!(
652                    "    {}[\"{}<br/><i>{}</i><br/>{}ms\"]{}",
653                    node_id, step_name, step_type, exec_time, style_class
654                )
655            };
656
657            diagram.push_str(&node_def);
658            diagram.push('\n');
659
660            // Connect to next step
661            if i < self.steps.len() - 1 {
662                diagram.push_str(&format!("    {} --> Step{}\n", node_id, i + 1));
663            }
664        }
665
666        // End node
667        if !self.steps.is_empty() {
668            let last_step = self.steps.len() - 1;
669            diagram.push_str(&format!("    Step{} --> End\n", last_step));
670        } else {
671            diagram.push_str("    Start --> End\n");
672        }
673
674        let end_style = match self.state {
675            WorkflowState::Completed => ":::successStyle",
676            WorkflowState::Failed => ":::failureStyle",
677            _ => "",
678        };
679        diagram.push_str(&format!("    End([End]){}\n", end_style));
680
681        // Add styling
682        diagram.push('\n');
683        diagram
684            .push_str("    classDef successStyle fill:#c8e6c9,stroke:#2e7d32,stroke-width:3px\n");
685        diagram
686            .push_str("    classDef failureStyle fill:#ffcdd2,stroke:#c62828,stroke-width:3px\n");
687
688        diagram
689    }
690}
691
/// A single step record in workflow execution
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowStepRecord {
    /// Index of the step - presumably its position in the workflow. Note that
    /// `WorkflowRun::to_mermaid_with_results` numbers nodes by position in
    /// `WorkflowRun::steps` and does not read this field.
    pub step_index: usize,
    /// Step name; shown as the first line of the node label.
    pub step_name: String,
    /// Step type label; the results renderer picks the node shape by checking
    /// it for the substrings "Agent", "Transform", "Conditional" and
    /// "SubWorkflow", in that order.
    pub step_type: String,
    /// Input recorded for the step (not read anywhere in this file).
    pub input: JsonValue,
    /// Output recorded for the step; `None` makes the results renderer draw
    /// the node with the failure style.
    pub output: Option<JsonValue>,
    /// Execution time in milliseconds; the results renderer shows 0 when `None`.
    pub execution_time_ms: Option<u64>,
}