1use crate::dag::{EdgeType, WorkflowDag};
12use crate::error::Result;
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15
/// Output formats that [`DagVisualizer::visualize`] can render a workflow DAG to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum GraphFormat {
    /// Graphviz DOT text (`digraph workflow { ... }`).
    Dot,
    /// Mermaid flowchart text (`graph <direction>`).
    Mermaid,
    /// Pretty-printed JSON with nodes, edges, roots, leaves and a summary.
    Json,
    /// Standalone SVG document.
    Svg,
    /// Plain-text rendering made of per-layer boxes and connector lines.
    Ascii,
    /// PlantUML source (`@startuml` ... `@enduml`).
    PlantUml,
}
32
/// Options controlling how [`DagVisualizer`] renders a workflow DAG.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VisualizationConfig {
    /// Renderer selected by `DagVisualizer::visualize`.
    pub format: GraphFormat,
    /// Chooses the fallback DOT fill color for nodes that have neither a custom
    /// color nor a status: `lightblue` when true, `white` when false.
    pub show_status_colors: bool,
    /// When true, the DOT and Mermaid renderers append an `id: <task id>` line
    /// to each node label (no duration is printed by the visible renderers).
    pub show_durations: bool,
    /// When false, the DOT, Mermaid, SVG and PlantUML renderers omit all edges.
    pub show_dependencies: bool,
    /// NOTE(review): not read by any renderer in the visible part of this file.
    pub highlight_critical_path: bool,
    /// Layout direction: emitted verbatim as DOT `rankdir` and as the Mermaid
    /// graph direction; PlantUML treats `"LR"` as left-to-right and anything
    /// else as top-to-bottom.
    pub direction: String,
    /// When true, edges are labelled with their condition (or `data` for
    /// unconditional data edges, and `conditional` in DOT).
    pub show_edge_labels: bool,
    /// When true, the DOT and PlantUML renderers add the task description to labels.
    pub show_descriptions: bool,
    /// When true, the DOT renderer adds a `cpu: .., mem: ..MB` line to labels.
    pub show_resources: bool,
    /// Per-task fill colors keyed by task id; used by the DOT and SVG renderers
    /// and taking precedence over status colors there.
    pub custom_colors: HashMap<String, String>,
    /// Per-task visual status keyed by task id.
    pub task_statuses: HashMap<String, TaskVisualStatus>,
}
59
/// Visual state of a task, used to pick colors, Mermaid classes and ASCII markers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskVisualStatus {
    /// Rendered with `#e0e0e0`; ASCII marker `[.]`.
    Pending,
    /// Rendered with `#64b5f6`; ASCII marker `[>]`.
    Running,
    /// Rendered with `#81c784`; ASCII marker `[+]`.
    Completed,
    /// Rendered with `#e57373`; ASCII marker `[X]`.
    Failed,
    /// Rendered with `#fff176`; ASCII marker `[-]`.
    Skipped,
    /// Rendered with `#bdbdbd`; ASCII marker `[!]`.
    Cancelled,
}
76
77impl TaskVisualStatus {
78 fn dot_color(&self) -> &'static str {
80 match self {
81 Self::Pending => "#e0e0e0",
82 Self::Running => "#64b5f6",
83 Self::Completed => "#81c784",
84 Self::Failed => "#e57373",
85 Self::Skipped => "#fff176",
86 Self::Cancelled => "#bdbdbd",
87 }
88 }
89
90 fn svg_color(&self) -> &'static str {
92 match self {
93 Self::Pending => "#e0e0e0",
94 Self::Running => "#64b5f6",
95 Self::Completed => "#81c784",
96 Self::Failed => "#e57373",
97 Self::Skipped => "#fff176",
98 Self::Cancelled => "#bdbdbd",
99 }
100 }
101
102 fn mermaid_class(&self) -> &'static str {
104 match self {
105 Self::Pending => "pending",
106 Self::Running => "running",
107 Self::Completed => "completed",
108 Self::Failed => "failed",
109 Self::Skipped => "skipped",
110 Self::Cancelled => "cancelled",
111 }
112 }
113}
114
115impl Default for VisualizationConfig {
116 fn default() -> Self {
117 Self {
118 format: GraphFormat::Dot,
119 show_status_colors: true,
120 show_durations: true,
121 show_dependencies: true,
122 highlight_critical_path: false,
123 direction: "TB".to_string(),
124 show_edge_labels: true,
125 show_descriptions: false,
126 show_resources: false,
127 custom_colors: HashMap::new(),
128 task_statuses: HashMap::new(),
129 }
130 }
131}
132
/// Renders a `WorkflowDag` (and execution histories) into textual formats
/// according to its [`VisualizationConfig`].
pub struct DagVisualizer {
    /// Rendering options; mutated by the `set_*` methods.
    config: VisualizationConfig,
}
137
138impl DagVisualizer {
139 pub fn new() -> Self {
141 Self {
142 config: VisualizationConfig::default(),
143 }
144 }
145
146 pub fn with_config(config: VisualizationConfig) -> Self {
148 Self { config }
149 }
150
151 pub fn set_format(&mut self, format: GraphFormat) {
153 self.config.format = format;
154 }
155
156 pub fn set_task_statuses(&mut self, statuses: HashMap<String, TaskVisualStatus>) {
158 self.config.task_statuses = statuses;
159 }
160
161 pub fn set_task_status(&mut self, task_id: &str, status: TaskVisualStatus) {
163 self.config
164 .task_statuses
165 .insert(task_id.to_string(), status);
166 }
167
168 pub fn visualize(&self, dag: &WorkflowDag) -> Result<String> {
170 match self.config.format {
171 GraphFormat::Dot => self.to_dot(dag),
172 GraphFormat::Mermaid => self.to_mermaid(dag),
173 GraphFormat::Json => self.to_json(dag),
174 GraphFormat::Svg => self.to_svg(dag),
175 GraphFormat::Ascii => self.to_ascii(dag),
176 GraphFormat::PlantUml => self.to_plantuml(dag),
177 }
178 }
179
180 fn node_fill_color(&self, task_id: &str) -> String {
182 if let Some(color) = self.config.custom_colors.get(task_id) {
184 return color.clone();
185 }
186 if let Some(status) = self.config.task_statuses.get(task_id) {
188 return status.dot_color().to_string();
189 }
190 if self.config.show_status_colors {
192 "lightblue".to_string()
193 } else {
194 "white".to_string()
195 }
196 }
197
198 fn to_dot(&self, dag: &WorkflowDag) -> Result<String> {
202 let mut dot = String::from("digraph workflow {\n");
203 dot.push_str(&format!(" rankdir={};\n", self.config.direction));
204 dot.push_str(" node [shape=box, style=\"rounded,filled\", fontname=\"Helvetica\"];\n");
205 dot.push_str(" edge [fontname=\"Helvetica\", fontsize=10];\n\n");
206
207 for node in dag.tasks() {
209 let mut label_parts = vec![node.name.clone()];
210 if self.config.show_durations {
211 label_parts.push(format!("id: {}", node.id));
212 }
213 if self.config.show_descriptions {
214 if let Some(ref desc) = node.description {
215 label_parts.push(desc.clone());
216 }
217 }
218 if self.config.show_resources {
219 label_parts.push(format!(
220 "cpu: {:.1}, mem: {}MB",
221 node.resources.cpu_cores, node.resources.memory_mb
222 ));
223 }
224
225 let label = label_parts.join("\\n");
226 let color = self.node_fill_color(&node.id);
227
228 dot.push_str(&format!(
229 " \"{}\" [label=\"{}\", fillcolor=\"{}\"];\n",
230 node.id, label, color
231 ));
232 }
233
234 dot.push('\n');
235
236 if self.config.show_dependencies {
238 for (from_id, to_id, edge) in dag.edges() {
239 let edge_style = match edge.edge_type {
240 EdgeType::Data => "solid",
241 EdgeType::Control => "dashed",
242 EdgeType::Conditional => "dotted",
243 };
244
245 let edge_color = match edge.edge_type {
246 EdgeType::Data => "#2196F3",
247 EdgeType::Control => "#757575",
248 EdgeType::Conditional => "#FF9800",
249 };
250
251 let mut attrs = vec![
252 format!("style={}", edge_style),
253 format!("color=\"{}\"", edge_color),
254 ];
255
256 if self.config.show_edge_labels {
257 if let Some(ref condition) = edge.condition {
258 attrs.push(format!("label=\"{}\"", condition));
259 } else {
260 match edge.edge_type {
262 EdgeType::Data => attrs.push("label=\"data\"".to_string()),
263 EdgeType::Conditional => {
264 attrs.push("label=\"conditional\"".to_string())
265 }
266 EdgeType::Control => {} }
268 }
269 }
270
271 dot.push_str(&format!(
272 " \"{}\" -> \"{}\" [{}];\n",
273 from_id,
274 to_id,
275 attrs.join(", ")
276 ));
277 }
278 }
279
280 dot.push_str("}\n");
281
282 Ok(dot)
283 }
284
285 fn to_mermaid(&self, dag: &WorkflowDag) -> Result<String> {
289 let mut mermaid = format!("graph {}\n", self.config.direction);
290
291 for node in dag.tasks() {
293 let label = if self.config.show_durations {
294 format!("{}<br/>id: {}", node.name, node.id)
295 } else {
296 node.name.clone()
297 };
298
299 mermaid.push_str(&format!(" {}[\"{}\"]\n", node.id, label));
301 }
302
303 mermaid.push('\n');
304
305 if self.config.show_dependencies {
307 for (from_id, to_id, edge) in dag.edges() {
308 let arrow = match edge.edge_type {
309 EdgeType::Data => "-->",
310 EdgeType::Control => "-.->",
311 EdgeType::Conditional => "==>",
312 };
313
314 if self.config.show_edge_labels {
315 if let Some(ref condition) = edge.condition {
316 mermaid.push_str(&format!(
317 " {} {}|\"{}\"| {}\n",
318 from_id, arrow, condition, to_id
319 ));
320 } else {
321 match edge.edge_type {
322 EdgeType::Data => {
323 mermaid.push_str(&format!(
324 " {} {}|data| {}\n",
325 from_id, arrow, to_id
326 ));
327 }
328 _ => {
329 mermaid.push_str(&format!(" {} {} {}\n", from_id, arrow, to_id));
330 }
331 }
332 }
333 } else {
334 mermaid.push_str(&format!(" {} {} {}\n", from_id, arrow, to_id));
335 }
336 }
337 }
338
339 if !self.config.task_statuses.is_empty() {
341 mermaid.push('\n');
342 mermaid.push_str(" classDef pending fill:#e0e0e0,stroke:#9e9e9e\n");
344 mermaid.push_str(" classDef running fill:#64b5f6,stroke:#1976d2\n");
345 mermaid.push_str(" classDef completed fill:#81c784,stroke:#388e3c\n");
346 mermaid.push_str(" classDef failed fill:#e57373,stroke:#d32f2f\n");
347 mermaid.push_str(" classDef skipped fill:#fff176,stroke:#f9a825\n");
348 mermaid.push_str(" classDef cancelled fill:#bdbdbd,stroke:#616161\n");
349
350 for (task_id, status) in &self.config.task_statuses {
352 mermaid.push_str(&format!(" class {} {}\n", task_id, status.mermaid_class()));
353 }
354 }
355
356 Ok(mermaid)
357 }
358
359 fn to_json(&self, dag: &WorkflowDag) -> Result<String> {
363 #[derive(Serialize)]
364 struct JsonEdge {
365 from: String,
366 to: String,
367 edge_type: String,
368 #[serde(skip_serializing_if = "Option::is_none")]
369 condition: Option<String>,
370 }
371
372 #[derive(Serialize)]
373 struct JsonNode {
374 id: String,
375 name: String,
376 #[serde(skip_serializing_if = "Option::is_none")]
377 description: Option<String>,
378 dependencies: Vec<String>,
379 dependents: Vec<String>,
380 #[serde(skip_serializing_if = "Option::is_none")]
381 timeout_secs: Option<u64>,
382 #[serde(skip_serializing_if = "Option::is_none")]
383 status: Option<String>,
384 metadata: HashMap<String, String>,
385 }
386
387 #[derive(Serialize)]
388 struct JsonSummary {
389 node_count: usize,
390 edge_count: usize,
391 root_count: usize,
392 leaf_count: usize,
393 }
394
395 #[derive(Serialize)]
396 struct JsonGraph {
397 nodes: Vec<JsonNode>,
398 edges: Vec<JsonEdge>,
399 roots: Vec<String>,
400 leaves: Vec<String>,
401 summary: JsonSummary,
402 }
403
404 let nodes: Vec<JsonNode> = dag
405 .tasks()
406 .iter()
407 .map(|node| {
408 let status = self
409 .config
410 .task_statuses
411 .get(&node.id)
412 .map(|s| format!("{:?}", s));
413
414 JsonNode {
415 id: node.id.clone(),
416 name: node.name.clone(),
417 description: node.description.clone(),
418 dependencies: dag.get_dependencies(&node.id),
419 dependents: dag.get_dependents(&node.id),
420 timeout_secs: node.timeout_secs,
421 status,
422 metadata: node.metadata.clone(),
423 }
424 })
425 .collect();
426
427 let edges: Vec<JsonEdge> = dag
428 .edges()
429 .iter()
430 .map(|(from_id, to_id, edge)| {
431 let edge_type_str = match edge.edge_type {
432 EdgeType::Data => "data",
433 EdgeType::Control => "control",
434 EdgeType::Conditional => "conditional",
435 };
436 JsonEdge {
437 from: from_id.to_string(),
438 to: to_id.to_string(),
439 edge_type: edge_type_str.to_string(),
440 condition: edge.condition.clone(),
441 }
442 })
443 .collect();
444
445 let roots: Vec<String> = dag.root_tasks().iter().map(|t| t.id.clone()).collect();
446 let leaves: Vec<String> = dag.leaf_tasks().iter().map(|t| t.id.clone()).collect();
447
448 let dag_summary = dag.summary();
449 let summary = JsonSummary {
450 node_count: dag_summary.node_count,
451 edge_count: dag_summary.edge_count,
452 root_count: dag_summary.root_count,
453 leaf_count: dag_summary.leaf_count,
454 };
455
456 let graph = JsonGraph {
457 nodes,
458 edges,
459 roots,
460 leaves,
461 summary,
462 };
463
464 serde_json::to_string_pretty(&graph)
465 .map_err(|e| crate::error::WorkflowError::monitoring(format!("JSON error: {}", e)))
466 }
467
/// Renders `dag` as a standalone SVG document.
///
/// Tasks are placed in horizontal rows, one row per layer returned by
/// `crate::dag::create_execution_plan`, with each row centered on the canvas.
/// Edges are drawn as cubic Bezier curves from the bottom center of the source
/// node to the top center of the target node. An empty DAG yields a small
/// placeholder image containing the text "Empty DAG".
///
/// # Errors
/// Propagates any error from `create_execution_plan`.
fn to_svg(&self, dag: &WorkflowDag) -> Result<String> {
    let tasks = dag.tasks();
    if tasks.is_empty() {
        // Nothing to lay out: return a fixed 200x100 placeholder.
        return Ok(
            r#"<svg xmlns="http://www.w3.org/2000/svg" width="200" height="100">
 <text x="100" y="50" text-anchor="middle" font-family="Helvetica" font-size="14">Empty DAG</text>
</svg>"#
                .to_string(),
        );
    }

    // Each layer is a list of task ids; a layer becomes one row of nodes.
    let layers = crate::dag::create_execution_plan(dag)?;

    // Layout constants, in SVG user units.
    let node_width: f64 = 160.0;
    let node_height: f64 = 50.0;
    let layer_gap: f64 = 80.0;
    let node_gap: f64 = 40.0;
    let padding: f64 = 40.0;

    // Top-left corner of every node's rectangle, keyed by task id.
    let mut positions: HashMap<String, (f64, f64)> = HashMap::new();

    let max_layer_width = layers.iter().map(|layer| layer.len()).max().unwrap_or(1);

    // The canvas is as wide as the widest row plus padding on both sides.
    let canvas_width =
        max_layer_width as f64 * (node_width + node_gap) - node_gap + 2.0 * padding;

    for (layer_idx, layer) in layers.iter().enumerate() {
        // Center this row horizontally within the canvas.
        let layer_width = layer.len() as f64 * (node_width + node_gap) - node_gap;
        let x_offset = (canvas_width - layer_width) / 2.0;

        for (node_idx, task_id) in layer.iter().enumerate() {
            let x = x_offset + node_idx as f64 * (node_width + node_gap);
            let y = padding + layer_idx as f64 * (node_height + layer_gap);
            positions.insert(task_id.clone(), (x, y));
        }
    }

    let canvas_height =
        layers.len() as f64 * (node_height + layer_gap) - layer_gap + 2.0 * padding;

    let mut svg = format!(
        r#"<svg xmlns="http://www.w3.org/2000/svg" width="{}" height="{}" viewBox="0 0 {} {}">"#,
        canvas_width, canvas_height, canvas_width, canvas_height
    );
    svg.push('\n');

    // One arrowhead marker per edge color (control, data, conditional).
    svg.push_str(r##" <defs>
 <marker id="arrowhead" markerWidth="10" markerHeight="7" refX="10" refY="3.5" orient="auto">
 <polygon points="0 0, 10 3.5, 0 7" fill="#757575"/>
 </marker>
 <marker id="arrowhead-data" markerWidth="10" markerHeight="7" refX="10" refY="3.5" orient="auto">
 <polygon points="0 0, 10 3.5, 0 7" fill="#2196F3"/>
 </marker>
 <marker id="arrowhead-cond" markerWidth="10" markerHeight="7" refX="10" refY="3.5" orient="auto">
 <polygon points="0 0, 10 3.5, 0 7" fill="#FF9800"/>
 </marker>
 </defs>
"##);

    // Background rectangle covering the whole canvas.
    svg.push_str(&format!(
        " <rect width=\"{}\" height=\"{}\" fill=\"{}\" rx=\"8\"/>\n",
        canvas_width, canvas_height, "#fafafa"
    ));

    // Edges are emitted before nodes, so node rectangles are drawn on top of them.
    if self.config.show_dependencies {
        for (from_id, to_id, edge) in dag.edges() {
            // Edges whose endpoints were not positioned are silently skipped.
            if let (Some(&(fx, fy)), Some(&(tx, ty))) =
                (positions.get(from_id), positions.get(to_id))
            {
                // Bottom center of the source node -> top center of the target node.
                let x1 = fx + node_width / 2.0;
                let y1 = fy + node_height;
                let x2 = tx + node_width / 2.0;
                let y2 = ty;

                let (stroke, dash, marker) = match edge.edge_type {
                    EdgeType::Data => ("#2196F3", "", "url(#arrowhead-data)"),
                    EdgeType::Control => {
                        ("#757575", "stroke-dasharray=\"6,3\"", "url(#arrowhead)")
                    }
                    EdgeType::Conditional => (
                        "#FF9800",
                        "stroke-dasharray=\"3,3\"",
                        "url(#arrowhead-cond)",
                    ),
                };

                // Both Bezier control points sit at the vertical midpoint.
                let mid_y = (y1 + y2) / 2.0;
                svg.push_str(&format!(
                    " <path d=\"M {:.1} {:.1} C {:.1} {:.1}, {:.1} {:.1}, {:.1} {:.1}\" \
                     fill=\"none\" stroke=\"{}\" stroke-width=\"1.5\" {} marker-end=\"{}\"/>\n",
                    x1, y1, x1, mid_y, x2, mid_y, x2, y2, stroke, dash, marker
                ));

                if self.config.show_edge_labels {
                    // Only edges carrying a condition get a text label in SVG.
                    if let Some(ref condition) = edge.condition {
                        let label_x = (x1 + x2) / 2.0;
                        let label_y = mid_y - 6.0;
                        svg.push_str(&format!(
                            " <text x=\"{:.1}\" y=\"{:.1}\" text-anchor=\"middle\" \
                             font-family=\"Helvetica\" font-size=\"9\" fill=\"{}\">{}</text>\n",
                            label_x,
                            label_y,
                            stroke,
                            html_escape(condition)
                        ));
                    }
                }
            }
        }
    }

    for node in dag.tasks() {
        if let Some(&(x, y)) = positions.get(&node.id) {
            // Fill precedence: custom color, then status color, then the default blue.
            let fill = if let Some(color) = self.config.custom_colors.get(&node.id) {
                color.clone()
            } else if let Some(status) = self.config.task_statuses.get(&node.id) {
                status.svg_color().to_string()
            } else {
                "#e3f2fd".to_string()
            };

            let stroke_color = "#90caf9";
            let text_color = "#212121";

            svg.push_str(&format!(
                " <rect x=\"{:.1}\" y=\"{:.1}\" width=\"{:.1}\" height=\"{:.1}\" \
                 rx=\"6\" ry=\"6\" fill=\"{}\" stroke=\"{}\" stroke-width=\"1.5\"/>\n",
                x, y, node_width, node_height, fill, stroke_color
            ));

            // Horizontally centered label, placed 5 units below the vertical midpoint.
            let label_x = x + node_width / 2.0;
            let label_y = y + node_height / 2.0 + 5.0;
            svg.push_str(&format!(
                " <text x=\"{:.1}\" y=\"{:.1}\" text-anchor=\"middle\" \
                 font-family=\"Helvetica\" font-size=\"12\" font-weight=\"500\" \
                 fill=\"{}\">{}</text>\n",
                label_x,
                label_y,
                text_color,
                html_escape(&node.name)
            ));
        }
    }

    svg.push_str("</svg>\n");

    Ok(svg)
}
633
634 fn to_ascii(&self, dag: &WorkflowDag) -> Result<String> {
638 let tasks = dag.tasks();
639 if tasks.is_empty() {
640 return Ok("(empty DAG)\n".to_string());
641 }
642
643 let layers = crate::dag::create_execution_plan(dag)?;
645 let mut output = String::new();
646
647 output.push_str(&format!(
649 "Workflow DAG ({} tasks, {} edges)\n",
650 dag.task_count(),
651 dag.dependency_count()
652 ));
653 output.push_str(&"=".repeat(50));
654 output.push('\n');
655
656 for (layer_idx, layer) in layers.iter().enumerate() {
657 if layer_idx > 0 {
658 self.ascii_draw_connectors(&mut output, dag, &layers[layer_idx - 1], layer);
660 }
661
662 self.ascii_draw_layer(&mut output, dag, layer, layer_idx);
664 }
665
666 output.push('\n');
668 output.push_str(&"-".repeat(50));
669 output.push('\n');
670
671 let summary = dag.summary();
672 output.push_str(&format!(
673 "Roots: {} | Leaves: {} | Max fan-in: {} | Max fan-out: {}\n",
674 summary.root_count, summary.leaf_count, summary.max_in_degree, summary.max_out_degree
675 ));
676
677 if summary.data_edge_count > 0 || summary.conditional_edge_count > 0 {
678 output.push_str(&format!(
679 "Edge types: {} data, {} control, {} conditional\n",
680 summary.data_edge_count, summary.control_edge_count, summary.conditional_edge_count
681 ));
682 }
683
684 Ok(output)
685 }
686
687 fn ascii_draw_layer(
689 &self,
690 output: &mut String,
691 _dag: &WorkflowDag,
692 layer: &[String],
693 layer_idx: usize,
694 ) {
695 let labels: Vec<String> = layer
697 .iter()
698 .map(|id| {
699 let status_marker = self
700 .config
701 .task_statuses
702 .get(id)
703 .map(|s| match s {
704 TaskVisualStatus::Pending => " [.]",
705 TaskVisualStatus::Running => " [>]",
706 TaskVisualStatus::Completed => " [+]",
707 TaskVisualStatus::Failed => " [X]",
708 TaskVisualStatus::Skipped => " [-]",
709 TaskVisualStatus::Cancelled => " [!]",
710 })
711 .unwrap_or("");
712 format!("{}{}", id, status_marker)
713 })
714 .collect();
715
716 let max_label_width = labels.iter().map(|l| l.len()).max().unwrap_or(0);
717 let box_width = max_label_width + 4; output.push_str(&format!("Layer {}:\n", layer_idx));
721
722 let top_border: Vec<String> = labels
724 .iter()
725 .map(|_| format!("+{}+", "-".repeat(box_width)))
726 .collect();
727 output.push_str(&format!(" {}\n", top_border.join(" ")));
728
729 let content: Vec<String> = labels
730 .iter()
731 .map(|label| {
732 let pad = box_width - label.len();
733 let left_pad = pad / 2;
734 let right_pad = pad - left_pad;
735 format!(
736 "|{}{}{}|",
737 " ".repeat(left_pad),
738 label,
739 " ".repeat(right_pad)
740 )
741 })
742 .collect();
743 output.push_str(&format!(" {}\n", content.join(" ")));
744
745 let bottom_border: Vec<String> = labels
746 .iter()
747 .map(|_| format!("+{}+", "-".repeat(box_width)))
748 .collect();
749 output.push_str(&format!(" {}\n", bottom_border.join(" ")));
750 }
751
752 fn ascii_draw_connectors(
754 &self,
755 output: &mut String,
756 dag: &WorkflowDag,
757 prev_layer: &[String],
758 current_layer: &[String],
759 ) {
760 let mut has_connections = false;
762 for to_id in current_layer {
763 for from_id in prev_layer {
764 if dag.has_dependency(from_id, to_id) {
765 has_connections = true;
766 break;
767 }
768 }
769 if has_connections {
770 break;
771 }
772 }
773
774 if has_connections {
775 let mut connector_lines = Vec::new();
777 for to_id in current_layer {
778 let deps_in_prev: Vec<&str> = prev_layer
779 .iter()
780 .filter(|from_id| dag.has_dependency(from_id, to_id))
781 .map(|s| s.as_str())
782 .collect();
783
784 if !deps_in_prev.is_empty() {
785 let edge_info: Vec<String> = deps_in_prev
786 .iter()
787 .map(|from_id| {
788 let edge_type = dag
789 .get_edge_between(from_id, to_id)
790 .map(|e| match e.edge_type {
791 EdgeType::Data => "~~>",
792 EdgeType::Control => "-->",
793 EdgeType::Conditional => "==>",
794 })
795 .unwrap_or("-->");
796 format!(" {} {} {}", from_id, edge_type, to_id)
797 })
798 .collect();
799 connector_lines.extend(edge_info);
800 }
801 }
802
803 for line in connector_lines {
804 output.push_str(&format!("{}\n", line));
805 }
806 }
807 }
808
809 fn to_plantuml(&self, dag: &WorkflowDag) -> Result<String> {
813 let mut uml = String::from("@startuml\n");
814 uml.push_str("!theme plain\n");
815
816 let direction = match self.config.direction.as_str() {
818 "LR" => "left to right direction\n",
819 _ => "top to bottom direction\n",
820 };
821 uml.push_str(direction);
822 uml.push('\n');
823
824 uml.push_str("skinparam activity {\n");
826 uml.push_str(" BackgroundColor #e3f2fd\n");
827 uml.push_str(" BorderColor #90caf9\n");
828 uml.push_str(" FontName Helvetica\n");
829 uml.push_str("}\n\n");
830
831 for node in dag.tasks() {
833 let color = if let Some(status) = self.config.task_statuses.get(&node.id) {
834 status.dot_color().to_string()
835 } else {
836 "#e3f2fd".to_string()
837 };
838
839 let label = if self.config.show_descriptions {
840 if let Some(ref desc) = node.description {
841 format!("{}\\n{}", node.name, desc)
842 } else {
843 node.name.clone()
844 }
845 } else {
846 node.name.clone()
847 };
848
849 uml.push_str(&format!(
850 "rectangle \"{}\" as {} {}\n",
851 label, node.id, color
852 ));
853 }
854
855 uml.push('\n');
856
857 if self.config.show_dependencies {
859 for (from_id, to_id, edge) in dag.edges() {
860 let arrow = match edge.edge_type {
861 EdgeType::Data => "-->",
862 EdgeType::Control => "..>",
863 EdgeType::Conditional => "-[#FF9800]->",
864 };
865
866 if self.config.show_edge_labels {
867 if let Some(ref condition) = edge.condition {
868 uml.push_str(&format!(
869 "{} {} {} : {}\n",
870 from_id, arrow, to_id, condition
871 ));
872 } else {
873 match edge.edge_type {
874 EdgeType::Data => {
875 uml.push_str(&format!("{} {} {} : data\n", from_id, arrow, to_id));
876 }
877 _ => {
878 uml.push_str(&format!("{} {} {}\n", from_id, arrow, to_id));
879 }
880 }
881 }
882 } else {
883 uml.push_str(&format!("{} {} {}\n", from_id, arrow, to_id));
884 }
885 }
886 }
887
888 uml.push_str("\n@enduml\n");
889
890 Ok(uml)
891 }
892
893 pub fn visualize_timeline(
897 &self,
898 execution_history: &[crate::monitoring::TaskExecutionRecord],
899 ) -> Result<String> {
900 let mut timeline = String::from("# Execution Timeline\n\n");
901
902 for task in execution_history {
903 let duration = task
904 .duration
905 .map(|d| format!("{:.2}s", d.as_secs_f64()))
906 .unwrap_or_else(|| "N/A".to_string());
907
908 let status = format!("{:?}", task.status);
909
910 timeline.push_str(&format!(
911 "- {} [{}] Duration: {} Status: {}\n",
912 task.task_name, task.task_id, duration, status
913 ));
914 }
915
916 Ok(timeline)
917 }
918
919 pub fn generate_gantt_data(
921 &self,
922 execution_history: &[crate::monitoring::TaskExecutionRecord],
923 ) -> Result<Vec<GanttTask>> {
924 let mut tasks = Vec::new();
925
926 for (idx, task) in execution_history.iter().enumerate() {
927 let start_ms = task.start_time.timestamp_millis();
928 let end_ms = task
929 .end_time
930 .map(|t| t.timestamp_millis())
931 .unwrap_or(start_ms);
932
933 tasks.push(GanttTask {
934 id: task.task_id.clone(),
935 name: task.task_name.clone(),
936 start: start_ms,
937 end: end_ms,
938 duration_ms: (end_ms - start_ms) as u64,
939 row: idx,
940 status: format!("{:?}", task.status),
941 });
942 }
943
944 Ok(tasks)
945 }
946
947 pub fn generate_mermaid_gantt(
949 &self,
950 execution_history: &[crate::monitoring::TaskExecutionRecord],
951 ) -> Result<String> {
952 let mut gantt = String::from("gantt\n");
953 gantt.push_str(" title Workflow Execution Timeline\n");
954 gantt.push_str(" dateFormat x\n");
955 gantt.push_str(" axisFormat %H:%M:%S\n\n");
956
957 if execution_history.is_empty() {
958 return Ok(gantt);
959 }
960
961 let base_time = execution_history
963 .iter()
964 .map(|t| t.start_time.timestamp_millis())
965 .min()
966 .unwrap_or(0);
967
968 for task in execution_history {
969 let start_offset = task.start_time.timestamp_millis() - base_time;
970 let duration_ms = task.duration.map(|d| d.as_millis() as i64).unwrap_or(1000);
971
972 let status_tag = match task.status {
973 crate::monitoring::TaskExecutionStatus::Success => "",
974 crate::monitoring::TaskExecutionStatus::Failed => "crit, ",
975 crate::monitoring::TaskExecutionStatus::Running => "active, ",
976 _ => "",
977 };
978
979 gantt.push_str(&format!(
980 " {} : {}{}, {}ms\n",
981 task.task_name, status_tag, start_offset, duration_ms
982 ));
983 }
984
985 Ok(gantt)
986 }
987
/// Builds a self-contained HTML page showing `dag` in two tabs: a Mermaid
/// diagram and the raw DOT source.
///
/// The page loads the Mermaid script from the jsDelivr CDN (see the
/// `<script src=...>` tag in the template). Both the Mermaid text and the DOT
/// text are passed through `html_escape` before being embedded.
///
/// # Errors
/// Propagates errors from the DOT and Mermaid renderers.
pub fn generate_html_visualization(&self, dag: &WorkflowDag) -> Result<String> {
    let dot = self.to_dot(dag)?;

    // Render Mermaid through a temporary copy so `self`'s format is untouched.
    let mermaid_viz = {
        let viz = self.clone_with_format(GraphFormat::Mermaid);
        viz.visualize(dag)?
    };

    // In the template, `{{` / `}}` are literal braces; the two `{}` slots take
    // the escaped Mermaid text and the escaped DOT text, in that order.
    let html = format!(
        r#"<!DOCTYPE html>
<html>
<head>
 <title>Workflow Visualization</title>
 <meta charset="utf-8"/>
 <script src="https://cdn.jsdelivr.net/npm/mermaid/dist/mermaid.min.js"></script>
 <style>
 body {{
 font-family: 'Helvetica Neue', Arial, sans-serif;
 margin: 20px;
 background: #fafafa;
 color: #212121;
 }}
 h1, h2 {{
 color: #1565c0;
 }}
 .container {{
 max-width: 1200px;
 margin: 0 auto;
 }}
 pre {{
 background: #f5f5f5;
 padding: 15px;
 border-radius: 8px;
 overflow-x: auto;
 border: 1px solid #e0e0e0;
 }}
 .mermaid {{
 text-align: center;
 padding: 20px;
 background: white;
 border-radius: 8px;
 border: 1px solid #e0e0e0;
 margin: 15px 0;
 }}
 .tab-container {{
 display: flex;
 gap: 0;
 border-bottom: 2px solid #1565c0;
 margin-top: 20px;
 }}
 .tab {{
 padding: 10px 20px;
 cursor: pointer;
 background: #e3f2fd;
 border: 1px solid #90caf9;
 border-bottom: none;
 border-radius: 8px 8px 0 0;
 }}
 .tab.active {{
 background: white;
 font-weight: bold;
 }}
 .tab-content {{
 display: none;
 padding: 15px;
 background: white;
 border: 1px solid #e0e0e0;
 border-top: none;
 }}
 .tab-content.active {{
 display: block;
 }}
 </style>
</head>
<body>
 <div class="container">
 <h1>Workflow DAG</h1>

 <div class="tab-container">
 <div class="tab active" onclick="showTab('mermaid')">Mermaid</div>
 <div class="tab" onclick="showTab('dot')">DOT (Graphviz)</div>
 </div>

 <div id="mermaid" class="tab-content active">
 <div class="mermaid">
{}
 </div>
 </div>

 <div id="dot" class="tab-content">
 <pre>{}</pre>
 <p>Render this DOT graph at <a href="https://dreampuf.github.io/GraphvizOnline/" target="_blank">Graphviz Online</a></p>
 </div>
 </div>

 <script>
 mermaid.initialize({{ startOnLoad: true, theme: 'default' }});

 function showTab(tabId) {{
 document.querySelectorAll('.tab-content').forEach(c => c.classList.remove('active'));
 document.querySelectorAll('.tab').forEach(t => t.classList.remove('active'));
 document.getElementById(tabId).classList.add('active');
 event.target.classList.add('active');
 }}
 </script>
</body>
</html>"#,
        html_escape(&mermaid_viz),
        html_escape(&dot)
    );

    Ok(html)
}
1105
1106 fn clone_with_format(&self, format: GraphFormat) -> Self {
1108 let mut config = self.config.clone();
1109 config.format = format;
1110 Self { config }
1111 }
1112}
1113
1114impl Default for DagVisualizer {
1115 fn default() -> Self {
1116 Self::new()
1117 }
1118}
1119
/// One row of Gantt chart data, as produced by
/// `DagVisualizer::generate_gantt_data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GanttTask {
    /// Task id copied from the execution record.
    pub id: String,
    /// Task name copied from the execution record.
    pub name: String,
    /// Start time as a millisecond timestamp (`timestamp_millis()`).
    pub start: i64,
    /// End time as a millisecond timestamp; equals `start` when the record has no end time.
    pub end: i64,
    /// Difference between `end` and `start`, in milliseconds.
    pub duration_ms: u64,
    /// Index of the record within the execution history it came from.
    pub row: usize,
    /// `Debug` rendering of the record's execution status.
    pub status: String,
}
1138
/// Escapes the five characters that are significant in HTML/XML text and
/// attribute values: `&`, `<`, `>`, `"` and `'`.
///
/// The input is processed in a single pass, so the `&` that starts an emitted
/// entity is never escaped a second time. An `&` that is already part of an
/// entity in the *input* is escaped like any other ampersand.
fn html_escape(input: &str) -> String {
    let mut escaped = String::with_capacity(input.len());
    for ch in input.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&#39;"),
            other => escaped.push(other),
        }
    }
    escaped
}
1148
/// Serializable bundle describing one workflow execution for display.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads
/// this type, so the field descriptions below are inferred from names and
/// types only — confirm against callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionVisualization {
    /// Identifier of the workflow.
    pub workflow_id: String,
    /// Identifier of the execution.
    pub execution_id: String,
    /// Rendered DAG text — presumably the output of one of the renderers; TODO confirm.
    pub dag_structure: String,
    /// Gantt rows for the execution.
    pub timeline: Vec<GanttTask>,
    /// Free-form statistics keyed by name, stored as JSON values.
    pub statistics: HashMap<String, serde_json::Value>,
}
1163
1164#[cfg(test)]
1165mod tests {
1166 use super::*;
1167 use crate::dag::{EdgeType, ResourceRequirements, RetryPolicy, TaskEdge, TaskNode};
1168
1169 fn create_test_task(id: &str, name: &str) -> TaskNode {
1170 TaskNode {
1171 id: id.to_string(),
1172 name: name.to_string(),
1173 description: None,
1174 config: serde_json::json!({}),
1175 retry: RetryPolicy::default(),
1176 timeout_secs: Some(60),
1177 resources: ResourceRequirements::default(),
1178 metadata: HashMap::new(),
1179 }
1180 }
1181
1182 fn create_test_dag() -> WorkflowDag {
1183 let mut dag = WorkflowDag::new();
1184 dag.add_task(create_test_task("ingest", "Data Ingestion"))
1185 .expect("Failed to add task");
1186 dag.add_task(create_test_task("validate", "Validation"))
1187 .expect("Failed to add task");
1188 dag.add_task(create_test_task("transform", "Transform"))
1189 .expect("Failed to add task");
1190 dag.add_task(create_test_task("output", "Output"))
1191 .expect("Failed to add task");
1192
1193 dag.add_dependency(
1194 "ingest",
1195 "validate",
1196 TaskEdge {
1197 edge_type: EdgeType::Data,
1198 condition: None,
1199 },
1200 )
1201 .expect("Failed to add dependency");
1202 dag.add_dependency(
1203 "validate",
1204 "transform",
1205 TaskEdge {
1206 edge_type: EdgeType::Control,
1207 condition: None,
1208 },
1209 )
1210 .expect("Failed to add dependency");
1211 dag.add_dependency(
1212 "validate",
1213 "output",
1214 TaskEdge {
1215 edge_type: EdgeType::Conditional,
1216 condition: Some("skip_transform".to_string()),
1217 },
1218 )
1219 .expect("Failed to add dependency");
1220 dag.add_dependency(
1221 "transform",
1222 "output",
1223 TaskEdge {
1224 edge_type: EdgeType::Data,
1225 condition: None,
1226 },
1227 )
1228 .expect("Failed to add dependency");
1229
1230 dag
1231 }
1232
/// A freshly created visualizer defaults to DOT output.
#[test]
fn test_visualizer_creation() {
    let format = DagVisualizer::new().config.format;
    assert_eq!(format, GraphFormat::Dot);
}
1238
/// DOT output for a two-task chain names both tasks and contains an edge.
#[test]
fn test_dot_generation() {
    let mut dag = WorkflowDag::new();
    for (id, name) in vec![("task1", "Task 1"), ("task2", "Task 2")] {
        dag.add_task(create_test_task(id, name))
            .expect("Failed to add task");
    }
    dag.add_dependency("task1", "task2", TaskEdge::default())
        .expect("Failed to add dependency");

    let dot = DagVisualizer::new()
        .visualize(&dag)
        .expect("Failed to generate DOT");

    assert!(dot.contains("digraph workflow"));
    assert!(dot.contains("task1"));
    assert!(dot.contains("task2"));
    assert!(dot.contains("->"));
}
1257
/// Each edge type maps to its own DOT line style, and edge conditions are
/// rendered as labels.
#[test]
fn test_dot_with_edge_types() {
    let dot = DagVisualizer::new()
        .visualize(&create_test_dag())
        .expect("Failed to generate DOT");

    // Data edge.
    assert!(dot.contains("style=solid"));
    // Control edge.
    assert!(dot.contains("style=dashed"));
    // Conditional edge.
    assert!(dot.contains("style=dotted"));
    // Condition label.
    assert!(dot.contains("skip_transform"));
}
1269
/// Mermaid output mentions every task and uses the expected arrow styles.
#[test]
fn test_mermaid_generation() {
    let visualizer = DagVisualizer::with_config(VisualizationConfig {
        format: GraphFormat::Mermaid,
        ..Default::default()
    });

    let mermaid = visualizer
        .visualize(&create_test_dag())
        .expect("Failed to generate Mermaid");

    assert!(mermaid.contains("graph"));
    assert!(mermaid.contains("ingest"));
    assert!(mermaid.contains("validate"));
    assert!(mermaid.contains("transform"));
    assert!(mermaid.contains("output"));
    // Data edges use the plain arrow.
    assert!(mermaid.contains("-->"));
    // Control or conditional edges use a distinct arrow.
    assert!(mermaid.contains("-.->") || mermaid.contains("==>"));
}
1289
/// Configured task statuses produce `classDef` definitions and per-task
/// `class` assignments in Mermaid output.
#[test]
fn test_mermaid_with_statuses() {
    let mut visualizer = DagVisualizer::with_config(VisualizationConfig {
        format: GraphFormat::Mermaid,
        ..Default::default()
    });
    visualizer.set_task_status("ingest", TaskVisualStatus::Completed);
    visualizer.set_task_status("validate", TaskVisualStatus::Running);

    let mermaid = visualizer
        .visualize(&create_test_dag())
        .expect("Failed to generate Mermaid");

    assert!(mermaid.contains("classDef completed"));
    assert!(mermaid.contains("classDef running"));
    assert!(mermaid.contains("class ingest completed"));
    assert!(mermaid.contains("class validate running"));
}
1307
/// JSON output parses and reports the fixture's nodes, edges, roots, leaves
/// and summary counts.
#[test]
fn test_json_generation() {
    let visualizer = DagVisualizer::with_config(VisualizationConfig {
        format: GraphFormat::Json,
        ..Default::default()
    });

    let json = visualizer
        .visualize(&create_test_dag())
        .expect("Failed to generate JSON");
    let parsed: serde_json::Value = serde_json::from_str(&json).expect("Invalid JSON output");

    // Top-level collections.
    let nodes = parsed["nodes"].as_array().expect("nodes should be array");
    assert_eq!(nodes.len(), 4);

    let edges = parsed["edges"].as_array().expect("edges should be array");
    assert_eq!(edges.len(), 4);

    // `validate` depends only on `ingest`.
    let validate_node = nodes
        .iter()
        .find(|n| n["id"] == "validate")
        .expect("validate node should exist");
    let validate_deps = validate_node["dependencies"]
        .as_array()
        .expect("dependencies should be array");
    assert_eq!(validate_deps.len(), 1);
    assert_eq!(validate_deps[0], "ingest");

    // The ingest -> validate edge is a data edge.
    let data_edge = edges
        .iter()
        .find(|e| e["from"] == "ingest" && e["to"] == "validate")
        .expect("data edge should exist");
    assert_eq!(data_edge["edge_type"], "data");

    // Single root and single leaf.
    let roots = parsed["roots"].as_array().expect("roots should be array");
    assert_eq!(roots.len(), 1);
    assert_eq!(roots[0], "ingest");

    let leaves = parsed["leaves"].as_array().expect("leaves should be array");
    assert_eq!(leaves.len(), 1);
    assert_eq!(leaves[0], "output");

    // Summary counts.
    let summary = &parsed["summary"];
    assert_eq!(summary["node_count"], 4);
    assert_eq!(summary["edge_count"], 4);
}
1359
/// SVG rendering of the shared fixture DAG should be a complete `<svg>`
/// document containing rectangles, text, paths, an `arrowhead` reference,
/// and the "Data Ingestion" label.
#[test]
fn test_svg_generation() {
    let mut renderer = DagVisualizer::new();
    renderer.set_format(GraphFormat::Svg);

    let dag = create_test_dag();
    let svg = renderer.visualize(&dag).expect("Failed to generate SVG");

    let expected_fragments = [
        "<svg",
        "</svg>",
        "<rect",
        "<text",
        "<path",
        "arrowhead",
        "Data Ingestion",
    ];
    for fragment in expected_fragments {
        assert!(svg.contains(fragment), "SVG output is missing {:?}", fragment);
    }
}
1376
/// Rendering a DAG with no tasks as SVG should succeed and include the
/// "Empty DAG" text.
#[test]
fn test_svg_empty_dag() {
    let config = VisualizationConfig {
        format: GraphFormat::Svg,
        ..Default::default()
    };
    let renderer = DagVisualizer::with_config(config);

    let empty_dag = WorkflowDag::new();
    let svg = renderer
        .visualize(&empty_dag)
        .expect("Failed to generate SVG");

    assert!(svg.contains("Empty DAG"));
}
1388
/// ASCII rendering of the shared fixture DAG should include the header, the
/// first layer, the `ingest` task, and the roots/leaves sections.
#[test]
fn test_ascii_generation() {
    let mut renderer = DagVisualizer::new();
    renderer.set_format(GraphFormat::Ascii);

    let dag = create_test_dag();
    let text = renderer
        .visualize(&dag)
        .expect("Failed to generate ASCII");

    for fragment in ["Workflow DAG", "Layer 0", "ingest", "Roots:", "Leaves:"] {
        assert!(text.contains(fragment), "ASCII output is missing {:?}", fragment);
    }
}
1406
/// With one task marked completed and another marked failed, the ASCII
/// output is expected to contain both the `[+]` and the `[X]` markers.
#[test]
fn test_ascii_with_statuses() {
    let mut renderer = DagVisualizer::new();
    renderer.set_format(GraphFormat::Ascii);
    renderer.set_task_status("ingest", TaskVisualStatus::Completed);
    renderer.set_task_status("validate", TaskVisualStatus::Failed);

    let dag = create_test_dag();
    let text = renderer
        .visualize(&dag)
        .expect("Failed to generate ASCII");

    for marker in ["[+]", "[X]"] {
        assert!(text.contains(marker), "ASCII output is missing {:?}", marker);
    }
}
1422
/// PlantUML rendering of the shared fixture DAG should be wrapped in
/// `@startuml`/`@enduml`, declare rectangles, mention `ingest`, and use both
/// the solid (`-->`) and dotted (`..>`) arrow forms.
#[test]
fn test_plantuml_generation() {
    let mut renderer = DagVisualizer::new();
    renderer.set_format(GraphFormat::PlantUml);

    let dag = create_test_dag();
    let diagram = renderer
        .visualize(&dag)
        .expect("Failed to generate PlantUML");

    for fragment in ["@startuml", "@enduml", "rectangle", "ingest", "-->", "..>"] {
        assert!(
            diagram.contains(fragment),
            "PlantUML output is missing {:?}",
            fragment
        );
    }
}
1441
1442 #[test]
1443 fn test_html_escape() {
1444 assert_eq!(html_escape("<test>"), "<test>");
1445 assert_eq!(html_escape("a & b"), "a & b");
1446 }
1447
/// The HTML page generated for a single-task DAG should be a full document
/// that mentions both `mermaid` and `digraph`.
#[test]
fn test_html_generation() {
    let mut dag = WorkflowDag::new();
    dag.add_task(create_test_task("task1", "Task 1"))
        .expect("Failed to add task");

    let renderer = DagVisualizer::new();
    let page = renderer
        .generate_html_visualization(&dag)
        .expect("Failed to generate HTML");

    for marker in ["<!DOCTYPE html>", "mermaid", "digraph"] {
        assert!(page.contains(marker), "HTML output is missing {:?}", marker);
    }
}
1463
/// Every `GraphFormat` variant must render a small two-task DAG
/// (`t1` -> `t2`) into non-empty output.
#[test]
fn test_format_switching() {
    let mut dag = WorkflowDag::new();
    dag.add_task(create_test_task("t1", "Task 1"))
        .expect("Failed to add task");
    dag.add_task(create_test_task("t2", "Task 2"))
        .expect("Failed to add task");
    dag.add_dependency("t1", "t2", TaskEdge::default())
        .expect("Failed to add dependency");

    // `GraphFormat` is `Copy`, so the variants are iterated by value.
    for format in [
        GraphFormat::Dot,
        GraphFormat::Mermaid,
        GraphFormat::Json,
        GraphFormat::Svg,
        GraphFormat::Ascii,
        GraphFormat::PlantUml,
    ] {
        // A fresh visualizer per format keeps the iterations independent.
        let vis = DagVisualizer::with_config(VisualizationConfig {
            format,
            ..Default::default()
        });

        // Unwrap once, reporting both the format and the underlying error.
        let output = vis.visualize(&dag).unwrap_or_else(|err| {
            panic!("Format {:?} failed to produce output: {:?}", format, err)
        });
        assert!(
            !output.is_empty(),
            "Format {:?} produced empty output",
            format
        );
    }
}
1502
/// A color configured in `custom_colors` for `task1` should appear in the
/// DOT output for a DAG containing that task.
#[test]
fn test_custom_colors() {
    let custom_colors = HashMap::from([("task1".to_string(), "#ff0000".to_string())]);
    let renderer = DagVisualizer::with_config(VisualizationConfig {
        format: GraphFormat::Dot,
        custom_colors,
        ..Default::default()
    });

    let mut dag = WorkflowDag::new();
    dag.add_task(create_test_task("task1", "Task 1"))
        .expect("Failed to add task");

    let dot = renderer.visualize(&dag).expect("Failed to generate DOT");
    assert!(dot.contains("#ff0000"));
}
1522
/// Rendering a DAG with no tasks as ASCII should succeed and include the
/// "empty DAG" text.
#[test]
fn test_ascii_empty_dag() {
    let mut renderer = DagVisualizer::new();
    renderer.set_format(GraphFormat::Ascii);

    let empty_dag = WorkflowDag::new();
    let text = renderer
        .visualize(&empty_dag)
        .expect("Failed to generate ASCII");

    assert!(text.contains("empty DAG"));
}
1534
/// When a status is registered for a task, the first node in the JSON output
/// should carry a string-valued `status` field.
#[test]
fn test_json_with_statuses() {
    let mut renderer = DagVisualizer::new();
    renderer.set_format(GraphFormat::Json);
    renderer.set_task_status("task1", TaskVisualStatus::Completed);

    let mut dag = WorkflowDag::new();
    dag.add_task(create_test_task("task1", "Task 1"))
        .expect("Failed to add task");

    let raw = renderer.visualize(&dag).expect("Failed to generate JSON");
    let doc: serde_json::Value = serde_json::from_str(&raw).expect("Invalid JSON");

    let status = &doc["nodes"][0]["status"];
    assert!(status.is_string());
}
1551}